This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 459ec6e Use System.nanoTime() instead of System.currentTimeMillis() (#6454) 459ec6e is described below commit 459ec6e8386f6d67e966321dcd0558d15c96205b Author: Rolf Arne Corneliussen <rac...@users.noreply.github.com> AuthorDate: Tue Mar 3 08:21:58 2020 +0100 Use System.nanoTime() instead of System.currentTimeMillis() (#6454) Fixes #6453 ### Motivation `ConsumerBase` and `ProducerImpl` use `System.currentTimeMillis()` to measure the elapsed time in the 'operations' inner classes (`ConsumerBase$OpBatchReceive` and `ProducerImpl$OpSendMsg`). An instance variable `createdAt` is initialized with `System.currentTimeMills()`, but it is not used for reading wall clock time, the variable is only used for computing elapsed time (e.g. timeout for a batch). When the variable is used to compute elapsed time, it would more sense to use `System.nanoTime()`. ### Modifications The instance variable `createdAt` in `ConsumerBase$OpBatchReceive` and `ProducerImpl$OpSendMsg` is initialized with `System.nanoTime()`. Usage of the variable is updated to reflect that the variable holds nano time; computations of elapsed time takes the difference between the current system nano time and the `createdAt` variable. The `createdAt` field is package protected, and is currently only used in the declaring class and outer class, limiting the chances for unwanted side effects. --- .../java/org/apache/pulsar/client/impl/ConsumerBase.java | 9 +++++---- .../java/org/apache/pulsar/client/impl/ProducerImpl.java | 15 ++++++++------- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 0f5219b..de9ec85 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -287,7 +287,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements TimerTask, public void negativeAcknowledge(Message<?> message) { negativeAcknowledge(message.getMessageId()); } - + protected CompletableFuture<Void> doAcknowledgeWithTxn(MessageId messageId, AckType ackType, Map<String,Long> properties, TransactionImpl txn) { @@ -507,7 +507,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements TimerTask, private OpBatchReceive(CompletableFuture<Messages<T>> future) { this.future = future; - this.createdAt = System.currentTimeMillis(); + this.createdAt = System.nanoTime(); } static <T> OpBatchReceive<T> of(CompletableFuture<Messages<T>> future) { @@ -566,8 +566,9 @@ public abstract class ConsumerBase<T> extends HandlerState implements TimerTask, while (firstOpBatchReceive != null) { // If there is at least one batch receive, calculate the diff between the batch receive timeout - // and the current time. - long diff = (firstOpBatchReceive.createdAt + batchReceivePolicy.getTimeoutMs()) - System.currentTimeMillis(); + // and the elapsed time since the operation was created. + long diff = batchReceivePolicy.getTimeoutMs() + - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - firstOpBatchReceive.createdAt); if (diff <= 0) { // The diff is less than or equal to zero, meaning that the batch receive has been timed out. // complete the OpBatchReceive and continue to check the next OpBatchReceive in pendingBatchReceives. diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 776300d..eb05909 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -987,7 +987,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne op.cmd = cmd; op.callback = callback; op.sequenceId = sequenceId; - op.createdAt = System.currentTimeMillis(); + op.createdAt = System.nanoTime(); return op; } @@ -997,7 +997,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne op.cmd = cmd; op.callback = callback; op.sequenceId = sequenceId; - op.createdAt = System.currentTimeMillis(); + op.createdAt = System.nanoTime(); return op; } @@ -1009,7 +1009,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne op.callback = callback; op.sequenceId = lowestSequenceId; op.highestSequenceId = highestSequenceId; - op.createdAt = System.currentTimeMillis(); + op.createdAt = System.nanoTime(); return op; } @@ -1307,9 +1307,10 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne // If there are no pending messages, reset the timeout to the configured value. timeToWaitMs = conf.getSendTimeoutMs(); } else { - // If there is at least one message, calculate the diff between the message timeout and the current - // time. - long diff = (firstMsg.createdAt + conf.getSendTimeoutMs()) - System.currentTimeMillis(); + // If there is at least one message, calculate the diff between the message timeout and the elapsed + // time since first message was created. + long diff = conf.getSendTimeoutMs() + - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - firstMsg.createdAt); if (diff <= 0) { // The diff is less than or equal to zero, meaning that the message has been timed out. // Set the callback to timeout on every message, then clear the pending queue. @@ -1560,7 +1561,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne public long getDelayInMillis() { OpSendMsg firstMsg = pendingMessages.peek(); if (firstMsg != null) { - return System.currentTimeMillis() - firstMsg.createdAt; + return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - firstMsg.createdAt); } return 0L; }