[SPARK-17731][SQL][STREAMING] Metrics for structured streaming for branch-2.0
**This PR adds the same metrics to branch-2.0 that was added to master in
#15307.**
The differences compared to #15307 are:
- The new configuration is added only in the `SQLConf` object (i.e.
`SQLConf.STREAMING_METRICS_ENABLED`) and not in the `SQLConf` class (i.e. no
`SQLConf.isStreamingMetricsEnabled`). Spark master exposes all the streaming
configurations as actual fields in the `SQLConf` class (e.g.
[streamingPollingDelay](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L642)),
but [Spark
2.0 does not](https://github.com/apache/spark/blob/branch-2.0/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L608).
So I didn't add it in this 2.0 PR.
- In the previous master PR, the above configuration was read in
`StreamExecution` as
`sparkSession.sessionState.conf.isStreamingMetricsEnabled`. In this 2.0 PR, I
am instead reading it as
`sparkSession.conf.get(STREAMING_METRICS_ENABLED)` (i.e. no `sessionState`) to
keep it consistent with how other confs are read in `StreamExecution` (e.g.
[STREAMING_POLLING_DELAY](https://github.com/tdas/spark/blob/ee8e899e4c274c363a8b4d13e8bf57b0b467a50e/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L62)).
- Different MiMa exclusions.
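For readers unfamiliar with the two access styles, here is a minimal, self-contained sketch of the pattern this 2.0 PR follows: the setting is defined as an entry on an object and read through a generic `get(entry)` call, rather than through a typed accessor such as `isStreamingMetricsEnabled` on the conf class. `ConfigEntry`, `StreamingConfs`, and `RuntimeConf` are hypothetical stand-ins written for this illustration, not Spark's internal classes; only the key `spark.sql.streaming.metricsEnabled` and its `false` default mirror the real setting.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a typed configuration entry: a key, a default
// value, and a parser from the stored string form.
final case class ConfigEntry[T](key: String, default: T, parse: String => T)

// Entries are defined on an object, analogous to the `SQLConf` object.
object StreamingConfs {
  // Key and default mirror the real setting; the entry type is hypothetical.
  val STREAMING_METRICS_ENABLED: ConfigEntry[Boolean] =
    ConfigEntry("spark.sql.streaming.metricsEnabled", false, (s: String) => s.toBoolean)
}

// Hypothetical runtime conf: values are stored as strings and read back
// through a generic `get(entry)`, with no per-setting accessor method.
final class RuntimeConf {
  private val settings = mutable.Map.empty[String, String]

  def set(key: String, value: String): Unit = settings.update(key, value)

  // Falls back to the entry's default when the key has not been set.
  def get[T](entry: ConfigEntry[T]): T =
    settings.get(entry.key).map(entry.parse).getOrElse(entry.default)
}
```

With this shape, a user enabling metrics only sets a string key (in Spark, `spark.conf.set("spark.sql.streaming.metricsEnabled", "true")`), and engine code reads it back with `conf.get(STREAMING_METRICS_ENABLED)`, so no new accessor has to be added to the conf class.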
--
## What changes were proposed in this pull request?
Metrics are needed for monitoring structured streaming apps. Here is the design
doc for implementing the necessary metrics.
https://docs.google.com/document/d/1NIdcGuR1B3WIe8t7VxLrt58TJB4DtipWEbj5I_mzJys/edit?usp=sharing
Specifically, this PR makes the following public API changes:
- `StreamingQuery.status` returns a `StreamingQueryStatus` object (renamed from
`StreamingQueryInfo`, see later)
- `StreamingQueryStatus` has the following important fields:
  - inputRate - Current rate (rows/sec) at which data is being generated by all
    the sources
  - processingRate - Current rate (rows/sec) at which the query is processing
    data from all the sources
  - ~~outputRate~~ - *Does not work with whole-stage codegen*
  - latency - Current average latency between the data being available in the
    source and the sink writing the corresponding output
  - sourceStatuses: Array[SourceStatus] - Current statuses of the sources
  - sinkStatus: SinkStatus - Current status of the sink
  - triggerStatus - Low-level detailed status of the last completed/currently
    active trigger
    - latencies - getOffset, getBatch, full trigger, WAL writes
    - timestamps - trigger start, trigger finish, after getOffset, after getBatch
    - numRows - input, output, state total/updated rows for aggregations
- `SourceStatus` has the following important fields:
  - inputRate - Current rate (rows/sec) at which data is being generated by the
    source
  - processingRate - Current rate (rows/sec) at which the query is processing
    data from the source
  - triggerStatus - Low-level detailed status of the last completed/currently
    active trigger
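To make the relationship between the per-source and query-level numbers concrete, here is a simplified model. It assumes that the query-level rates are the sums of the per-source rates, that the input rate is the number of new rows divided by the time between consecutive trigger starts, and that the processing rate is the number of rows divided by the trigger's own duration. These formulas and the trimmed-down case classes below are illustrative assumptions, not the exact logic in `StreamMetrics`.

```scala
// Hypothetical, simplified model of the status objects listed above.
final case class SourceStatus(description: String, inputRate: Double, processingRate: Double)
final case class SinkStatus(description: String)

final case class StreamingQueryStatus(
    sourceStatuses: Seq[SourceStatus],
    sinkStatus: SinkStatus) {
  // Assumed: query-level rates are the sum of the per-source rates.
  def inputRate: Double = sourceStatuses.map(_.inputRate).sum
  def processingRate: Double = sourceStatuses.map(_.processingRate).sum
}

object Rates {
  // Rows per second over an interval in milliseconds; 0.0 if the interval is not positive.
  def rowsPerSecond(numRows: Long, intervalMs: Long): Double =
    if (intervalMs <= 0) 0.0 else numRows * 1000.0 / intervalMs

  // Assumed: rows that arrived since the previous trigger started,
  // divided by the time between the two trigger starts.
  def inputRate(numRows: Long, prevTriggerStartMs: Long, triggerStartMs: Long): Double =
    rowsPerSecond(numRows, triggerStartMs - prevTriggerStartMs)

  // Assumed: rows processed in this trigger, divided by the trigger's duration.
  def processingRate(numRows: Long, triggerStartMs: Long, triggerEndMs: Long): Double =
    rowsPerSecond(numRows, triggerEndMs - triggerStartMs)
}
```

For example, 100 rows arriving over a 2-second gap between trigger starts gives an input rate of 50 rows/sec, while processing those 100 rows in a 0.5-second trigger gives a processing rate of 200 rows/sec; a processing rate persistently below the input rate would indicate the query is falling behind.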
- Python API for `StreamingQuery.status()`
### Breaking changes to existing APIs
**Existing direct public-facing APIs**
- Deprecated the direct public-facing APIs `StreamingQuery.sourceStatuses` and
  `StreamingQuery.sinkStatus` in favour of
  `StreamingQuery.status.sourceStatuses/sinkStatus`.
  - Branch 2.0 should have them deprecated; master should have them removed.
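A sketch of how the deprecated accessors can forward to the new `status` object, so that existing callers keep compiling on branch-2.0. The types here are trimmed-down, hypothetical stand-ins for the real `StreamingQuery`, `StreamingQueryStatus`, `SourceStatus`, and `SinkStatus`, and the `"2.0.2"` version string passed to `@deprecated` is an assumed placeholder.

```scala
// Hypothetical, trimmed-down stand-ins for the real streaming types.
final case class SourceStatus(description: String)
final case class SinkStatus(description: String)
final case class StreamingQueryStatus(sourceStatuses: Array[SourceStatus], sinkStatus: SinkStatus)

trait StreamingQuery {
  // New entry point: all status information hangs off `status`.
  def status: StreamingQueryStatus

  // Old accessors kept for compatibility, forwarding to `status`.
  // The "2.0.2" version string is an assumed placeholder.
  @deprecated("use status.sourceStatuses", "2.0.2")
  def sourceStatuses: Array[SourceStatus] = status.sourceStatuses

  @deprecated("use status.sinkStatus", "2.0.2")
  def sinkStatus: SinkStatus = status.sinkStatus
}
```

Callers that still use `query.sourceStatuses` get the same data as before, and the compiler reports a deprecation warning pointing them to `query.status.sourceStatuses`.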
**Existing advanced listener APIs**
- `StreamingQueryInfo` renamed to `StreamingQueryStatus` for consistency with
  `SourceStatus` and `SinkStatus`.
  - Earlier, `StreamingQueryInfo` was used only in the advanced listener API, but
    now it is also used in the direct public-facing API (`StreamingQuery.status`).
- Field `queryInfo` in the listener events `QueryStarted`, `QueryProgress`, and
  `QueryTerminated` has been renamed to `queryStatus`, and its return type is now
  `StreamingQueryStatus`.
- Field `offsetDesc` in `SourceStatus` was `Option[String]`; it has been
  converted to `String`.
- For `SourceStatus` and `SinkStatus`, made the constructors `private` instead of
  `private[sql]` to make them more Java-safe. Instead, added `private[sql] object
  SourceStatus/SinkStatus.apply()`, which is harder to use accidentally from Java.
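The constructor pattern in the last item can be sketched as follows: a class whose constructor is `private`, with the companion object providing the only factory, and with `offsetDesc` typed as a plain `String`. `StatusSketch` and the exact field list are hypothetical. In the PR the factory is `private[sql]`; it is left public here only because this standalone sketch does not live in an `org.apache.spark.sql` package.

```scala
object StatusSketch {
  // Constructor is `private`: only the companion object below may call it.
  final class SourceStatus private (
      val description: String,
      val offsetDesc: String, // plain String rather than Option[String]
      val inputRate: Double,
      val processingRate: Double)

  object SourceStatus {
    // The single construction path. In the PR this is `private[sql]`;
    // it is public here because the sketch is outside that package.
    def apply(
        description: String,
        offsetDesc: String,
        inputRate: Double,
        processingRate: Double): SourceStatus =
      new SourceStatus(description, offsetDesc, inputRate, processingRate)
  }
}
```

Scala callers then write `SourceStatus(...)`, while `new SourceStatus(...)` does not compile outside the companion object.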
## How was this patch tested?
Old and new unit tests.
- Rate calculation and other internal logic of StreamMetrics are tested by
StreamMetricsSuite.
- New info in statuses returned through StreamingQueryListener is tested in
StreamingQueryListenerSuite.
- New and old info returned through StreamingQuery.status is tested in
StreamingQuerySuite.
- Source-specific tests, which make sure input rows are counted, are in the
source-specific test suites.
- Additional tests to test minor additions in LocalTableScanExec, StateStore,
etc.
Metrics were also manually tested using the Ganglia sink.
Author: Tathagata Das
Closes #15472 from tdas/SPARK-17731-branch-2.0.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo