pjfanning commented on code in PR #4:
URL: https://github.com/apache/incubator-pekko/pull/4#discussion_r1284704564


##########
stream/src/main/scala/org/apache/pekko/stream/impl/Timers.scala:
##########
@@ -268,33 +268,29 @@ import pekko.stream.stage._
             push(out, grab(in))
             if (isClosed(in)) completeStage()
             else pull(in)
-          } else {
-            val now = System.nanoTime()
-            // Idle timeout triggered a while ago and we were just waiting for pull.
-            // In the case of now == deadline, the deadline has not passed strictly, but scheduling another thunk
-            // for that seems wasteful.
-            if (now - nextDeadline >= 0) {
-              nextDeadline = now + timeout.toNanos
-              push(out, inject())
-            } else
-              scheduleOnce(GraphStageLogicTimer, FiniteDuration(nextDeadline - now, TimeUnit.NANOSECONDS))
-          }
+          } else emitInjectedElementOrReschedule(onTimer = false)
         }
 
-        override protected def onTimer(timerKey: Any): Unit = {
+        private def emitInjectedElementOrReschedule(onTimer: Boolean): Unit = {
           val now = System.nanoTime()
-          // Timer is reliably cancelled if a regular element arrives first. Scheduler rather schedules too late
-          // than too early so the deadline must have passed at this time.
-          assert(
-            now - nextDeadline >= 0,
-            s"Timer should have triggered only after deadline but now is $now and deadline was $nextDeadline diff ${now - nextDeadline}.")
-          push(out, inject())
-          nextDeadline = now + timeout.toNanos
+          val diff = now - nextDeadline
+          if (diff < 0) {
+            if (onTimer) {
+              // Clock may be non-monotonic, see https://stackoverflow.com/questions/51344787/in-what-cases-clock-monotonic-might-not-be-available
+              log.info(
+                s"Timer should have triggered only after deadline but now is $now and deadline was $nextDeadline diff $diff. (time running backwards?) Reschedule instead of emitting.")
+            }
+            scheduleOnce(GraphStageLogicTimer, FiniteDuration(-diff, TimeUnit.NANOSECONDS))
+          } else {
+            push(out, inject())
+            nextDeadline = now + timeout.toNanos
+          }
         }
-      }
 
-    override def toString = "IdleTimer"
+        override protected def onTimer(timerKey: Any): Unit = emitInjectedElementOrReschedule(onTimer = true)
+      }
 
+    override def toString = "IdleInject"

Review Comment:
   Why change the `toString` (from `"IdleTimer"` to `"IdleInject"`)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to