vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix 
performance of ElementTrackingStore deletes when using InMemoryStore under high 
loads
URL: https://github.com/apache/spark/pull/24616#discussion_r285756405
 
 

 ##########
 File path: 
core/src/test/scala/org/apache/spark/status/ElementTrackingStoreSuite.scala
 ##########
 @@ -17,14 +17,66 @@
 
 package org.apache.spark.status
 
+import java.util.concurrent.atomic.AtomicInteger
+
 import org.mockito.Mockito._
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.internal.config.Status._
+import org.apache.spark.status.ElementTrackingStore._
 import org.apache.spark.util.kvstore._
 
 class ElementTrackingStoreSuite extends SparkFunSuite {
 
+  test("asynchronous tracking single-fire") {
+    val store = mock(classOf[KVStore])
+    val tracking = new ElementTrackingStore(store, new SparkConf()
+      .set(ASYNC_TRACKING_ENABLED, true))
+
+    val waiter = new Object()
+    var done = false
+    var type1 = new AtomicInteger(0)
+    var queued0: WriteQueueResult = null
+    var queued1: WriteQueueResult = null
+    var queued2: WriteQueueResult = null
+    var queued3: WriteQueueResult = null
+
+
+    tracking.addTrigger(classOf[Type1], 1) { count =>
+      val count = type1.getAndIncrement()
+
+      count match {
+        case 0 =>
+          // while in the asynchronous thread, attempt to increment twice.  
The first should
+          // succeed, the second should be skipped
+          queued1 = tracking.write(new Type1, checkTriggers = true)
+          queued2 = tracking.write(new Type1, checkTriggers = true)
+        case 1 =>
+          // Verify that once we've started deliver again, that we can enqueue 
another
+          queued3 = tracking.write(new Type1, checkTriggers = true)
+        case 2 =>
+          waiter.synchronized {
+            done = true
+            waiter.notifyAll()
+          }
+      }
+    }
+
+    when(store.count(classOf[Type1])).thenReturn(2L)
+    queued0 = tracking.write(new Type1, checkTriggers = true)
+    waiter.synchronized {
+      if (!done) {
+        waiter.wait()
 
 Review comment:
   Hmmm... maybe add a timeout here (or use `eventually`)? The issue is that if 
there's a bug in your code, this may not actually return. (It's passing, but if 
a bug is added later then this would be an annoying thing to debug.)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to