[2/2] spark git commit: [SPARK-17731][SQL][STREAMING] Metrics for structured streaming for branch-2.0

2016-10-17 Thread tdas
[SPARK-17731][SQL][STREAMING] Metrics for structured streaming for branch-2.0

**This PR adds to branch-2.0 the same metrics that were added to master in 
#15307.**

The differences compared to #15307 are:
- The new configuration is added only in the `SQLConf` object (i.e. 
`SQLConf.STREAMING_METRICS_ENABLED`) and not in the `SQLConf` class (i.e. no 
`SQLConf.isStreamingMetricsEnabled`). Spark master has all the streaming 
configurations exposed as actual fields in the `SQLConf` class (e.g. 
[streamingPollingDelay](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L642)),
 but [not in Spark 
2.0](https://github.com/apache/spark/blob/branch-2.0/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L608).
 So I didn't add it in this 2.0 PR.

- In the previous master PR, the above configuration was read in 
`StreamExecution` as 
`sparkSession.sessionState.conf.isStreamingMetricsEnabled`. In this 2.0 PR, I 
am instead reading it as 
`sparkSession.conf.get(STREAMING_METRICS_ENABLED)` (i.e. no `sessionState`) to 
keep it consistent with how other confs are read in `StreamExecution` (e.g. 
[STREAMING_POLLING_DELAY](https://github.com/tdas/spark/blob/ee8e899e4c274c363a8b4d13e8bf57b0b467a50e/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L62)).

- Different Mima exclusions

--

## What changes were proposed in this pull request?

Metrics are needed for monitoring structured streaming apps. Here is the design 
doc for implementing the necessary metrics.
https://docs.google.com/document/d/1NIdcGuR1B3WIe8t7VxLrt58TJB4DtipWEbj5I_mzJys/edit?usp=sharing

Specifically, this PR adds the following public API changes.

- `StreamingQuery.status` returns a `StreamingQueryStatus` object (renamed from 
`StreamingQueryInfo`, see later)

- `StreamingQueryStatus` has the following important fields
  - inputRate - Current rate (rows/sec) at which data is being generated by all the sources
  - processingRate - Current rate (rows/sec) at which the query is processing data from all the sources
  - ~~outputRate~~ - *Does not work with whole-stage codegen*
  - latency - Current average latency between the data being available in a source and the sink writing the corresponding output
  - sourceStatuses: Array[SourceStatus] - Current statuses of the sources
  - sinkStatus: SinkStatus - Current status of the sink
  - triggerStatus - Low-level detailed status of the last completed/currently active trigger
    - latencies - getOffset, getBatch, full trigger, WAL writes
    - timestamps - trigger start, finish, after getOffset, after getBatch
    - numRows - input, output, state total/updated rows for aggregations

- `SourceStatus` has the following important fields
  - inputRate - Current rate (rows/sec) at which data is being generated by the 
source
  - processingRate - Current rate (rows/sec) at which the query is processing 
data from the source
  - triggerStatus - Low-level detailed status of the last completed/currently 
active trigger

- Python API for `StreamingQuery.status()`
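
The rate and latency fields above are derived from per-trigger row counts and timestamps. As a rough illustration of how such metrics can be computed (this is *not* Spark's actual `StreamMetrics` implementation, and all names here are hypothetical), a tracker could distinguish input rate (rows arrived per unit of wall-clock time between triggers) from processing rate (rows handled per unit of time the trigger actually ran):

```python
class RateTracker:
    """Illustrative sketch only: derive rows/sec rates from per-trigger
    counts and timestamps, in the spirit of inputRate / processingRate.
    Not Spark's StreamMetrics."""

    def __init__(self):
        self._last_trigger_end = None  # end timestamp of the previous trigger
        self._input_rate = 0.0
        self._processing_rate = 0.0

    def report_trigger(self, num_input_rows, trigger_start, trigger_end):
        # inputRate: rows that arrived since the last trigger ended,
        # divided by the wall-clock time elapsed since then
        if self._last_trigger_end is not None and trigger_end > self._last_trigger_end:
            self._input_rate = num_input_rows / (trigger_end - self._last_trigger_end)
        # processingRate: rows handled divided by the time the trigger ran
        busy = trigger_end - trigger_start
        self._processing_rate = num_input_rows / busy if busy > 0 else 0.0
        self._last_trigger_end = trigger_end

    @property
    def input_rate(self):
        return self._input_rate

    @property
    def processing_rate(self):
        return self._processing_rate
```

For example, a trigger that processes 1000 rows in 2 seconds of busy time yields a processing rate of 500 rows/sec, while the input rate depends on how much wall-clock time passed since the previous trigger.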

### Breaking changes to existing APIs

**Existing direct public facing APIs**
- Deprecated the direct public-facing APIs `StreamingQuery.sourceStatuses` and 
`StreamingQuery.sinkStatus` in favour of 
`StreamingQuery.status.sourceStatuses/sinkStatus`.
  - Branch 2.0 should have them deprecated; master should have them removed.

**Existing advanced listener APIs**
- `StreamingQueryInfo` renamed to `StreamingQueryStatus` for consistency with 
`SourceStatus` and `SinkStatus`
   - Earlier, `StreamingQueryInfo` was used only in the advanced listener API, but 
now it is also used in the direct public-facing API (`StreamingQuery.status`)

- Field `queryInfo` in the listener events `QueryStarted`, `QueryProgress`, and 
`QueryTerminated` was renamed to `queryStatus`, and its return type changed to 
`StreamingQueryStatus`.

- Field `offsetDesc` in `SourceStatus` was changed from `Option[String]` to 
`String`.

- The constructors of `SourceStatus` and `SinkStatus` were made private instead 
of `private[sql]` to make them more Java-safe. Instead, `private[sql]` factory 
methods `SourceStatus.apply()` and `SinkStatus.apply()` were added, which are 
harder to accidentally use from Java.
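
The private-constructor-plus-factory pattern above is Scala-specific (a `private` constructor paired with a `private[sql]` `apply`), but the intent — only engine-internal code may construct status objects — can be sketched in Python for illustration. The class and names below are hypothetical analogues, not Spark APIs:

```python
class SourceStatusLike:
    """Python analogue of a status class with an effectively private
    constructor: instances are meant to be created only through the
    internal factory below, mirroring the Scala private[sql] apply()
    pattern described above. Not a Spark API."""

    _internal_token = object()  # sentinel known only to this module

    def __init__(self, token, description, offset_desc):
        if token is not self._internal_token:
            raise TypeError(
                "SourceStatusLike cannot be constructed directly; "
                "it is created internally by the engine")
        self.description = description
        self.offset_desc = offset_desc

    @classmethod
    def _create(cls, description, offset_desc):
        # internal factory, analogous to private[sql] SourceStatus.apply()
        return cls(cls._internal_token, description, offset_desc)
```

The design choice in both languages is the same: external (especially Java) callers cannot accidentally instantiate status objects, while internal code keeps a convenient factory.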

## How was this patch tested?

Old and new unit tests.
- Rate calculation and other internal logic of StreamMetrics are tested by 
StreamMetricsSuite.
- New info in statuses returned through StreamingQueryListener is tested in 
StreamingQueryListenerSuite.
- New and old info returned through StreamingQuery.status is tested in 
StreamingQuerySuite.
- Source-specific tests for making sure input rows are counted are in 
source-specific test suites.
- Additional tests to test minor additions in LocalTableScanExec, StateStore, 
etc.

Metrics were also manually tested using the Ganglia sink.

Author: Tathagata Das 

Closes #15472 from tdas/SPARK-17731-branch-2.0.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo

[2/2] spark git commit: [SPARK-17731][SQL][STREAMING] Metrics for structured streaming

2016-10-13 Thread tdas
[SPARK-17731][SQL][STREAMING] Metrics for structured streaming

## What changes were proposed in this pull request?

Metrics are needed for monitoring structured streaming apps. Here is the design 
doc for implementing the necessary metrics.
https://docs.google.com/document/d/1NIdcGuR1B3WIe8t7VxLrt58TJB4DtipWEbj5I_mzJys/edit?usp=sharing

Specifically, this PR adds the following public API changes.

### New APIs
- `StreamingQuery.status` returns a `StreamingQueryStatus` object (renamed from 
`StreamingQueryInfo`, see later)

- `StreamingQueryStatus` has the following important fields
  - inputRate - Current rate (rows/sec) at which data is being generated by all the sources
  - processingRate - Current rate (rows/sec) at which the query is processing data from all the sources
  - ~~outputRate~~ - *Does not work with whole-stage codegen*
  - latency - Current average latency between the data being available in a source and the sink writing the corresponding output
  - sourceStatuses: Array[SourceStatus] - Current statuses of the sources
  - sinkStatus: SinkStatus - Current status of the sink
  - triggerStatus - Low-level detailed status of the last completed/currently active trigger
    - latencies - getOffset, getBatch, full trigger, WAL writes
    - timestamps - trigger start, finish, after getOffset, after getBatch
    - numRows - input, output, state total/updated rows for aggregations

- `SourceStatus` has the following important fields
  - inputRate - Current rate (rows/sec) at which data is being generated by the 
source
  - processingRate - Current rate (rows/sec) at which the query is processing 
data from the source
  - triggerStatus - Low-level detailed status of the last completed/currently 
active trigger

- Python API for `StreamingQuery.status()`

### Breaking changes to existing APIs
**Existing direct public facing APIs**
- Deprecated the direct public-facing APIs `StreamingQuery.sourceStatuses` and 
`StreamingQuery.sinkStatus` in favour of 
`StreamingQuery.status.sourceStatuses/sinkStatus`.
  - Branch 2.0 should have them deprecated; master should have them removed.

**Existing advanced listener APIs**
- `StreamingQueryInfo` renamed to `StreamingQueryStatus` for consistency with 
`SourceStatus` and `SinkStatus`
   - Earlier, `StreamingQueryInfo` was used only in the advanced listener API, but 
now it is also used in the direct public-facing API (`StreamingQuery.status`)

- Field `queryInfo` in the listener events `QueryStarted`, `QueryProgress`, and 
`QueryTerminated` was renamed to `queryStatus`, and its return type changed to 
`StreamingQueryStatus`.

- Field `offsetDesc` in `SourceStatus` was changed from `Option[String]` to 
`String`.

- The constructors of `SourceStatus` and `SinkStatus` were made private instead 
of `private[sql]` to make them more Java-safe. Instead, `private[sql]` factory 
methods `SourceStatus.apply()` and `SinkStatus.apply()` were added, which are 
harder to accidentally use from Java.

## How was this patch tested?

Old and new unit tests.
- Rate calculation and other internal logic of StreamMetrics are tested by 
StreamMetricsSuite.
- New info in statuses returned through StreamingQueryListener is tested in 
StreamingQueryListenerSuite.
- New and old info returned through StreamingQuery.status is tested in 
StreamingQuerySuite.
- Source-specific tests for making sure input rows are counted are in 
source-specific test suites.
- Additional tests to test minor additions in LocalTableScanExec, StateStore, 
etc.

Metrics were also manually tested using the Ganglia sink.

Author: Tathagata Das 

Closes #15307 from tdas/SPARK-17731.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7106866c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7106866c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7106866c

Branch: refs/heads/master
Commit: 7106866c220c73960c6fe2a70e4911516617e21f
Parents: 08eac35
Author: Tathagata Das 
Authored: Thu Oct 13 13:36:26 2016 -0700
Committer: Tathagata Das 
Committed: Thu Oct 13 13:36:26 2016 -0700

--
 .../spark/sql/kafka010/KafkaSourceSuite.scala   |  27 ++
 project/MimaExcludes.scala  |  13 +
 python/pyspark/sql/streaming.py | 301 ++
 .../spark/sql/catalyst/trees/TreeNode.scala |   7 +
 .../sql/execution/LocalTableScanExec.scala  |   5 +-
 .../execution/streaming/StatefulAggregate.scala |  31 +-
 .../execution/streaming/StreamExecution.scala   | 307 ++-
 .../sql/execution/streaming/StreamMetrics.scala | 242 +++
 .../spark/sql/execution/streaming/memory.scala  |   7 +
 .../state/HDFSBackedStateStoreProvider.scala|   2 +
 .../execution/streaming/state/StateStore.scala  |   3 +
 .../org/apache/spark/sql/internal/SQLConf.scala |   8 +
 .../apache/spark/sql/streaming/SinkStatus.scala |