Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9373#discussion_r44609375
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala ---
    @@ -198,6 +197,45 @@ class FileBasedWriteAheadLogSuite
     
       import WriteAheadLogSuite._
     
    +  test("FileBasedWriteAheadLog - seqToParIterator") {
    +    /*
    +     If the setting `closeFileAfterWrite` is enabled, we start generating a very large number of
    +     files. This causes recovery to take a very long time. In order to make it quicker, we
    +     parallelized the reading of these files. This test makes sure that we limit the number of
    +     open files to the size of the number of threads in our thread pool rather than the size of
    +     the list of files.
    +     */
    +    val numThreads = 8
    +    val tpool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "wal-test-thread-pool")
    +    class GetMaxCounter {
    +      private val value = new AtomicInteger()
    +      @volatile private var max: Int = 0
    +      def increment(): Unit = synchronized {
    +        val atInstant = value.incrementAndGet()
    +        if (atInstant > max) max = atInstant
    +      }
    +      def decrement(): Unit = synchronized { value.decrementAndGet() }
    +      def get(): Int = synchronized { value.get() }
    +      def getMax(): Int = synchronized { max }
    +    }
    +    try {
    +      val testSeq = 1 to 64
    +      val counter = new GetMaxCounter()
    +      def handle(value: Int): Iterator[Int] = {
    +        new CompletionIterator[Int, Iterator[Int]](Iterator(value)) {
    +          counter.increment()
    +          override def completion() { counter.decrement() }
    +        }
    +      }
    +      val iterator = FileBasedWriteAheadLog.seqToParIterator[Int, Int](tpool, testSeq, handle)
    +      assert(iterator.toSeq === testSeq)
    +      assert(counter.getMax() > 1) // make sure we are doing a parallel computation!
    --- End diff --
    
    Make sure you add comments on what the latch does. This is a complicated
    test and needs inline comments to understand what's going on.
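
The latch itself is not visible in the quoted hunk, so the following is only a minimal sketch of the kind of commented latch the review asks for, not the code from the pull request. It assumes a plain `java.util.concurrent` fixed thread pool in place of Spark's `ThreadUtils`, and the names `LatchSketch`, `maxConcurrent`, and `allWorkersBusy` are hypothetical.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

// Hypothetical helper, not part of Spark: measures how many tasks a
// fixed-size pool ever runs at the same time.
object LatchSketch {
  def maxConcurrent(numThreads: Int, numTasks: Int): Int = {
    val pool = Executors.newFixedThreadPool(numThreads)
    val lock = new Object
    var inFlight = 0 // tasks currently "open"; guarded by lock
    var maxSeen = 0  // high-water mark of inFlight; guarded by lock

    // The latch reaches zero only once numThreads tasks have started.
    // Until then, every started task blocks on await(), so all pool
    // threads are guaranteed to hold a task at the same moment. Without
    // it, fast tasks could finish one by one and the peak would be racy.
    val allWorkersBusy = new CountDownLatch(numThreads)

    try {
      val futures = (1 to numTasks).map { _ =>
        pool.submit(new Runnable {
          override def run(): Unit = {
            lock.synchronized {
              inFlight += 1
              if (inFlight > maxSeen) maxSeen = inFlight
            }
            // Signal "I have started", then wait for the other workers.
            // After the latch hits zero both calls return immediately.
            allWorkersBusy.countDown()
            allWorkersBusy.await(10, TimeUnit.SECONDS)
            lock.synchronized { inFlight -= 1 }
          }
        })
      }
      // Wait for every task so the high-water mark is final.
      futures.foreach(_.get(30, TimeUnit.SECONDS))
      lock.synchronized { maxSeen }
    } finally {
      pool.shutdownNow()
    }
  }
}
```

With at least as many tasks as threads, `LatchSketch.maxConcurrent(8, 64)` should report a peak equal to the pool size: the latch forces all eight workers to be busy at once, and the pool itself caps the count at eight. That combination is what lets a test assert both that work ran in parallel and that open resources never exceeded the pool size.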

