This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 459ec6e  Use System.nanoTime() instead of System.currentTimeMillis() 
(#6454)
459ec6e is described below

commit 459ec6e8386f6d67e966321dcd0558d15c96205b
Author: Rolf Arne Corneliussen <rac...@users.noreply.github.com>
AuthorDate: Tue Mar 3 08:21:58 2020 +0100

    Use System.nanoTime() instead of System.currentTimeMillis() (#6454)
    
    Fixes #6453
    
    ### Motivation
    `ConsumerBase` and `ProducerImpl` use `System.currentTimeMillis()` to 
measure the elapsed time in the 'operations' inner classes 
(`ConsumerBase$OpBatchReceive` and `ProducerImpl$OpSendMsg`).
    
    An instance variable `createdAt` is initialized with 
`System.currentTimeMills()`, but it is not used for reading wall clock time, 
the variable is only used for computing elapsed time (e.g. timeout for a batch).
    
    When the variable is used to compute elapsed time, it would more sense to 
use `System.nanoTime()`.
    
    ### Modifications
    
    The instance variable `createdAt` in `ConsumerBase$OpBatchReceive` and  
`ProducerImpl$OpSendMsg` is initialized with `System.nanoTime()`. Usage of the 
variable is updated to reflect that the variable holds nano time; computations 
of elapsed time takes the difference between the current system nano time and 
the `createdAt` variable.
    
    The `createdAt` field is package protected, and is currently only used in 
the declaring class and outer class, limiting the chances for unwanted side 
effects.
---
 .../java/org/apache/pulsar/client/impl/ConsumerBase.java  |  9 +++++----
 .../java/org/apache/pulsar/client/impl/ProducerImpl.java  | 15 ++++++++-------
 2 files changed, 13 insertions(+), 11 deletions(-)

diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 0f5219b..de9ec85 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -287,7 +287,7 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements TimerTask,
     public void negativeAcknowledge(Message<?> message) {
         negativeAcknowledge(message.getMessageId());
     }
-  
+
     protected CompletableFuture<Void> doAcknowledgeWithTxn(MessageId 
messageId, AckType ackType,
                                                            Map<String,Long> 
properties,
                                                            TransactionImpl 
txn) {
@@ -507,7 +507,7 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements TimerTask,
 
         private OpBatchReceive(CompletableFuture<Messages<T>> future) {
             this.future = future;
-            this.createdAt = System.currentTimeMillis();
+            this.createdAt = System.nanoTime();
         }
 
         static <T> OpBatchReceive<T> of(CompletableFuture<Messages<T>> future) 
{
@@ -566,8 +566,9 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements TimerTask,
 
             while (firstOpBatchReceive != null) {
                 // If there is at least one batch receive, calculate the diff 
between the batch receive timeout
-                // and the current time.
-                long diff = (firstOpBatchReceive.createdAt + 
batchReceivePolicy.getTimeoutMs()) - System.currentTimeMillis();
+                // and the elapsed time since the operation was created.
+                long diff = batchReceivePolicy.getTimeoutMs()
+                        - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
firstOpBatchReceive.createdAt);
                 if (diff <= 0) {
                     // The diff is less than or equal to zero, meaning that 
the batch receive has been timed out.
                     // complete the OpBatchReceive and continue to check the 
next OpBatchReceive in pendingBatchReceives.
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 776300d..eb05909 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -987,7 +987,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
             op.cmd = cmd;
             op.callback = callback;
             op.sequenceId = sequenceId;
-            op.createdAt = System.currentTimeMillis();
+            op.createdAt = System.nanoTime();
             return op;
         }
 
@@ -997,7 +997,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
             op.cmd = cmd;
             op.callback = callback;
             op.sequenceId = sequenceId;
-            op.createdAt = System.currentTimeMillis();
+            op.createdAt = System.nanoTime();
             return op;
         }
 
@@ -1009,7 +1009,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
             op.callback = callback;
             op.sequenceId = lowestSequenceId;
             op.highestSequenceId = highestSequenceId;
-            op.createdAt = System.currentTimeMillis();
+            op.createdAt = System.nanoTime();
             return op;
         }
 
@@ -1307,9 +1307,10 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                 // If there are no pending messages, reset the timeout to the 
configured value.
                 timeToWaitMs = conf.getSendTimeoutMs();
             } else {
-                // If there is at least one message, calculate the diff 
between the message timeout and the current
-                // time.
-                long diff = (firstMsg.createdAt + conf.getSendTimeoutMs()) - 
System.currentTimeMillis();
+                // If there is at least one message, calculate the diff 
between the message timeout and the elapsed
+                // time since first message was created.
+                long diff = conf.getSendTimeoutMs()
+                        - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
firstMsg.createdAt);
                 if (diff <= 0) {
                     // The diff is less than or equal to zero, meaning that 
the message has been timed out.
                     // Set the callback to timeout on every message, then 
clear the pending queue.
@@ -1560,7 +1561,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
     public long getDelayInMillis() {
         OpSendMsg firstMsg = pendingMessages.peek();
         if (firstMsg != null) {
-            return System.currentTimeMillis() - firstMsg.createdAt;
+            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
firstMsg.createdAt);
         }
         return 0L;
     }

Reply via email to