This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-livy.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c7f03b  [LIVY-513][LIVY-517][TEST] Fix SessionHeartbeatSpec flakiness, occasional NPE
8c7f03b is described below

commit 8c7f03b4a8c5fae73c14bac7d97c62cc0fd6241d
Author: Gyorgy Gal <gal.gyo...@gmail.com>
AuthorDate: Tue Mar 26 17:50:15 2019 -0700

    [LIVY-513][LIVY-517][TEST] Fix SessionHeartbeatSpec flakiness, occasional NPE
    
    ## What changes were proposed in this pull request?
    
    This PR aims to fix the following issues in the test suite 
`SessionHeartbeatSpec`:
    
     * Because `stop()` is not stubbed on `mock[TestSession]`, Mockito returns `null` by default, so
       `SessionManager#delete` throws a `NullPointerException` when it calls `map()` on the
       `Future[Unit]` it expects. This can be observed in the unit test output
       (`server/target/unit-tests.log`):
    
    ```
    19/03/25 18:20:30.123 ScalaTest-main-running-SessionHeartbeatSpec INFO SessionHeartbeatSpec$$anonfun$2$TestWatchdog$1: Registering new session 0
    19/03/25 18:20:30.123 ScalaTest-main-running-SessionHeartbeatSpec INFO SessionHeartbeatSpec$$anonfun$2$TestWatchdog$1: Registering new session 1
    19/03/25 18:20:30.124 ScalaTest-main-running-SessionHeartbeatSpec INFO SessionHeartbeatSpec$$anonfun$2$TestWatchdog$1: Session 0 expired. Last heartbeat is at null.
    19/03/25 18:20:30.124 ScalaTest-main-running-SessionHeartbeatSpec WARN SessionHeartbeatSpec$$anonfun$2$TestWatchdog$1: Exception was thrown when deleting expired session 0
    java.lang.NullPointerException
            at org.apache.livy.sessions.SessionManager.delete(SessionManager.scala:123)
            at org.apache.livy.server.interactive.SessionHeartbeatWatchdog$$anonfun$deleteExpiredSessions$1.apply(SessionHeartbeat.scala:113)
            at org.apache.livy.server.interactive.SessionHeartbeatWatchdog$$anonfun$deleteExpiredSessions$1.apply(SessionHeartbeat.scala:110)
            at scala.collection.Iterator$class.foreach(Iterator.scala:891)
            at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
            at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
            at org.apache.livy.server.interactive.SessionHeartbeatWatchdog$class.deleteExpiredSessions(SessionHeartbeat.scala:110)
            at org.apache.livy.server.interactive.SessionHeartbeatSpec$$anonfun$2$TestWatchdog$1.deleteExpiredSessions(SessionHeartbeatSpec.scala:60)
    ```
    
     * In rare cases, the SessionManager GC thread may run `collectGarbage()` after the test
       sessions are registered but before the test has finished. The GC thread then also tries to
       delete the `mock[TestSession]` objects, because the mocked session's `lastActivity()`
       returns 0 by default, which means `currentTime - session.lastActivity > sessionTimeout`
       will always be true. This race condition can have the following outcomes:
    
         1. The `stop()` method may be called twice (both by 
`SessionHeartbeatWatchdog#deleteExpiredSessions` and 
`SessionManager#collectGarbage`), resulting in a `TooManyActualInvocations` 
exception (LIVY-513):
    
                ```
                  org.mockito.exceptions.verification.TooManyActualInvocations: 
testSession$1.stop();
                Wanted 1 time:
                -> at org.apache.livy.server.interactive.SessionHeartbeatSpec$$anonfun$2$$anonfun$apply$mcV$sp$5.apply$mcV$sp(SessionHeartbeatSpec.scala:93)
                But was 2 times. Undesired invocation:
                -> at org.apache.livy.sessions.SessionManager.delete(SessionManager.scala:124)
                  at org.apache.livy.server.interactive.SessionHeartbeatSpec$$anonfun$2$$anonfun$apply$mcV$sp$5.apply$mcV$sp(SessionHeartbeatSpec.scala:93)
                  at org.apache.livy.server.interactive.SessionHeartbeatSpec$$anonfun$2$$anonfun$apply$mcV$sp$5.apply(SessionHeartbeatSpec.scala:70)
                  at org.apache.livy.server.interactive.SessionHeartbeatSpec$$anonfun$2$$anonfun$apply$mcV$sp$5.apply(SessionHeartbeatSpec.scala:70)
                  at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
                  at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
                  at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
                  at org.scalatest.Transformer.apply(Transformer.scala:22)
                  at org.scalatest.Transformer.apply(Transformer.scala:20)
                  at org.scalatest.FunSpecLike$$anon$1.apply(FunSpecLike.scala:422)
                  at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
                  ...
                ```
    
         2. The `stop()` method may be called by `collectGarbage()` after 
`deleteExpiredSessions()`, but before the test is finished, which causes the 
following error message to appear on the console (LIVY-517).
    
                ```
                Deleting Mock for TestSession$1, hashCode: 1666638753 because it was inactive for more than 3600000.0 ms.
                Exception in thread "session gc thread" java.lang.NullPointerException
                        at org.apache.livy.sessions.SessionManager.delete(SessionManager.scala:124)
                        at org.apache.livy.sessions.SessionManager$$anonfun$collectGarbage$2.apply(SessionManager.scala:171)
                        at org.apache.livy.sessions.SessionManager$$anonfun$collectGarbage$2.apply(SessionManager.scala:168)
                        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
                        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
                        at scala.collection.immutable.List.foreach(List.scala:392)
                        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
                        at scala.collection.immutable.List.map(List.scala:296)
                        at org.apache.livy.sessions.SessionManager.collectGarbage(SessionManager.scala:168)
                        at org.apache.livy.sessions.SessionManager$GarbageCollector.run(SessionManager.scala:201)
                ```
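    
    The `NullPointerException` in both traces above comes from chaining onto a `null` future. The
    following is a minimal sketch of that failure mode in plain Scala, without Mockito. The names
    `StoppableSession`, `unstubbed`, `stubbed` and this simplified `delete` are hypothetical
    stand-ins for illustration, not Livy's actual classes or the real `SessionManager#delete`.
    
    ```scala
    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._
    import scala.util.Try
    
    object NullFutureSketch {
      // Hypothetical stand-in for a session; not Livy's real Session class.
      trait StoppableSession { def stop(): Future[Unit] }
    
      // Mimics an unstubbed mock: stop() hands back null instead of a Future.
      val unstubbed: StoppableSession = new StoppableSession {
        def stop(): Future[Unit] = null
      }
    
      // Mimics a mock whose stop() is stubbed with an already-completed Future.
      val stubbed: StoppableSession = new StoppableSession {
        def stop(): Future[Unit] = Future.successful(())
      }
    
      // Simplified, assumed shape of a delete path that chains work onto stop().
      def delete(session: StoppableSession): Future[String] =
        session.stop().map(_ => "deleted")
    
      def main(args: Array[String]): Unit = {
        // Calling map() on null throws before any Future is created.
        println(Try(delete(unstubbed)).isFailure)
        // With a completed Future the chained step runs normally.
        println(Await.result(delete(stubbed), 5.seconds))
      }
    }
    ```
    
    Note that the exception is thrown synchronously by the caller of `delete`, which is why it
    can surface both in the watchdog and in the "session gc thread".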
    
    These race conditions can be triggered easily by inserting `Thread.sleep(x)` (where `x` is
    between 10 and 50) before the loop in
    `org.apache.livy.sessions.SessionManager.GarbageCollector#run` and rerunning the test.
    
    With this change applied, the SessionManager GC should not attempt to 
collect the `mock[TestSession]` objects created by the test, so the race 
condition should be eliminated and the flakiness of the test should be fixed.
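    
    To illustrate why stubbing `lastActivity` keeps the GC away from the mocks, here is a rough
    sketch of the timeout predicate quoted above. The object `GcPredicateSketch`, the function
    `isCollectable` and the fixed clock reading `now` are hypothetical. The one-hour timeout in
    nanoseconds is an assumption based on the "3600000.0 ms" in the log above and on the test
    stubbing `lastActivity` with `System.nanoTime()`.
    
    ```scala
    import java.util.concurrent.TimeUnit
    
    object GcPredicateSketch {
      // Assumed timeout: one hour, expressed in nanoseconds.
      val sessionTimeoutNanos: Long = TimeUnit.HOURS.toNanos(1)
    
      // Same shape as the condition quoted in the commit message:
      // currentTime - session.lastActivity > sessionTimeout
      def isCollectable(currentTime: Long, lastActivity: Long): Boolean =
        currentTime - lastActivity > sessionTimeoutNanos
    
      def main(args: Array[String]): Unit = {
        // Hypothetical clock reading, fixed so the example is deterministic.
        val now: Long = TimeUnit.HOURS.toNanos(5)
    
        // Unstubbed mock: lastActivity defaults to 0, so the session looks stale.
        println(isCollectable(now, 0L))  // prints "true"
    
        // Stubbed mock: lastActivity is the current reading, so it is not stale.
        println(isCollectable(now, now)) // prints "false"
      }
    }
    ```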
    
    ## How was this patch tested?
    
    By running the existing unit tests.
    
    Author: Gyorgy Gal <gal.gyo...@gmail.com>
    
    Closes #163 from gyogal/LIVY-513.
---
 .../org/apache/livy/server/interactive/SessionHeartbeatSpec.scala    | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/server/src/test/scala/org/apache/livy/server/interactive/SessionHeartbeatSpec.scala b/server/src/test/scala/org/apache/livy/server/interactive/SessionHeartbeatSpec.scala
index 6d2cff8..074b9c2 100644
--- a/server/src/test/scala/org/apache/livy/server/interactive/SessionHeartbeatSpec.scala
+++ b/server/src/test/scala/org/apache/livy/server/interactive/SessionHeartbeatSpec.scala
@@ -18,6 +18,7 @@
 package org.apache.livy.server.interactive
 
 import scala.concurrent.duration._
+import scala.concurrent.Future
 import scala.language.postfixOps
 
 import org.mockito.Mockito.{never, verify, when}
@@ -71,11 +72,15 @@ class SessionHeartbeatSpec extends FunSpec with Matchers {
       when(expiredSession.id).thenReturn(0)
       when(expiredSession.name).thenReturn(None)
       when(expiredSession.heartbeatExpired).thenReturn(true)
+      when(expiredSession.stop()).thenReturn(Future.successful(()))
+      when(expiredSession.lastActivity).thenReturn(System.nanoTime())
 
       val nonExpiredSession: TestSession = mock[TestSession]
       when(nonExpiredSession.id).thenReturn(1)
       when(nonExpiredSession.name).thenReturn(None)
       when(nonExpiredSession.heartbeatExpired).thenReturn(false)
+      when(nonExpiredSession.stop()).thenReturn(Future.successful(()))
+      when(nonExpiredSession.lastActivity).thenReturn(System.nanoTime())
 
       val n = new TestWatchdog(new LivyConf())
 
