Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20646#discussion_r169521344 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala --- @@ -429,7 +429,25 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be val defaultCheckpointLocation = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath try { - startedTest.foreach { action => + val actionIterator = startedTest.iterator.buffered + while (actionIterator.hasNext) { + // Synchronize sequential addDataMemory actions. + val addDataMemoryActions = ArrayBuffer[AddDataMemory[_]]() + while (actionIterator.hasNext && actionIterator.head.isInstanceOf[AddDataMemory[_]]) { + addDataMemoryActions.append(actionIterator.next().asInstanceOf[AddDataMemory[_]]) + } + if (addDataMemoryActions.nonEmpty) { + val synchronizeAll = addDataMemoryActions --- End diff -- This is some magic-ish code. Can you add a few more comments on how this compose thing works?
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org