GitHub user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9373#discussion_r44552651
  
    --- Diff: streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala ---
    @@ -199,6 +198,63 @@ class FileBasedWriteAheadLogSuite
     
       import WriteAheadLogSuite._
     
    +  test("FileBasedWriteAheadLog - parallel readAll opens at most 'numThreads' files") {
    +    /*
    +     If the setting `closeFileAfterWrite` is enabled, we start generating a very large number of
    +     files. This causes recovery to take a very long time. In order to make it quicker, we
    +     parallelized the reading of these files. This test makes sure that we limit the number of
    +     open files to the size of the number of threads in our thread pool rather than the size of
    +     the list of files.
    +     */
    +    val numThreads = 8
    +    val tpool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "wal-test-thread-pool")
    +    class GetMaxCounter {
    +      private val value = new AtomicInteger()
    +      @volatile private var max: Int = 0
    +      def increment(): Unit = {
    +        val atInstant = value.incrementAndGet()
    +        if (atInstant > max) max = atInstant
    +      }
    +      def decrement(): Unit = { value.decrementAndGet() }
    +      def get(): Int = value.get()
    +      def getMax(): Int = max
    +    }
    +    /**
    +     * We need an object that can be iterated through, which will increment our counter once
    +     * initialized, and decrement once closed. This way we can simulate how many "streams" will
    +     * be opened during a real use case.
    +     */
    +    class ReaderObject(cnt: GetMaxCounter, value: Int) extends Iterator[Int] with Closeable {
    +      cnt.increment()
    +      private var returnedValue: Boolean = false
    +      override def hasNext(): Boolean = !returnedValue
    +      override def next(): Int = {
    +        if (!returnedValue) {
    +          returnedValue = true
    +          value
    +        } else {
    +          -1
    +        }
    +      }
    +      override def close(): Unit = {
    +        cnt.decrement()
    +      }
    +    }
    +    try {
    +      val testSeq = 1 to 64
    +      val counter = new GetMaxCounter()
    +      def handle(value: Int): Iterator[Int] = {
    +        val reader = new ReaderObject(counter, value)
    +        CompletionIterator[Int, Iterator[Int]](reader, reader.close)
    --- End diff --
    
    It's not only feasible, it's much more awesome!
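For readers following the idea in the test's comment (read many small files in parallel, but never hold more than `numThreads` of them open), here is a minimal standalone sketch. It is not how `FileBasedWriteAheadLog` itself is implemented. `MaxTrackingCounter`, `FakeReader`, and `readAllInParallel` are hypothetical names, not Spark APIs; plain `java.util.concurrent` executors stand in for Spark's `ThreadUtils.newDaemonFixedThreadPool`; and Scala 2.13+ is assumed for `scala.jdk.CollectionConverters`. One deliberate swap: the diff's `if (atInstant > max) max = atInstant` is a check-then-act race even with `@volatile` (two threads can interleave so a smaller value overwrites a larger one), so the sketch raises the maximum with a compare-and-set loop instead.

```scala
import java.io.Closeable
import java.util.concurrent.{Callable, Executors}
import java.util.concurrent.atomic.AtomicInteger
import scala.jdk.CollectionConverters._ // Scala 2.13+ (assumption); 2.12 would use scala.collection.JavaConverters._

object BoundedParallelReadSketch {

  /** Hypothetical counter: how many readers are open now, and the highest count ever observed. */
  final class MaxTrackingCounter {
    private val current = new AtomicInteger(0)
    private val max = new AtomicInteger(0)

    def increment(): Unit = raiseMax(current.incrementAndGet())
    def decrement(): Unit = { current.decrementAndGet() }
    def get: Int = current.get()
    def getMax: Int = max.get()

    // Compare-and-set loop: retry until `max` already holds a value >= candidate,
    // so a concurrent update can never replace a larger maximum with a smaller one.
    private def raiseMax(candidate: Int): Unit = {
      var seen = max.get()
      while (candidate > seen && !max.compareAndSet(seen, candidate)) {
        seen = max.get()
      }
    }
  }

  /** Hypothetical stand-in for one WAL file reader: counts as "open" from construction until close(). */
  final class FakeReader(counter: MaxTrackingCounter, value: Int) extends Iterator[Int] with Closeable {
    counter.increment()
    private var consumed = false
    override def hasNext: Boolean = !consumed
    override def next(): Int = {
      if (consumed) throw new NoSuchElementException("reader exhausted")
      consumed = true
      value
    }
    override def close(): Unit = counter.decrement()
  }

  /**
   * Reads every "file" on a fixed pool of `numThreads` threads. Each task opens its reader,
   * drains it, and closes it before returning, so at most `numThreads` readers are open at once.
   */
  def readAllInParallel(files: Seq[Int], numThreads: Int, counter: MaxTrackingCounter): Seq[Int] = {
    val pool = Executors.newFixedThreadPool(numThreads)
    try {
      val tasks = files.map { f =>
        new Callable[Seq[Int]] {
          override def call(): Seq[Int] = {
            val reader = new FakeReader(counter, f)
            try reader.toList finally reader.close()
          }
        }
      }
      // invokeAll blocks until every task has completed; futures are returned in submission order.
      pool.invokeAll(tasks.asJava).asScala.toSeq.flatMap(_.get())
    } finally {
      pool.shutdown()
    }
  }

  def main(args: Array[String]): Unit = {
    val counter = new MaxTrackingCounter
    val records = readAllInParallel(1 to 64, numThreads = 8, counter)
    // Every record should come back, no reader should remain open, and the peak should be <= 8.
    println(s"records=${records.size} openNow=${counter.get} maxOpen=${counter.getMax}")
  }
}
```

Draining each reader inside its task gives a hard bound of `numThreads` open readers but materializes each file's records in memory. The diff instead wraps each reader in Spark's `CompletionIterator`, which runs the completion function (here `reader.close`) once the iterator is exhausted, so records can still be consumed lazily.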

