[GitHub] [kafka] lbradstreet commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-23 Thread GitBox


lbradstreet commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r413563167



##
File path: core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
##
@@ -60,6 +61,7 @@ class ReplicaAlterLogDirsThreadTest {
 
 when(replicaManager.futureLogExists(t1p0)).thenReturn(false)
 
+val time = new SystemTime()

Review comment:
   Makes sense, I followed the abstract fetcher tests which created new 
SystemTime(s) too. I'll see if it's easy to make SystemTime package private.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lbradstreet commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-23 Thread GitBox


lbradstreet commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r413563167



##
File path: core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
##
@@ -60,6 +61,7 @@ class ReplicaAlterLogDirsThreadTest {
 
 when(replicaManager.futureLogExists(t1p0)).thenReturn(false)
 
+val time = new SystemTime()

Review comment:
   Makes sense.









[GitHub] [kafka] lbradstreet commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-22 Thread GitBox


lbradstreet commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r413086046



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,19 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+  def this(delayMs: Long) = this(delayMs, Time.SYSTEM)

Review comment:
   Thanks, yes, I was just coming back to say your suggestion was better 
for testability reasons.
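
To illustrate the testability point, here is a minimal sketch in which the clock is always passed in explicitly, so a test can control time deterministically. `Clock`, `ManualClock`, and `Item` are hypothetical stand-ins invented for this example; they are not Kafka's `Time`, `MockTime`, or `DelayedItem`.

```scala
// Hypothetical stand-in for an injectable time source (not Kafka's Time).
trait Clock { def nanoseconds: Long }

// Test clock that only moves when told to.
class ManualClock(private var nowNs: Long = 0L) extends Clock {
  def nanoseconds: Long = nowNs
  def sleepMs(ms: Long): Unit = nowNs += ms * 1000000L
}

// Simplified item that takes its clock explicitly, with no system default.
class Item(delayMs: Long, clock: Clock) {
  private val dueNs = clock.nanoseconds + delayMs * 1000000L
  def isDelayed: Boolean = clock.nanoseconds < dueNs
}
```

With no default constructor falling back to the system clock, a test cannot accidentally depend on real elapsed time: it creates a `ManualClock`, builds the item, and calls `sleepMs` to cross the deadline.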









[GitHub] [kafka] lbradstreet commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-22 Thread GitBox


lbradstreet commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r413085641



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,19 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {

Review comment:
   Good point. I'll make it private.









[GitHub] [kafka] lbradstreet commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-22 Thread GitBox


lbradstreet commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r413074912



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,19 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+  def this(delayMs: Long) = this(delayMs, Time.SYSTEM)

Review comment:
   We can. I was just trying to avoid the replica fetcher having to pass 
Time.SYSTEM in everywhere. If you prefer that, I can change it. Is there any 
downside to what I did, though?









[GitHub] [kafka] lbradstreet commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-21 Thread GitBox


lbradstreet commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r412313524



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,23 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+  def this(delayMs: Long) = this(delayMs, Time.SYSTEM)
 
-class DelayedItem(val delayMs: Long) extends Delayed with Logging {
-
-  private val dueMs = Time.SYSTEM.milliseconds + delayMs
-
-  def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay))
+  private val dueNs = time.nanoseconds + TimeUnit.MILLISECONDS.toNanos(delayMs)
 
   /**
-   * The remaining delay time
+   * true if the item is still delayed
*/
-  def getDelay(unit: TimeUnit): Long = {
-unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), TimeUnit.MILLISECONDS)
+  def isDelayed: Boolean = {
+time.nanoseconds < dueNs
   }
 
-  def compareTo(d: Delayed): Int = {
-val other = d.asInstanceOf[DelayedItem]
-java.lang.Long.compare(dueMs, other.dueMs)
+  def compareTo(d: DelayedItem): Int = {

Review comment:
   Done









[GitHub] [kafka] lbradstreet commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-21 Thread GitBox


lbradstreet commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r412305902



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,23 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+  def this(delayMs: Long) = this(delayMs, Time.SYSTEM)
 
-class DelayedItem(val delayMs: Long) extends Delayed with Logging {
-
-  private val dueMs = Time.SYSTEM.milliseconds + delayMs
-
-  def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay))
+  private val dueNs = time.nanoseconds + TimeUnit.MILLISECONDS.toNanos(delayMs)
 
   /**
-   * The remaining delay time
+   * true if the item is still delayed
*/
-  def getDelay(unit: TimeUnit): Long = {
-unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), TimeUnit.MILLISECONDS)
+  def isDelayed: Boolean = {
+time.nanoseconds < dueNs
   }
 
-  def compareTo(d: Delayed): Int = {
-val other = d.asInstanceOf[DelayedItem]
-java.lang.Long.compare(dueMs, other.dueMs)
+  def compareTo(d: DelayedItem): Int = {

Review comment:
   Makes sense. I'll remove this.









[GitHub] [kafka] lbradstreet commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-20 Thread GitBox


lbradstreet commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r411866242



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,24 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+  def this(delayMs: Long) = this(delayMs, Time.SYSTEM)
 
-class DelayedItem(val delayMs: Long) extends Delayed with Logging {
-
-  private val dueMs = Time.SYSTEM.milliseconds + delayMs
-
-  def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay))
+  private val dueNs = time.nanoseconds + TimeUnit.MILLISECONDS.toNanos(delayMs)
 
   /**
-   * The remaining delay time
+   * true if the item is still delayed
*/
-  def getDelay(unit: TimeUnit): Long = {
-unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), TimeUnit.MILLISECONDS)
+  def isDelayed: Boolean = {
+dueNs - time.nanoseconds > 0

Review comment:
   Fixed









[GitHub] [kafka] lbradstreet commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-20 Thread GitBox


lbradstreet commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r411866042



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,24 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+  def this(delayMs: Long) = this(delayMs, Time.SYSTEM)
 
-class DelayedItem(val delayMs: Long) extends Delayed with Logging {
-
-  private val dueMs = Time.SYSTEM.milliseconds + delayMs
-
-  def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay))
+  private val dueNs = time.nanoseconds + TimeUnit.MILLISECONDS.toNanos(delayMs)
 
   /**
-   * The remaining delay time
+   * true if the item is still delayed
*/
-  def getDelay(unit: TimeUnit): Long = {
-unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), TimeUnit.MILLISECONDS)
+  def isDelayed: Boolean = {
+dueNs - time.nanoseconds > 0
   }
 
   def compareTo(d: Delayed): Int = {
 val other = d.asInstanceOf[DelayedItem]
-java.lang.Long.compare(dueMs, other.dueMs)
+java.lang.Long.compare(dueNs, other.dueNs)
   }
 
+  override def toString: String = {
+"DelayedItem(delayMs="+(dueNs-time.nanoseconds())+")"

Review comment:
   I changed it to convert to millis since nanoseconds aren't very readable 
and this is closer to the existing behavior.
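
A small sketch of the conversion described here, assuming the remaining delay is computed from a due time and the current time, both in nanoseconds. `DelayFormat.describe` is a hypothetical helper, and clamping negative values to zero is an assumption carried over from the old `getDelay`, not something this diff shows.

```scala
import java.util.concurrent.TimeUnit

object DelayFormat {
  // Hypothetical formatter: renders the remaining delay in milliseconds.
  // Clamping at zero is an assumption, mirroring the old max(..., 0).
  def describe(dueNs: Long, nowNs: Long): String = {
    val remainingMs = TimeUnit.NANOSECONDS.toMillis(math.max(dueNs - nowNs, 0L))
    s"DelayedItem(delayMs=$remainingMs)"
  }
}
```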

##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,24 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+  def this(delayMs: Long) = this(delayMs, Time.SYSTEM)
 
-class DelayedItem(val delayMs: Long) extends Delayed with Logging {
-
-  private val dueMs = Time.SYSTEM.milliseconds + delayMs
-
-  def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay))
+  private val dueNs = time.nanoseconds + TimeUnit.MILLISECONDS.toNanos(delayMs)
 
   /**
-   * The remaining delay time
+   * true if the item is still delayed
*/
-  def getDelay(unit: TimeUnit): Long = {
-unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), TimeUnit.MILLISECONDS)
+  def isDelayed: Boolean = {
+dueNs - time.nanoseconds > 0
   }
 
   def compareTo(d: Delayed): Int = {

Review comment:
   Fixed









[GitHub] [kafka] lbradstreet commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-20 Thread GitBox


lbradstreet commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r411864699



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,24 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+  def this(delayMs: Long) = this(delayMs, Time.SYSTEM)
 
-class DelayedItem(val delayMs: Long) extends Delayed with Logging {
-
-  private val dueMs = Time.SYSTEM.milliseconds + delayMs
-
-  def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay))
+  private val dueNs = time.nanoseconds + TimeUnit.MILLISECONDS.toNanos(delayMs)
 
   /**
-   * The remaining delay time
+   * true if the item is still delayed
*/
-  def getDelay(unit: TimeUnit): Long = {
-unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), TimeUnit.MILLISECONDS)
+  def isDelayed: Boolean = {
+dueNs - time.nanoseconds > 0
   }
 
   def compareTo(d: Delayed): Int = {

Review comment:
   Good point. I'll remove that. I don't think we use it.









[GitHub] [kafka] lbradstreet commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-20 Thread GitBox


lbradstreet commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r411864205



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,24 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+  def this(delayMs: Long) = this(delayMs, Time.SYSTEM)
 
-class DelayedItem(val delayMs: Long) extends Delayed with Logging {
-
-  private val dueMs = Time.SYSTEM.milliseconds + delayMs
-
-  def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay))
+  private val dueNs = time.nanoseconds + TimeUnit.MILLISECONDS.toNanos(delayMs)

Review comment:
   Any idea how to efficiently handle the overflow here? I think it's very 
unlikely we'll ever hit it given our normal delays.
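
One possible way to handle the overflow, sketched here rather than taken from the PR, is a saturating add: if the sum would wrap, clamp it to the largest representable value. `DueTime.dueNs` is a hypothetical helper. `TimeUnit.MILLISECONDS.toNanos` already saturates on its own, so only the addition needs a check.

```scala
import java.util.concurrent.TimeUnit

object DueTime {
  // Hypothetical helper: adds a millisecond delay to a nanosecond timestamp,
  // clamping instead of wrapping around when the addition overflows.
  def dueNs(nowNs: Long, delayMs: Long): Long = {
    // toNanos saturates at Long.MaxValue / Long.MinValue on overflow.
    val delayNs = TimeUnit.MILLISECONDS.toNanos(delayMs)
    val sum = nowNs + delayNs
    // Overflow happened if both operands share a sign that the sum lacks.
    if (((nowNs ^ sum) & (delayNs ^ sum)) < 0) {
      if (delayNs > 0) Long.MaxValue else Long.MinValue
    } else sum
  }
}
```

The cost is one extra branch at construction time. An alternative that avoids the clamp is to compare by subtraction (`dueNs - now > 0`), which is what the `System.nanoTime` documentation recommends for values that may wrap.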









[GitHub] [kafka] lbradstreet commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-20 Thread GitBox


lbradstreet commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r411863881



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,24 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+  def this(delayMs: Long) = this(delayMs, Time.SYSTEM)
 
-class DelayedItem(val delayMs: Long) extends Delayed with Logging {
-
-  private val dueMs = Time.SYSTEM.milliseconds + delayMs
-
-  def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay))
+  private val dueNs = time.nanoseconds + TimeUnit.MILLISECONDS.toNanos(delayMs)
 
   /**
-   * The remaining delay time
+   * true if the item is still delayed
*/
-  def getDelay(unit: TimeUnit): Long = {
-unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), TimeUnit.MILLISECONDS)
+  def isDelayed: Boolean = {
+dueNs - time.nanoseconds > 0
   }
 
   def compareTo(d: Delayed): Int = {
 val other = d.asInstanceOf[DelayedItem]
-java.lang.Long.compare(dueMs, other.dueMs)
+java.lang.Long.compare(dueNs, other.dueNs)
   }
 
+  override def toString: String = {
+"DelayedItem(delayMs="+(dueNs-time.nanoseconds())+")"

Review comment:
   Thanks, I actually forgot the unit conversion here. I was thinking it'd 
still be nicer to print it in ms. What do you think?









[GitHub] [kafka] lbradstreet commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-20 Thread GitBox


lbradstreet commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r411863591



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,24 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+  def this(delayMs: Long) = this(delayMs, Time.SYSTEM)
 
-class DelayedItem(val delayMs: Long) extends Delayed with Logging {
-
-  private val dueMs = Time.SYSTEM.milliseconds + delayMs
-
-  def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay))
+  private val dueNs = time.nanoseconds + TimeUnit.MILLISECONDS.toNanos(delayMs)
 
   /**
-   * The remaining delay time
+   * true if the item is still delayed
*/
-  def getDelay(unit: TimeUnit): Long = {
-unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), TimeUnit.MILLISECONDS)
+  def isDelayed: Boolean = {
+dueNs - time.nanoseconds > 0

Review comment:
   Thanks, that was a lazy code conversion









[GitHub] [kafka] lbradstreet commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-20 Thread GitBox


lbradstreet commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r411840027



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -33,7 +33,11 @@ class DelayedItem(val delayMs: Long) extends Delayed with Logging {
* The remaining delay time
*/
   def getDelay(unit: TimeUnit): Long = {
-unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), TimeUnit.MILLISECONDS)
+unit.convert(getDelayMs, TimeUnit.MILLISECONDS)

Review comment:
   @ijuma I have removed getDelay, and we no longer implement Delayed. In 
doing so I also switched the fetcher to a monotonic clock, as our existing 
implementation relies on the wall clock (Time.SYSTEM.milliseconds), which is 
dangerous if the system time is adjusted.
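
The risk with a wall-clock deadline can be sketched with a controllable clock. Everything below (`FakeClock`, `WallClockDelay`, `MonotonicDelay`, `ClockJumpDemo`) is hypothetical and written for illustration only; it is not Kafka code. The wall clock is set back one minute, then 200 ms of real time passes against a 100 ms delay.

```scala
// Hypothetical controllable clock; not Kafka's Time interface.
class FakeClock(var wallMs: Long, var monoNs: Long) {
  def milliseconds: Long = wallMs
  def nanoseconds: Long = monoNs
  // Advance both clocks by the same amount of real elapsed time.
  def advance(ms: Long): Unit = { wallMs += ms; monoNs += ms * 1000000L }
  // Simulate a wall-clock adjustment; monotonic time is unaffected.
  def stepWallClock(deltaMs: Long): Unit = wallMs += deltaMs
}

// Deadline computed from wall-clock milliseconds (the old approach).
class WallClockDelay(delayMs: Long, clock: FakeClock) {
  private val dueMs = clock.milliseconds + delayMs
  def isDelayed: Boolean = clock.milliseconds < dueMs
}

// Deadline computed from monotonic nanoseconds (the new approach).
class MonotonicDelay(delayMs: Long, clock: FakeClock) {
  private val dueNs = clock.nanoseconds + delayMs * 1000000L
  def isDelayed: Boolean = clock.nanoseconds < dueNs
}

object ClockJumpDemo {
  // Returns (wall-clock item still delayed, monotonic item still delayed).
  def run(): (Boolean, Boolean) = {
    val clock = new FakeClock(wallMs = 1000000L, monoNs = 0L)
    val wall = new WallClockDelay(100L, clock)
    val mono = new MonotonicDelay(100L, clock)
    clock.stepWallClock(-60000L) // wall clock set back one minute
    clock.advance(200L)          // 200 ms of real time passes
    (wall.isDelayed, mono.isDelayed)
  }
}
```

In this scenario the wall-clock item still reports itself as delayed well past its 100 ms deadline, while the monotonic item has expired as expected.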




