jt2594838 commented on code in PR #17780:
URL: https://github.com/apache/iotdb/pull/17780#discussion_r3400221521
##########
integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionTopicOwnerIT.java:
##########
@@ -150,49 +142,24 @@ public void
testHeartbeatRenewsOwnerLeaseAndAlterOwnerWaitsForLeaseExpiration()
TopicConstant.OWNER_LEASE_DURATION_MS_KEY,
String.valueOf(ownerLeaseDurationMs));
session.createTopic(topicName, properties);
- final Long initialOwnerLeaseExpireTimeMs =
getOwnerLeaseExpireTimeMs(session, topicName);
- Assert.assertNotNull(initialOwnerLeaseExpireTimeMs);
- Assert.assertTrue(initialOwnerLeaseExpireTimeMs >
System.currentTimeMillis());
-
- try (final SubscriptionTreePullConsumer currentOwnerConsumer =
- new SubscriptionTreePullConsumer.Builder()
- .host(host)
- .port(port)
- .consumerId("current_sn")
- .consumerGroupId("topic_owner_lease_group")
- .ownerId("sn1")
- .ownerEpoch(5L)
- .heartbeatIntervalMs(heartbeatIntervalMs)
- .autoCommit(false)
- .buildPullConsumer()) {
- currentOwnerConsumer.open();
- currentOwnerConsumer.subscribe(topicName);
-
- final AtomicReference<Long> renewedOwnerLeaseExpireTimeMs =
- new AtomicReference<>(initialOwnerLeaseExpireTimeMs);
- IoTDBSubscriptionITConstant.AWAIT.untilAsserted(
- () -> {
- final Long ownerLeaseExpireTimeMs =
getOwnerLeaseExpireTimeMs(session, topicName);
- Assert.assertNotNull(ownerLeaseExpireTimeMs);
- Assert.assertTrue(ownerLeaseExpireTimeMs >
initialOwnerLeaseExpireTimeMs);
- renewedOwnerLeaseExpireTimeMs.set(ownerLeaseExpireTimeMs);
- });
-
+ try {
+ // Transferring to a different owner must wait for the old owner's
lease to drain on every
+ // DataNode before the new owner is installed. ConfigNode stops
renewing and waits at least
+ // the lease duration (measured on its own clock), so the call blocks
for >= the lease
+ // duration. This is the admission gate that prevents cross-DataNode
double-active
+ // consuming.
final long alterStartTimeMs = System.currentTimeMillis();
session.alterTopicOwner(topicName, "sn2", 6L);
final long alterElapsedTimeMs = System.currentTimeMillis() -
alterStartTimeMs;
-
- final long ownerLeaseRemainingTimeMs =
- renewedOwnerLeaseExpireTimeMs.get() - alterStartTimeMs;
- if (ownerLeaseRemainingTimeMs > 500L) {
- Assert.assertTrue(alterElapsedTimeMs >= ownerLeaseRemainingTimeMs -
250L);
- }
+ Assert.assertTrue(
+ "alterTopicOwner should block at least the lease drain duration,
but only took "
+ + alterElapsedTimeMs
+ + "ms",
+ alterElapsedTimeMs >= ownerLeaseDurationMs);
Review Comment:
The waiting time also depends on the renewal interval (heartbeat interval),
so it can be shorter than the full lease duration. For example, with
t0 <= t1 <= t2:
- the lease is renewed at t0
- the owner is altered at t1
- the lease expires at t2
Then the waiting time is t2 - t1 <= t2 - t0 = leaseDuration.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]