GitHub user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/20646#discussion_r169521344
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala ---
@@ -429,7 +429,25 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
val defaultCheckpointLocation =
Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
try {
- startedTest.foreach { action =>
+ val actionIterator = startedTest.iterator.buffered
+ while (actionIterator.hasNext) {
+ // Synchronize sequential addDataMemory actions.
+ val addDataMemoryActions = ArrayBuffer[AddDataMemory[_]]()
+ while (actionIterator.hasNext && actionIterator.head.isInstanceOf[AddDataMemory[_]]) {
+ addDataMemoryActions.append(actionIterator.next().asInstanceOf[AddDataMemory[_]])
+ }
+ if (addDataMemoryActions.nonEmpty) {
+ val synchronizeAll = addDataMemoryActions
--- End diff --
This is some magic-ish code. Can you add a few more comments explaining how
this compose logic works?
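
To make the request concrete, here is a rough sketch of the kind of pattern
the comments could explain. The quoted diff is cut off after
`val synchronizeAll = addDataMemoryActions`, so this is not the PR's code. It
only assumes that the idea is to group each run of consecutive add-data
actions and then run the whole run while holding a lock on every source,
built by composing one lock-taking wrapper per source. `Action`, `AddData`,
`CheckAnswer`, `groupRuns`, `lockOn` and this version of `synchronizeAll` are
hypothetical names for illustration, not Spark APIs.

```scala
import scala.collection.mutable.ArrayBuffer

object ComposeSketch {
  // Hypothetical stand-ins for the test actions; not the real StreamTest classes.
  sealed trait Action
  final case class AddData(source: AnyRef, value: Int) extends Action
  case object CheckAnswer extends Action

  // Collects each run of consecutive AddData actions into one group, using a
  // buffered iterator to peek at the next action without consuming it.
  // Any other action becomes a group of its own.
  def groupRuns(actions: Seq[Action]): List[Seq[Action]] = {
    val it = actions.iterator.buffered
    val groups = ArrayBuffer[Seq[Action]]()
    while (it.hasNext) {
      val run = ArrayBuffer[Action]()
      while (it.hasNext && it.head.isInstanceOf[AddData]) {
        run += it.next()
      }
      if (run.nonEmpty) groups += run.toList
      else groups += List(it.next())
    }
    groups.toList
  }

  // A wrapper turns a thunk into a thunk that runs while holding one lock.
  type Wrapper = (() => Unit) => (() => Unit)

  def lockOn(lock: AnyRef): Wrapper =
    body => () => lock.synchronized(body())

  // Composing the wrappers nests the locks: the first lock in the sequence
  // is acquired outermost and the last one innermost.
  def synchronizeAll(locks: Seq[AnyRef]): Wrapper = {
    val noLock: Wrapper = body => body
    locks.map(lockOn).foldRight(noLock)((w, acc) => w.compose(acc))
  }

  def main(args: Array[String]): Unit = {
    val s1 = new Object
    val s2 = new Object
    val actions = Seq(AddData(s1, 1), AddData(s2, 2), CheckAnswer, AddData(s1, 3))
    println(groupRuns(actions).map(_.size))

    // Runs the body while holding the locks on both s1 and s2.
    val guarded = synchronizeAll(Seq(s1, s2)) { () =>
      println(Thread.holdsLock(s1) && Thread.holdsLock(s2))
    }
    guarded()
  }
}
```

If the real code works roughly like this, a comment stating the lock nesting
order and why the sources are locked together would already help a lot.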
---