GitHub user tdas opened a pull request:
https://github.com/apache/spark/pull/15472
[SPARK-17731][SQL][STREAMING] Metrics for structured streaming
## What changes were proposed in this pull request?
**This PR adds to branch-2.0 the same metrics that were added to master in
#15307.**

Metrics are needed for monitoring structured streaming apps. Here is the
design doc for implementing the necessary metrics.
https://docs.google.com/document/d/1NIdcGuR1B3WIe8t7VxLrt58TJB4DtipWEbj5I_mzJys/edit?usp=sharing
Specifically, this PR makes the following public API changes:
- `StreamingQuery.status` returns a `StreamingQueryStatus` object (renamed
  from `StreamingQueryInfo`; see below).
- `StreamingQueryStatus` has the following important fields:
  - inputRate - Current rate (rows/sec) at which data is being generated by
    all the sources
  - processingRate - Current rate (rows/sec) at which the query is
    processing data from all the sources
  - ~~outputRate~~ - *Does not work with whole-stage codegen*
  - latency - Current average latency between the data becoming available in
    the source and the sink writing the corresponding output
  - sourceStatuses: Array[SourceStatus] - Current statuses of the sources
  - sinkStatus: SinkStatus - Current status of the sink
  - triggerStatus - Low-level detailed status of the last completed or
    currently active trigger
    - latencies - getOffset, getBatch, full trigger, WAL writes
    - timestamps - trigger start, trigger finish, after getOffset, after getBatch
    - numRows - input rows, output rows, and total/updated state rows for
      aggregations
- `SourceStatus` has the following important fields:
  - inputRate - Current rate (rows/sec) at which data is being generated by
    the source
  - processingRate - Current rate (rows/sec) at which the query is
    processing data from the source
  - triggerStatus - Low-level detailed status of the last completed or
    currently active trigger
- Python API for `StreamingQuery.status()`
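
To make the relationship between the trigger timestamps, the per-phase latencies, and the two rates concrete, here is a minimal, hypothetical sketch in plain Python. It is not Spark's `StreamMetrics` code and calls no Spark API. The field names are illustrative. The sketch assumes that a source's inputRate is its row count divided by the time since the previous trigger started, that processingRate is the row count divided by the current trigger's duration, and that query-level rates are the sum of the per-source rates. WAL-write latency is omitted because the timestamps listed above do not bracket it.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class TriggerTimestamps:
    """Hypothetical wall-clock timestamps (ms) recorded during one trigger."""
    trigger_start: int
    after_get_offset: int
    after_get_batch: int
    trigger_finish: int


def trigger_latencies(ts: TriggerTimestamps) -> Dict[str, int]:
    """Per-phase latencies (ms), each the gap between two recorded timestamps."""
    return {
        "getOffset": ts.after_get_offset - ts.trigger_start,
        "getBatch": ts.after_get_batch - ts.after_get_offset,
        "fullTrigger": ts.trigger_finish - ts.trigger_start,
    }


def rows_per_sec(num_rows: int, interval_ms: int) -> float:
    """Convert a row count over an interval to rows/sec (0.0 for an empty interval)."""
    if interval_ms <= 0:
        return 0.0
    return num_rows * 1000.0 / interval_ms


def source_rates(num_rows: int, prev_trigger_start: int,
                 ts: TriggerTimestamps) -> Dict[str, float]:
    """Assumed per-source definitions:
    inputRate: rows that arrived since the previous trigger started, per second.
    processingRate: rows processed per second of this trigger's duration.
    """
    return {
        "inputRate": rows_per_sec(num_rows, ts.trigger_start - prev_trigger_start),
        "processingRate": rows_per_sec(num_rows, ts.trigger_finish - ts.trigger_start),
    }


def query_rates(rows_per_source: List[int], prev_trigger_start: int,
                ts: TriggerTimestamps) -> Dict[str, float]:
    """Query-level rates, assumed here to be the sum of the per-source rates."""
    per_source = [source_rates(n, prev_trigger_start, ts) for n in rows_per_source]
    return {
        key: sum(r[key] for r in per_source)
        for key in ("inputRate", "processingRate")
    }


# Example: two sources, previous trigger started 1 s before this one,
# and this trigger took 500 ms end to end.
ts = TriggerTimestamps(trigger_start=10_000, after_get_offset=10_050,
                       after_get_batch=10_200, trigger_finish=10_500)
print(trigger_latencies(ts))
# {'getOffset': 50, 'getBatch': 150, 'fullTrigger': 500}
print(query_rates([1000, 500], prev_trigger_start=9_000, ts=ts))
# {'inputRate': 1500.0, 'processingRate': 3000.0}
```

Under these assumptions, a processingRate that stays above the inputRate means the query keeps up with its sources. If it falls below for long, the backlog grows.
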
**Existing direct public-facing APIs**
- Deprecated the direct public-facing APIs `StreamingQuery.sourceStatuses` and
  `StreamingQuery.sinkStatus` in favour of
  `StreamingQuery.status.sourceStatuses/sinkStatus`.
  - Branch 2.0 should have them deprecated; master should have them removed.
**Existing advanced listener APIs**
- `StreamingQueryInfo` renamed to `StreamingQueryStatus` for consistency
  with `SourceStatus` and `SinkStatus`.
  - Earlier, `StreamingQueryInfo` was used only in the advanced listener API,
    but now it is also used in the direct public-facing API
    (`StreamingQuery.status`).
- Field `queryInfo` in the listener events `QueryStarted`, `QueryProgress`, and
  `QueryTerminated` renamed to `queryStatus`, with return type
  `StreamingQueryStatus`.
- Field `offsetDesc` in `SourceStatus` changed from `Option[String]` to
  `String`.
- Made the constructors of `SourceStatus` and `SinkStatus` `private` instead of
  `private[sql]` to make them safer for Java users (a `private[sql]` member is
  compiled to a public one in bytecode, so Java code could call it directly).
  Instead, added `private[sql] object SourceStatus/SinkStatus.apply()`, which
  are harder to use accidentally from Java.
## How was this patch tested?
Old and new unit tests:
- Rate calculation and other internal logic of `StreamMetrics` are tested by
  `StreamMetricsSuite`.
- New info in statuses returned through `StreamingQueryListener` is tested in
  `StreamingQueryListenerSuite`.
- New and old info returned through `StreamingQuery.status` is tested in
  `StreamingQuerySuite`.
- Source-specific tests for making sure input rows are counted are in the
  source-specific test suites.
- Additional tests cover minor additions in `LocalTableScanExec`,
  `StateStore`, etc.

Metrics were also tested manually using the Ganglia sink.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tdas/spark SPARK-17731-branch-2.0
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/15472.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #15472
----
commit ee8e899e4c274c363a8b4d13e8bf57b0b467a50e
Author: Tathagata Das
Date: 2016-10-13T20:36:26Z
[SPARK-17731][SQL][STREAMING] Metrics for structured streaming
Closes #15307 from tdas/SPARK-17731.
----