vanzin commented on a change in pull request #24616: [SPARK-27726] [Core] Fix
performance of ElementTrackingStore deletes when using InMemoryStore under high
loads
URL: https://github.com/apache/spark/pull/24616#discussion_r285756405
##########
File path:
core/src/test/scala/org/apache/spark/status/ElementTrackingStoreSuite.scala
##########
@@ -17,14 +17,66 @@
package org.apache.spark.status
+import java.util.concurrent.atomic.AtomicInteger
+
import org.mockito.Mockito._
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.internal.config.Status._
+import org.apache.spark.status.ElementTrackingStore._
import org.apache.spark.util.kvstore._
class ElementTrackingStoreSuite extends SparkFunSuite {
+ test("asynchronous tracking single-fire") {
+ val store = mock(classOf[KVStore])
+ val tracking = new ElementTrackingStore(store, new SparkConf()
+ .set(ASYNC_TRACKING_ENABLED, true))
+
+ val waiter = new Object()
+ var done = false
+ var type1 = new AtomicInteger(0)
+ var queued0: WriteQueueResult = null
+ var queued1: WriteQueueResult = null
+ var queued2: WriteQueueResult = null
+ var queued3: WriteQueueResult = null
+
+
+ tracking.addTrigger(classOf[Type1], 1) { count =>
+ val count = type1.getAndIncrement()
+
+ count match {
+ case 0 =>
+ // while in the asynchronous thread, attempt to increment twice. The first should
+ // succeed, the second should be skipped
+ queued1 = tracking.write(new Type1, checkTriggers = true)
+ queued2 = tracking.write(new Type1, checkTriggers = true)
+ case 1 =>
+ // Verify that once we've started deliver again, that we can enqueue another
+ queued3 = tracking.write(new Type1, checkTriggers = true)
+ case 2 =>
+ waiter.synchronized {
+ done = true
+ waiter.notifyAll()
+ }
+ }
+ }
+
+ when(store.count(classOf[Type1])).thenReturn(2L)
+ queued0 = tracking.write(new Type1, checkTriggers = true)
+ waiter.synchronized {
+ if (!done) {
+ waiter.wait()
Review comment:
Hmmm... maybe add a timeout here (or use `eventually`)? The issue is that if
there's a bug in your code, this `wait()` may never return and the test would
hang. (It's passing now, but if a bug is added later, a hang would be an
annoying thing to debug.)
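
To illustrate the timeout suggestion, here is a minimal sketch of a bounded wait using only the JDK monitor API. The names `BoundedWait` and `awaitDone`, and the 10 second limit in the usage line, are hypothetical and not part of the PR. It loops so that a spurious wakeup does not end the wait early, and it reports a timeout instead of blocking forever.

```scala
import java.util.concurrent.TimeUnit

// Hypothetical helper, not part of Spark or of this PR.
object BoundedWait {
  /** Waits on `lock` until `isDone` is true or `timeoutMs` has elapsed.
    * Returns true if the condition was met, false if the wait timed out.
    */
  def awaitDone(lock: AnyRef, timeoutMs: Long)(isDone: => Boolean): Boolean = {
    val deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs)
    lock.synchronized {
      var remainingMs = timeoutMs
      // Re-check the condition after every wakeup; never call wait(0),
      // which would block without a limit.
      while (!isDone && remainingMs > 0) {
        lock.wait(remainingMs)
        remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime())
      }
      isDone
    }
  }
}
```

In the test above, the unbounded `waiter.wait()` block could then become something like `assert(BoundedWait.awaitDone(waiter, 10000L)(done))`, so a missed trigger fails the test rather than hanging it. The `eventually` route would instead poll `done` inside ScalaTest's `eventually(timeout(...)) { ... }` block.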