This is an automated email from the ASF dual-hosted git repository. vanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-livy.git
The following commit(s) were added to refs/heads/master by this push: new 8c7f03b [LIVY-513][LIVY-517][TEST] Fix SessionHeartbeatSpec flakiness, occasional NPE 8c7f03b is described below commit 8c7f03b4a8c5fae73c14bac7d97c62cc0fd6241d Author: Gyorgy Gal <gal.gyo...@gmail.com> AuthorDate: Tue Mar 26 17:50:15 2019 -0700 [LIVY-513][LIVY-517][TEST] Fix SessionHeartbeatSpec flakiness, occasional NPE ## What changes were proposed in this pull request? This PR aims to fix the following issues in the test suite `SessionHeartbeatSpec`: * Since there is no implementation provided for `stop()` in `mock[TestSession]`, Mockito returns `null` by default and this causes `SessionManager#delete` to throw `NullPointerException` when executing `map()` on the `Future[Unit]` object it expects. This can be observed in the unit test output (`server/target/unit-tests.log`): ``` 19/03/25 18:20:30.123 ScalaTest-main-running-SessionHeartbeatSpec INFO SessionHeartbeatSpec$$anonfun$2$TestWatchdog$1: Registering new session 0 19/03/25 18:20:30.123 ScalaTest-main-running-SessionHeartbeatSpec INFO SessionHeartbeatSpec$$anonfun$2$TestWatchdog$1: Registering new session 1 19/03/25 18:20:30.124 ScalaTest-main-running-SessionHeartbeatSpec INFO SessionHeartbeatSpec$$anonfun$2$TestWatchdog$1: Session 0 expired. Last heartbeat is at null. 19/03/25 18:20:30.124 ScalaTest-main-running-SessionHeartbeatSpec WARN SessionHeartbeatSpec$$anonfun$2$TestWatchdog$1: Exception was thrown when deleting expired session 0 java.lang.NullPointerException at org.apache.livy.sessions.SessionManager.delete(SessionManager.scala:123) at org.apache.livy.server.interactive.SessionHeartbeatWatchdog$$anonfun$deleteExpiredSessions$1.apply(SessionHeartbeat.scala:113) at org.apache.livy.server.interactive.SessionHeartbeatWatchdog$$anonfun$deleteExpiredSessions$1.apply(SessionHeartbeat.scala:110) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206) at org.apache.livy.server.interactive.SessionHeartbeatWatchdog$class.deleteExpiredSessions(SessionHeartbeat.scala:110) at org.apache.livy.server.interactive.SessionHeartbeatSpec$$anonfun$2$TestWatchdog$1.deleteExpiredSessions(SessionHeartbeatSpec.scala:60) ``` * In some rare cases, the SessionManager GC thread may run `collectGarbage()` after the test sessions are registered but before the test is finished. The GC thread will also attempt to delete the `mock[TestSession]` objects because the mocked session's `lastActivity()` function returns 0 by default which means`currentTime - session.lastActivity > sessionTimeout` will always be true. This causes a race condition that can have the following outcomes: 1. The `stop()` method may be called twice (both by `SessionHeartbeatWatchdog#deleteExpiredSessions` and `SessionManager#collectGarbage`), resulting in a `TooManyActualInvocations` exception (LIVY-513): ``` org.mockito.exceptions.verification.TooManyActualInvocations: testSession$1.stop(); Wanted 1 time: -> at org.apache.livy.server.interactive.SessionHeartbeatSpec$$anonfun$2$$anonfun$apply$mcV$sp$5.apply$mcV$sp(SessionHeartbeatSpec.scala:93) But was 2 times. Undesired invocation: -> at org.apache.livy.sessions.SessionManager.delete(SessionManager.scala:124) at org.apache.livy.server.interactive.SessionHeartbeatSpec$$anonfun$2$$anonfun$apply$mcV$sp$5.apply$mcV$sp(SessionHeartbeatSpec.scala:93) at org.apache.livy.server.interactive.SessionHeartbeatSpec$$anonfun$2$$anonfun$apply$mcV$sp$5.apply(SessionHeartbeatSpec.scala:70) at org.apache.livy.server.interactive.SessionHeartbeatSpec$$anonfun$2$$anonfun$apply$mcV$sp$5.apply(SessionHeartbeatSpec.scala:70) at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22) at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85) at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) at org.scalatest.Transformer.apply(Transformer.scala:22) at org.scalatest.Transformer.apply(Transformer.scala:20) at org.scalatest.FunSpecLike$$anon$1.apply(FunSpecLike.scala:422) at org.scalatest.Suite$class.withFixture(Suite.scala:1122) ... ``` 2. The `stop()` method may be called by `collectGarbage()` after `deleteExpiredSessions()`, but before the test is finished, which causes the following error message to appear on the console (LIVY-517). ``` Deleting Mock for TestSession$1, hashCode: 1666638753 because it was inactive for more than 3600000.0 ms. Exception in thread "session gc thread" java.lang.NullPointerException at org.apache.livy.sessions.SessionManager.delete(SessionManager.scala:124) at org.apache.livy.sessions.SessionManager$$anonfun$collectGarbage$2.apply(SessionManager.scala:171) at org.apache.livy.sessions.SessionManager$$anonfun$collectGarbage$2.apply(SessionManager.scala:168) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.immutable.List.foreach(List.scala:392) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.immutable.List.map(List.scala:296) at org.apache.livy.sessions.SessionManager.collectGarbage(SessionManager.scala:168) at org.apache.livy.sessions.SessionManager$GarbageCollector.run(SessionManager.scala:201) ``` These race conditions can be triggered easily by inserting `Thread.sleep(x)` (where x is between 10 to 50) before the loop in `org.apache.livy.sessions.SessionManager.GarbageCollector#run` and rerunning the test. With this change applied, the SessionManager GC should not attempt to collect the `mock[TestSession]` objects created by the test, so the race condition should be eliminated and the flakiness of the test should be fixed. ## How was this patch tested? By running the existing UTs. Author: Gyorgy Gal <gal.gyo...@gmail.com> Closes #163 from gyogal/LIVY-513. --- .../org/apache/livy/server/interactive/SessionHeartbeatSpec.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/server/src/test/scala/org/apache/livy/server/interactive/SessionHeartbeatSpec.scala b/server/src/test/scala/org/apache/livy/server/interactive/SessionHeartbeatSpec.scala index 6d2cff8..074b9c2 100644 --- a/server/src/test/scala/org/apache/livy/server/interactive/SessionHeartbeatSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/interactive/SessionHeartbeatSpec.scala @@ -18,6 +18,7 @@ package org.apache.livy.server.interactive import scala.concurrent.duration._ +import scala.concurrent.Future import scala.language.postfixOps import org.mockito.Mockito.{never, verify, when} @@ -71,11 +72,15 @@ class SessionHeartbeatSpec extends FunSpec with Matchers { when(expiredSession.id).thenReturn(0) when(expiredSession.name).thenReturn(None) when(expiredSession.heartbeatExpired).thenReturn(true) + when(expiredSession.stop()).thenReturn(Future.successful(())) + when(expiredSession.lastActivity).thenReturn(System.nanoTime()) val nonExpiredSession: TestSession = mock[TestSession] when(nonExpiredSession.id).thenReturn(1) when(nonExpiredSession.name).thenReturn(None) when(nonExpiredSession.heartbeatExpired).thenReturn(false) + when(nonExpiredSession.stop()).thenReturn(Future.successful(())) + when(nonExpiredSession.lastActivity).thenReturn(System.nanoTime()) val n = new TestWatchdog(new LivyConf())