Repository: spark
Updated Branches:
  refs/heads/branch-2.0 a0d9015b3 -> 881e0eb05


http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 984b84f..06f1bd6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -74,6 +74,7 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
 
     // Verify state after updating
     put(store, "a", 1)
+    assert(store.numKeys() === 1)
     intercept[IllegalStateException] {
       store.iterator()
     }
@@ -85,7 +86,9 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
     // Make updates, commit and then verify state
     put(store, "b", 2)
     put(store, "aa", 3)
+    assert(store.numKeys() === 3)
     remove(store, _.startsWith("a"))
+    assert(store.numKeys() === 1)
     assert(store.commit() === 1)
 
     assert(store.hasCommitted)
@@ -107,7 +110,9 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
     val reloadedProvider = new HDFSBackedStateStoreProvider(
       store.id, keySchema, valueSchema, StateStoreConf.empty, new Configuration)
     val reloadedStore = reloadedProvider.getStore(1)
+    assert(reloadedStore.numKeys() === 1)
     put(reloadedStore, "c", 4)
+    assert(reloadedStore.numKeys() === 2)
     assert(reloadedStore.commit() === 2)
     assert(rowsToSet(reloadedStore.iterator()) === Set("b" -> 2, "c" -> 4))
     assert(getDataFromFiles(provider) === Set("b" -> 2, "c" -> 4))
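
For context (a sketch, not part of the patch): the new numKeys() assertions reuse the suite's existing put/remove/commit helpers, and the count is expected to track the uncommitted state of the store. A minimal sketch, assuming the suite's `provider`, `put` and `remove` helpers:

    // Hedged sketch: numKeys() should follow keys as they are added and removed.
    val store = provider.getStore(0)
    put(store, "a", 1)                 // add one key to the pending version
    assert(store.numKeys() === 1)
    remove(store, _ == "a")            // removing it brings the count back to 0
    assert(store.numKeys() === 0)
    assert(store.commit() === 1)       // commit() returns the new version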

http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 55c95ae..c18d843 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -899,6 +899,20 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       }
     }
   }
+
+  test("input row metrics") {
+    withTempDirs { case (src, tmp) =>
+      val input = spark.readStream.format("text").load(src.getCanonicalPath)
+      testStream(input)(
+        AddTextFileData("100", src, tmp),
+        CheckAnswer("100"),
+        AssertOnLastQueryStatus { status =>
+          assert(status.triggerDetails.get("numRows.input.total") === "1")
+          assert(status.sourceStatuses(0).processingRate > 0.0)
+        }
+      )
+    }
+  }
 }
 
 class FileStreamSourceStressTestSuite extends FileStreamSourceTest {
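
As an aside (a sketch, not part of the patch): the per-trigger counters checked above are also reachable outside the test harness through StreamingQuery.status; roughly along these lines, with the input path and query name being placeholders:

    // Hedged sketch: read the new status fields from a running file-stream query.
    val query = spark.readStream.format("text").load("/path/to/input")   // placeholder path
      .writeStream.format("memory").queryName("rates").start()
    query.processAllAvailable()
    println(query.status.triggerDetails.get("numRows.input.total"))      // e.g. "1"
    println(query.status.sourceStatuses(0).processingRate)               // rows per second
    query.stop()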

http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index d4c64e4..be24cbb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -28,6 +28,8 @@ import scala.util.control.NonFatal
 
 import org.scalatest.Assertions
 import org.scalatest.concurrent.{Eventually, Timeouts}
+import org.scalatest.concurrent.AsyncAssertions.Waiter
+import org.scalatest.concurrent.Eventually._
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.exceptions.TestFailedDueToTimeoutException
 import org.scalatest.time.Span
@@ -38,6 +40,7 @@ import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder, Ro
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.streaming.StreamingQueryListener._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils}
 
@@ -193,6 +196,10 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
     }
   }
 
+  case class AssertOnLastQueryStatus(condition: StreamingQueryStatus => Unit)
+    extends StreamAction
+
+
   /**
   * Executes the specified actions on the given streaming DataFrame and provides helpful
    * error messages in the case of failures or incorrect answers.
@@ -294,9 +301,12 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
 
     val testThread = Thread.currentThread()
     val metadataRoot = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
+    val statusCollector = new QueryStatusCollector
 
     try {
+      spark.streams.addListener(statusCollector)
       startedTest.foreach { action =>
+        logInfo(s"Processing test stream action: $action")
         action match {
           case StartStream(trigger, triggerClock) =>
             verify(currentStream == null, "stream already running")
@@ -394,6 +404,13 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
             val streamToAssert = Option(currentStream).getOrElse(lastStream)
             verify({ a.run(); true }, s"Assert failed: ${a.message}")
 
+          case a: AssertOnLastQueryStatus =>
+            Eventually.eventually(timeout(streamingTimeout)) {
+              require(statusCollector.lastTriggerStatus.nonEmpty)
+            }
+            val status = statusCollector.lastTriggerStatus.get
+            verify({ a.condition(status); true }, "Assert on last query status failed")
+
           case a: AddData =>
             try {
               // Add data and get the source where it was added, and the expected offset of the
@@ -465,6 +482,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
       if (currentStream != null && currentStream.microBatchThread.isAlive) {
         currentStream.stop()
       }
+      spark.streams.removeListener(statusCollector)
     }
   }
 
@@ -598,4 +616,58 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
       }
     }
   }
+
+
+  class QueryStatusCollector extends StreamingQueryListener {
+    // to catch errors in the async listener events
+    @volatile private var asyncTestWaiter = new Waiter
+
+    @volatile var startStatus: StreamingQueryStatus = null
+    @volatile var terminationStatus: StreamingQueryStatus = null
+    @volatile var terminationException: Option[String] = null
+
+    private val progressStatuses = new mutable.ArrayBuffer[StreamingQueryStatus]
+
+    /** Get the info of the last trigger that processed data */
+    def lastTriggerStatus: Option[StreamingQueryStatus] = synchronized {
+      progressStatuses.filter { i =>
+        i.triggerDetails.get("isTriggerActive").toBoolean == false &&
+          i.triggerDetails.get("isDataPresentInTrigger").toBoolean == true
+      }.lastOption
+    }
+
+    def reset(): Unit = {
+      startStatus = null
+      terminationStatus = null
+      progressStatuses.clear()
+      asyncTestWaiter = new Waiter
+    }
+
+    def checkAsyncErrors(): Unit = {
+      asyncTestWaiter.await(timeout(10 seconds))
+    }
+
+
+    override def onQueryStarted(queryStarted: QueryStarted): Unit = {
+      asyncTestWaiter {
+        startStatus = queryStarted.queryStatus
+      }
+    }
+
+    override def onQueryProgress(queryProgress: QueryProgress): Unit = {
+      asyncTestWaiter {
+        assert(startStatus != null, "onQueryProgress called before onQueryStarted")
+        synchronized { progressStatuses += queryProgress.queryStatus }
+      }
+    }
+
+    override def onQueryTerminated(queryTerminated: QueryTerminated): Unit = {
+      asyncTestWaiter {
+        assert(startStatus != null, "onQueryTerminated called before onQueryStarted")
+        terminationStatus = queryTerminated.queryStatus
+        terminationException = queryTerminated.exception
+      }
+      asyncTestWaiter.dismiss()
+    }
+  }
 }
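
Outside of testStream, the collector added above is an ordinary StreamingQueryListener; a rough usage sketch (not part of the patch), built only from the APIs visible in this diff:

    // Hedged sketch: register the collector, run a query, then inspect the
    // status of the last completed trigger that processed data.
    val collector = new QueryStatusCollector
    spark.streams.addListener(collector)
    try {
      // ... start a streaming query and process some data ...
      collector.lastTriggerStatus.foreach { status =>
        // triggerDetails exposes per-trigger counters as strings
        println(status.triggerDetails.get("numRows.input.total"))
      }
    } finally {
      spark.streams.removeListener(collector)
    }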

http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index 8681199..e59b549 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.spark.SparkException
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.InternalOutputModes._
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.state.StateStore
 import org.apache.spark.sql.expressions.scalalang.typed
@@ -129,6 +130,59 @@ class StreamingAggregationSuite extends StreamTest with BeforeAndAfterAll {
     )
   }
 
+  test("state metrics") {
+    val inputData = MemoryStream[Int]
+
+    val aggregated =
+      inputData.toDS()
+        .flatMap(x => Seq(x, x + 1))
+        .toDF("value")
+        .groupBy($"value")
+        .agg(count("*"))
+        .as[(Int, Long)]
+
+    implicit class RichStreamExecution(query: StreamExecution) {
+      def stateNodes: Seq[SparkPlan] = {
+        query.lastExecution.executedPlan.collect {
+          case p if p.isInstanceOf[StateStoreSaveExec] => p
+        }
+      }
+    }
+
+    // Test with Update mode
+    testStream(aggregated, Update)(
+      AddData(inputData, 1),
+      CheckLastBatch((1, 1), (2, 1)),
+      AssertOnQuery { _.stateNodes.size === 1 },
+      AssertOnQuery { _.stateNodes.head.metrics.get("numOutputRows").get.value === 2 },
+      AssertOnQuery { _.stateNodes.head.metrics.get("numUpdatedStateRows").get.value === 2 },
+      AssertOnQuery { _.stateNodes.head.metrics.get("numTotalStateRows").get.value === 2 },
+      AddData(inputData, 2, 3),
+      CheckLastBatch((2, 2), (3, 2), (4, 1)),
+      AssertOnQuery { _.stateNodes.size === 1 },
+      AssertOnQuery { _.stateNodes.head.metrics.get("numOutputRows").get.value === 3 },
+      AssertOnQuery { _.stateNodes.head.metrics.get("numUpdatedStateRows").get.value === 3 },
+      AssertOnQuery { _.stateNodes.head.metrics.get("numTotalStateRows").get.value === 4 }
+    )
+
+    // Test with Complete mode
+    inputData.reset()
+    testStream(aggregated, Complete)(
+      AddData(inputData, 1),
+      CheckLastBatch((1, 1), (2, 1)),
+      AssertOnQuery { _.stateNodes.size === 1 },
+      AssertOnQuery { _.stateNodes.head.metrics.get("numOutputRows").get.value === 2 },
+      AssertOnQuery { _.stateNodes.head.metrics.get("numUpdatedStateRows").get.value === 2 },
+      AssertOnQuery { _.stateNodes.head.metrics.get("numTotalStateRows").get.value === 2 },
+      AddData(inputData, 2, 3),
+      CheckLastBatch((1, 1), (2, 2), (3, 2), (4, 1)),
+      AssertOnQuery { _.stateNodes.size === 1 },
+      AssertOnQuery { _.stateNodes.head.metrics.get("numOutputRows").get.value === 4 },
+      AssertOnQuery { _.stateNodes.head.metrics.get("numUpdatedStateRows").get.value === 3 },
+      AssertOnQuery { _.stateNodes.head.metrics.get("numTotalStateRows").get.value === 4 }
+    )
+  }
+
   test("multiple keys") {
     val inputData = MemoryStream[Int]
 

http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 831543a..9e0eefb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -17,92 +17,97 @@
 
 package org.apache.spark.sql.streaming
 
-import java.util.concurrent.ConcurrentLinkedQueue
-
+import org.scalactic.TolerantNumerics
 import org.scalatest.BeforeAndAfter
 import org.scalatest.PrivateMethodTester._
-import org.scalatest.concurrent.AsyncAssertions.Waiter
-import org.scalatest.concurrent.Eventually._
-import org.scalatest.concurrent.PatienceConfiguration.Timeout
-import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkException
+import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.util.JsonProtocol
+import org.apache.spark.sql.functions._
+import org.apache.spark.util.{JsonProtocol, ManualClock}
 
 
 class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
 
   import testImplicits._
-  import StreamingQueryListener._
+  import StreamingQueryListenerSuite._
+
+  // To make === between double tolerate inexact values
+  implicit val doubleEquality = TolerantNumerics.tolerantDoubleEquality(0.01)
 
   after {
     spark.streams.active.foreach(_.stop())
     assert(spark.streams.active.isEmpty)
     assert(addedListeners.isEmpty)
     // Make sure we don't leak any events to the next test
-    spark.sparkContext.listenerBus.waitUntilEmpty(10000)
   }
 
-  test("single listener") {
-    val listener = new QueryStatusCollector
-    val input = MemoryStream[Int]
-    withListenerAdded(listener) {
-      testStream(input.toDS)(
-        StartStream(),
-        AssertOnQuery("Incorrect query status in onQueryStarted") { query =>
-          val status = listener.startStatus
-          assert(status != null)
-          assert(status.name === query.name)
-          assert(status.id === query.id)
-          assert(status.sourceStatuses.size === 1)
-          assert(status.sourceStatuses(0).description.contains("Memory"))
-
-          // The source and sink offsets must be None as this must be called before the
-          // batches have started
-          assert(status.sourceStatuses(0).offsetDesc === None)
-          assert(status.sinkStatus.offsetDesc === CompositeOffset(None :: Nil).toString)
-
-          // No progress events or termination events
-          assert(listener.progressStatuses.isEmpty)
-          assert(listener.terminationStatus === null)
-          true
-        },
-        AddDataMemory(input, Seq(1, 2, 3)),
-        CheckAnswer(1, 2, 3),
-        AssertOnQuery("Incorrect query status in onQueryProgress") { query =>
-          eventually(Timeout(streamingTimeout)) {
+  ignore("single listener, check trigger statuses") {
+    import StreamingQueryListenerSuite._
+    clock = new ManualClock()
+
+    /** Custom MemoryStream that waits for manual clock to reach a time */
+    val inputData = new MemoryStream[Int](0, sqlContext) {
+      // Wait for manual clock to be 100 first time there is data
+      override def getOffset: Option[Offset] = {
+        val offset = super.getOffset
+        if (offset.nonEmpty) {
+          clock.waitTillTime(100)
+        }
+        offset
+      }
 
-            // There should be only on progress event as batch has been processed
-            assert(listener.progressStatuses.size === 1)
-            val status = listener.progressStatuses.peek()
-            assert(status != null)
-            assert(status.name === query.name)
-            assert(status.id === query.id)
-            assert(status.sourceStatuses(0).offsetDesc === Some(LongOffset(0).toString))
-            assert(status.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(0)).toString)
+      // Wait for manual clock to be 300 first time there is data
+      override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+        clock.waitTillTime(300)
+        super.getBatch(start, end)
+      }
+    }
 
-            // No termination events
-            assert(listener.terminationStatus === null)
-          }
-          true
-        },
-        StopStream,
-        AssertOnQuery("Incorrect query status in onQueryTerminated") { query =>
-          eventually(Timeout(streamingTimeout)) {
-            val status = listener.terminationStatus
-            assert(status != null)
-            assert(status.name === query.name)
-            assert(status.id === query.id)
-            assert(status.sourceStatuses(0).offsetDesc === Some(LongOffset(0).toString))
-            assert(status.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(0)).toString)
-            assert(listener.terminationException === None)
-          }
-          listener.checkAsyncErrors()
-          true
-        }
-      )
+    // This is to make sure that the query waits for manual clock to be 600 first time there is data
+    val mapped = inputData.toDS().agg(count("*")).as[Long].coalesce(1).map { x =>
+      clock.waitTillTime(600)
+      x
     }
+
+    testStream(mapped, OutputMode.Complete)(
+      StartStream(triggerClock = clock),
+      AddData(inputData, 1, 2),
+      AdvanceManualClock(100),  // unblock getOffset, will block on getBatch
+      AdvanceManualClock(200),  // unblock getBatch, will block on computation
+      AdvanceManualClock(300),  // unblock computation
+      AssertOnQuery { _ => clock.getTimeMillis() === 600 },
+      AssertOnLastQueryStatus { status: StreamingQueryStatus =>
+        // Check the correctness of the trigger info of the last completed batch reported by
+        // onQueryProgress
+        assert(status.triggerDetails.get("triggerId") == "0")
+        assert(status.triggerDetails.get("isTriggerActive") === "false")
+        assert(status.triggerDetails.get("isDataPresentInTrigger") === "true")
+
+        assert(status.triggerDetails.get("timestamp.triggerStart") === "0")
+        assert(status.triggerDetails.get("timestamp.afterGetOffset") === "100")
+        assert(status.triggerDetails.get("timestamp.afterGetBatch") === "300")
+        assert(status.triggerDetails.get("timestamp.triggerFinish") === "600")
+
+        assert(status.triggerDetails.get("latency.getOffset.total") === "100")
+        assert(status.triggerDetails.get("latency.getBatch.total") === "200")
+        assert(status.triggerDetails.get("latency.optimizer") === "0")
+        assert(status.triggerDetails.get("latency.offsetLogWrite") === "0")
+        assert(status.triggerDetails.get("latency.fullTrigger") === "600")
+
+        assert(status.triggerDetails.get("numRows.input.total") === "2")
+        assert(status.triggerDetails.get("numRows.state.aggregation1.total") === "1")
+        assert(status.triggerDetails.get("numRows.state.aggregation1.updated") === "1")
+
+        assert(status.sourceStatuses.length === 1)
+        assert(status.sourceStatuses(0).triggerDetails.get("triggerId") === "0")
+        assert(status.sourceStatuses(0).triggerDetails.get("latency.getOffset.source") === "100")
+        assert(status.sourceStatuses(0).triggerDetails.get("latency.getBatch.source") === "200")
+        assert(status.sourceStatuses(0).triggerDetails.get("numRows.input.source") === "2")
+      },
+      CheckAnswer(2)
+    )
   }
 
   test("adding and removing listener") {
@@ -172,56 +177,37 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
   }
 
   test("QueryStarted serialization") {
-    val queryStartedInfo = new StreamingQueryInfo(
-      "name",
-      1,
-      Seq(new SourceStatus("source1", None), new SourceStatus("source2", None)),
-      new SinkStatus("sink", CompositeOffset(None :: None :: Nil).toString))
-    val queryStarted = new StreamingQueryListener.QueryStarted(queryStartedInfo)
+    val queryStarted = new StreamingQueryListener.QueryStarted(StreamingQueryStatus.testStatus)
     val json = JsonProtocol.sparkEventToJson(queryStarted)
     val newQueryStarted = JsonProtocol.sparkEventFromJson(json)
       .asInstanceOf[StreamingQueryListener.QueryStarted]
-    assertStreamingQueryInfoEquals(queryStarted.queryInfo, newQueryStarted.queryInfo)
+    assertStreamingQueryInfoEquals(queryStarted.queryStatus, newQueryStarted.queryStatus)
   }
 
   test("QueryProgress serialization") {
-    val queryProcessInfo = new StreamingQueryInfo(
-      "name",
-      1,
-      Seq(
-        new SourceStatus("source1", Some(LongOffset(0).toString)),
-        new SourceStatus("source2", Some(LongOffset(1).toString))),
-      new SinkStatus("sink", new CompositeOffset(Array(None, Some(LongOffset(1)))).toString))
-    val queryProcess = new StreamingQueryListener.QueryProgress(queryProcessInfo)
+    val queryProcess = new StreamingQueryListener.QueryProgress(StreamingQueryStatus.testStatus)
     val json = JsonProtocol.sparkEventToJson(queryProcess)
     val newQueryProcess = JsonProtocol.sparkEventFromJson(json)
       .asInstanceOf[StreamingQueryListener.QueryProgress]
-    assertStreamingQueryInfoEquals(queryProcess.queryInfo, newQueryProcess.queryInfo)
+    assertStreamingQueryInfoEquals(queryProcess.queryStatus, newQueryProcess.queryStatus)
   }
 
   test("QueryTerminated serialization") {
-    val queryTerminatedInfo = new StreamingQueryInfo(
-      "name",
-      1,
-      Seq(
-        new SourceStatus("source1", Some(LongOffset(0).toString)),
-        new SourceStatus("source2", Some(LongOffset(1).toString))),
-      new SinkStatus("sink", new CompositeOffset(Array(None, Some(LongOffset(1)))).toString))
     val exception = new RuntimeException("exception")
     val queryQueryTerminated = new StreamingQueryListener.QueryTerminated(
-      queryTerminatedInfo,
+      StreamingQueryStatus.testStatus,
       Some(exception.getMessage))
     val json =
       JsonProtocol.sparkEventToJson(queryQueryTerminated)
     val newQueryTerminated = JsonProtocol.sparkEventFromJson(json)
       .asInstanceOf[StreamingQueryListener.QueryTerminated]
-    assertStreamingQueryInfoEquals(queryQueryTerminated.queryInfo, newQueryTerminated.queryInfo)
+    assertStreamingQueryInfoEquals(queryQueryTerminated.queryStatus, newQueryTerminated.queryStatus)
     assert(queryQueryTerminated.exception === newQueryTerminated.exception)
   }
 
   private def assertStreamingQueryInfoEquals(
-      expected: StreamingQueryInfo,
-      actual: StreamingQueryInfo): Unit = {
+      expected: StreamingQueryStatus,
+      actual: StreamingQueryStatus): Unit = {
     assert(expected.name === actual.name)
     assert(expected.sourceStatuses.size === actual.sourceStatuses.size)
     expected.sourceStatuses.zip(actual.sourceStatuses).foreach {
@@ -243,7 +229,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
 
   private def withListenerAdded(listener: StreamingQueryListener)(body: => Unit): Unit = {
     try {
-      failAfter(1 minute) {
+      failAfter(streamingTimeout) {
         spark.streams.addListener(listener)
         body
       }
@@ -258,49 +244,9 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     val listenerBus = spark.streams invokePrivate listenerBusMethod()
     listenerBus.listeners.toArray.map(_.asInstanceOf[StreamingQueryListener])
   }
+}
 
-  class QueryStatusCollector extends StreamingQueryListener {
-    // to catch errors in the async listener events
-    @volatile private var asyncTestWaiter = new Waiter
-
-    @volatile var startStatus: StreamingQueryInfo = null
-    @volatile var terminationStatus: StreamingQueryInfo = null
-    @volatile var terminationException: Option[String] = null
-
-    val progressStatuses = new ConcurrentLinkedQueue[StreamingQueryInfo]
-
-    def reset(): Unit = {
-      startStatus = null
-      terminationStatus = null
-      progressStatuses.clear()
-      asyncTestWaiter = new Waiter
-    }
-
-    def checkAsyncErrors(): Unit = {
-      asyncTestWaiter.await(timeout(streamingTimeout))
-    }
-
-
-    override def onQueryStarted(queryStarted: QueryStarted): Unit = {
-      asyncTestWaiter {
-        startStatus = queryStarted.queryInfo
-      }
-    }
-
-    override def onQueryProgress(queryProgress: QueryProgress): Unit = {
-      asyncTestWaiter {
-        assert(startStatus != null, "onQueryProgress called before onQueryStarted")
-        progressStatuses.add(queryProgress.queryInfo)
-      }
-    }
-
-    override def onQueryTerminated(queryTerminated: QueryTerminated): Unit = {
-      asyncTestWaiter {
-        assert(startStatus != null, "onQueryTerminated called before onQueryStarted")
-        terminationStatus = queryTerminated.queryInfo
-        terminationException = queryTerminated.exception
-      }
-      asyncTestWaiter.dismiss()
-    }
-  }
+object StreamingQueryListenerSuite {
+  // Singleton reference to clock that does not get serialized in task closures
+  @volatile var clock: ManualClock = null
 }
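
One more aside (a sketch, not from the patch): the three serialization tests above all exercise the same round trip through Spark's generic event JSON protocol, roughly:

    // Hedged sketch of the round trip exercised by the serialization tests.
    val event = new StreamingQueryListener.QueryProgress(StreamingQueryStatus.testStatus)
    val json = JsonProtocol.sparkEventToJson(event)
    val restored = JsonProtocol.sparkEventFromJson(json)
      .asInstanceOf[StreamingQueryListener.QueryProgress]
    assert(restored.queryStatus.name === event.queryStatus.name)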

http://git-wip-us.apache.org/repos/asf/spark/blob/881e0eb0/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 88f1f18..9f8e2db 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -17,18 +17,27 @@
 
 package org.apache.spark.sql.streaming
 
+import org.scalactic.TolerantNumerics
+import org.scalatest.concurrent.Eventually._
 import org.scalatest.BeforeAndAfter
 
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.streaming.StreamingQueryListener._
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.SparkException
-import org.apache.spark.sql.execution.streaming.{CompositeOffset, LongOffset, MemoryStream, StreamExecution}
+import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.util.Utils
 
 
-class StreamingQuerySuite extends StreamTest with BeforeAndAfter {
+class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
 
   import AwaitTerminationTester._
   import testImplicits._
 
+  // To make === between double tolerate inexact values
+  implicit val doubleEquality = TolerantNumerics.tolerantDoubleEquality(0.01)
+
   after {
     sqlContext.streams.active.foreach(_.stop())
   }
@@ -100,31 +109,145 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter {
     )
   }
 
-  testQuietly("source and sink statuses") {
+  testQuietly("query statuses") {
     val inputData = MemoryStream[Int]
     val mapped = inputData.toDS().map(6 / _)
-
     testStream(mapped)(
-      AssertOnQuery(_.sourceStatuses.length === 1),
+      AssertOnQuery(q => q.status.name === q.name),
+      AssertOnQuery(q => q.status.id === q.id),
+      AssertOnQuery(_.status.timestamp <= System.currentTimeMillis),
+      AssertOnQuery(_.status.inputRate === 0.0),
+      AssertOnQuery(_.status.processingRate === 0.0),
+      AssertOnQuery(_.status.sourceStatuses.length === 1),
+      AssertOnQuery(_.status.sourceStatuses(0).description.contains("Memory")),
+      AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === "-"),
+      AssertOnQuery(_.status.sourceStatuses(0).inputRate === 0.0),
+      AssertOnQuery(_.status.sourceStatuses(0).processingRate === 0.0),
+      AssertOnQuery(_.status.sinkStatus.description.contains("Memory")),
+      AssertOnQuery(_.status.sinkStatus.offsetDesc === CompositeOffset(None :: Nil).toString),
       AssertOnQuery(_.sourceStatuses(0).description.contains("Memory")),
-      AssertOnQuery(_.sourceStatuses(0).offsetDesc === None),
+      AssertOnQuery(_.sourceStatuses(0).offsetDesc === "-"),
+      AssertOnQuery(_.sourceStatuses(0).inputRate === 0.0),
+      AssertOnQuery(_.sourceStatuses(0).processingRate === 0.0),
       AssertOnQuery(_.sinkStatus.description.contains("Memory")),
       AssertOnQuery(_.sinkStatus.offsetDesc === new CompositeOffset(None :: Nil).toString),
+
       AddData(inputData, 1, 2),
       CheckAnswer(6, 3),
-      AssertOnQuery(_.sourceStatuses(0).offsetDesc === Some(LongOffset(0).toString)),
+      AssertOnQuery(_.status.timestamp <= System.currentTimeMillis),
+      AssertOnQuery(_.status.inputRate >= 0.0),
+      AssertOnQuery(_.status.processingRate >= 0.0),
+      AssertOnQuery(_.status.sourceStatuses.length === 1),
+      AssertOnQuery(_.status.sourceStatuses(0).description.contains("Memory")),
+      AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(0).toString),
+      AssertOnQuery(_.status.sourceStatuses(0).inputRate >= 0.0),
+      AssertOnQuery(_.status.sourceStatuses(0).processingRate >= 0.0),
+      AssertOnQuery(_.status.sinkStatus.description.contains("Memory")),
+      AssertOnQuery(_.status.sinkStatus.offsetDesc ===
+        CompositeOffset.fill(LongOffset(0)).toString),
+      AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(0).toString),
+      AssertOnQuery(_.sourceStatuses(0).inputRate >= 0.0),
+      AssertOnQuery(_.sourceStatuses(0).processingRate >= 0.0),
       AssertOnQuery(_.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(0)).toString),
+
       AddData(inputData, 1, 2),
       CheckAnswer(6, 3, 6, 3),
-      AssertOnQuery(_.sourceStatuses(0).offsetDesc === Some(LongOffset(1).toString)),
+      AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(1).toString),
+      AssertOnQuery(_.status.sinkStatus.offsetDesc ===
+        CompositeOffset.fill(LongOffset(1)).toString),
+      AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(1).toString),
       AssertOnQuery(_.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(1)).toString),
+
+      StopStream,
+      AssertOnQuery(_.status.inputRate === 0.0),
+      AssertOnQuery(_.status.processingRate === 0.0),
+      AssertOnQuery(_.status.sourceStatuses.length === 1),
+      AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(1).toString),
+      AssertOnQuery(_.status.sourceStatuses(0).inputRate === 0.0),
+      AssertOnQuery(_.status.sourceStatuses(0).processingRate === 0.0),
+      AssertOnQuery(_.status.sinkStatus.offsetDesc ===
+        CompositeOffset.fill(LongOffset(1)).toString),
+      AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(1).toString),
+      AssertOnQuery(_.sourceStatuses(0).inputRate === 0.0),
+      AssertOnQuery(_.sourceStatuses(0).processingRate === 0.0),
+      AssertOnQuery(_.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(1)).toString),
+      AssertOnQuery(_.status.triggerDetails.isEmpty),
+
+      StartStream(),
       AddData(inputData, 0),
       ExpectFailure[SparkException],
-      AssertOnQuery(_.sourceStatuses(0).offsetDesc === Some(LongOffset(2).toString)),
+      AssertOnQuery(_.status.inputRate === 0.0),
+      AssertOnQuery(_.status.processingRate === 0.0),
+      AssertOnQuery(_.status.sourceStatuses.length === 1),
+      AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(2).toString),
+      AssertOnQuery(_.status.sourceStatuses(0).inputRate === 0.0),
+      AssertOnQuery(_.status.sourceStatuses(0).processingRate === 0.0),
+      AssertOnQuery(_.status.sinkStatus.offsetDesc ===
+        CompositeOffset.fill(LongOffset(1)).toString),
+      AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(2).toString),
+      AssertOnQuery(_.sourceStatuses(0).inputRate === 0.0),
+      AssertOnQuery(_.sourceStatuses(0).processingRate === 0.0),
       AssertOnQuery(_.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(1)).toString)
     )
   }
 
+  test("codahale metrics") {
+    val inputData = MemoryStream[Int]
+
+    /** Whether the metrics of a query are registered for reporting */
+    def isMetricsRegistered(query: StreamingQuery): Boolean = {
+      val sourceName = s"StructuredStreaming.${query.name}"
+      val sources = spark.sparkContext.env.metricsSystem.getSourcesByName(sourceName)
+      require(sources.size <= 1)
+      sources.nonEmpty
+    }
+    // Disabled by default
+    assert(spark.conf.get("spark.sql.streaming.metricsEnabled").toBoolean === false)
+
+    withSQLConf("spark.sql.streaming.metricsEnabled" -> "false") {
+      testStream(inputData.toDF)(
+        AssertOnQuery { q => !isMetricsRegistered(q) },
+        StopStream,
+        AssertOnQuery { q => !isMetricsRegistered(q) }
+      )
+    }
+
+    // Registered when enabled
+    withSQLConf("spark.sql.streaming.metricsEnabled" -> "true") {
+      testStream(inputData.toDF)(
+        AssertOnQuery { q => isMetricsRegistered(q) },
+        StopStream,
+        AssertOnQuery { q => !isMetricsRegistered(q) }
+      )
+    }
+  }
+
+  test("input row calculation with mixed batch and streaming sources") {
+    val streamingTriggerDF = spark.createDataset(1 to 10).toDF
+    val streamingInputDF = createSingleTriggerStreamingDF(streamingTriggerDF).toDF("value")
+    val staticInputDF = spark.createDataFrame(Seq(1 -> "1", 2 -> "2")).toDF("value", "anotherValue")
+
+    // Trigger input has 10 rows, static input has 2 rows,
+    // therefore after the first trigger, the calculated input rows should be 10
+    val status = getFirstTriggerStatus(streamingInputDF.join(staticInputDF, "value"))
+    assert(status.triggerDetails.get("numRows.input.total") === "10")
+    assert(status.sourceStatuses.size === 1)
+    assert(status.sourceStatuses(0).triggerDetails.get("numRows.input.source") === "10")
+  }
+
+  test("input row calculation with trigger DF having multiple leaves") {
+    val streamingTriggerDF =
+      spark.createDataset(1 to 5).toDF.union(spark.createDataset(6 to 10).toDF)
+    require(streamingTriggerDF.logicalPlan.collectLeaves().size > 1)
+    val streamingInputDF = createSingleTriggerStreamingDF(streamingTriggerDF)
+
+    // After the first trigger, the calculated input rows should be 10
+    val status = getFirstTriggerStatus(streamingInputDF)
+    assert(status.triggerDetails.get("numRows.input.total") === "10")
+    assert(status.sourceStatuses.size === 1)
+    assert(status.sourceStatuses(0).triggerDetails.get("numRows.input.source") === "10")
+  }
+
   testQuietly("StreamExecution metadata garbage collection") {
     val inputData = MemoryStream[Int]
     val mapped = inputData.toDS().map(6 / _)
@@ -149,6 +272,45 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter {
     )
   }
 
+  /** Create a streaming DF that only executes one batch, in which it returns the given static DF */
+  private def createSingleTriggerStreamingDF(triggerDF: DataFrame): DataFrame = {
+    require(!triggerDF.isStreaming)
+    // A streaming Source that generates only one trigger and returns the given DataFrame as a batch
+    val source = new Source() {
+      override def schema: StructType = triggerDF.schema
+      override def getOffset: Option[Offset] = Some(LongOffset(0))
+      override def getBatch(start: Option[Offset], end: Offset): DataFrame = triggerDF
+      override def stop(): Unit = {}
+    }
+    StreamingExecutionRelation(source)
+  }
+
+  /** Returns the query status at the end of the first trigger of streaming DF */
+  private def getFirstTriggerStatus(streamingDF: DataFrame): StreamingQueryStatus = {
+    // A StreamingQueryListener that gets the query status after the first completed trigger
+    val listener = new StreamingQueryListener {
+      @volatile var firstStatus: StreamingQueryStatus = null
+      override def onQueryStarted(queryStarted: QueryStarted): Unit = { }
+      override def onQueryProgress(queryProgress: QueryProgress): Unit = {
+       if (firstStatus == null) firstStatus = queryProgress.queryStatus
+      }
+      override def onQueryTerminated(queryTerminated: QueryTerminated): Unit = { }
+    }
+
+    try {
+      spark.streams.addListener(listener)
+      val q = streamingDF.writeStream.format("memory").queryName("test").start()
+      q.processAllAvailable()
+      eventually(timeout(streamingTimeout)) {
+        assert(listener.firstStatus != null)
+      }
+      listener.firstStatus
+    } finally {
+      spark.streams.active.map(_.stop())
+      spark.streams.removeListener(listener)
+    }
+  }
+
   /**
   * A [[StreamAction]] to test the behavior of `StreamingQuery.awaitTermination()`.
    *
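
Finally, as the "codahale metrics" test above illustrates, per-query metrics reporting is off by default and gated by a SQL conf. A hedged sketch of enabling it (here `df` stands for any streaming DataFrame, and the query name is a placeholder):

    // Hedged sketch: with the conf enabled, each named query is registered with
    // the Spark metrics system under "StructuredStreaming.<queryName>" (the
    // source name used in the test above) and is unregistered when it stops.
    spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
    val query = df.writeStream.format("memory").queryName("myQuery").start()
    // ... metrics are reported while the query runs ...
    query.stop()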

