Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20646#discussion_r169521344
  
    --- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala ---
    @@ -429,7 +429,25 @@ trait StreamTest extends QueryTest with 
SharedSQLContext with TimeLimits with Be
         val defaultCheckpointLocation =
           Utils.createTempDir(namePrefix = 
"streaming.metadata").getCanonicalPath
         try {
    -      startedTest.foreach { action =>
    +      val actionIterator = startedTest.iterator.buffered
    +      while (actionIterator.hasNext) {
    +        // Synchronize sequential addDataMemory actions.
    +        val addDataMemoryActions = ArrayBuffer[AddDataMemory[_]]()
    +        while (actionIterator.hasNext && 
actionIterator.head.isInstanceOf[AddDataMemory[_]]) {
    +          
addDataMemoryActions.append(actionIterator.next().asInstanceOf[AddDataMemory[_]])
    +        }
    +        if (addDataMemoryActions.nonEmpty) {
    +          val synchronizeAll = addDataMemoryActions
    --- End diff --
    
    This code is a bit magical. Could you add a few more comments explaining 
how the compose step works?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to