Github user brkyvz commented on a diff in the pull request:
https://github.com/apache/spark/pull/15307#discussion_r82706923
--- Diff: python/pyspark/sql/streaming.py ---
@@ -189,6 +189,282 @@ def resetTerminated(self):
self._jsqm.resetTerminated()
+class StreamingQueryStatus(object):
+ """A class used to report information about the progress of a StreamingQuery.
+
+ .. note:: Experimental
+
+ .. versionadded:: 2.1
+ """
+
+ def __init__(self, jsqs):
+ self._jsqs = jsqs
+
+ def __str__(self):
+ """
+ Pretty string of this query status.
+
+ >>> print(sqs)
+ StreamingQueryStatus:
+ Query name: query
+ Query id: 1
+ Status timestamp: 123
+ Input rate: 1.0 rows/sec
+ Processing rate 2.0 rows/sec
+ Latency: 345.0 ms
+ Trigger status:
+ key: value
+ Source statuses [1 source]:
+ Source 1: MySource1
+ Available offset: #0
+ Input rate: 4.0 rows/sec
+ Processing rate: 5.0 rows/sec
+ Trigger status:
+ key: value
+ Sink status: MySink
+ Committed offsets: [#1, -]
+ """
+ return self._jsqs.toString()
+
+ @property
+ @ignore_unicode_prefix
+ @since(2.1)
+ def name(self):
+ """
+ Name of the query. This name is unique across all active queries.
+
+ >>> sqs.name
+ u'query'
+ """
+ return self._jsqs.name()
+
+ @property
+ @since(2.1)
+ def id(self):
+ """
+ Id of the query. This id is unique across all queries that have been started in
+ the current process.
+
+ >>> int(sqs.id)
+ 1
+ """
+ return self._jsqs.id()
+
+ @property
+ @since(2.1)
+ def timestamp(self):
+ """
+ Timestamp (ms) of when this query was generated.
+
+ >>> int(sqs.timestamp)
+ 123
+ """
+ return self._jsqs.timestamp()
+
+ @property
+ @since(2.1)
+ def inputRate(self):
+ """
+ Current rate (rows/sec) at which data is being generated by all the sources.
+
+ >>> sqs.inputRate
+ 1.0
+ """
+ return self._jsqs.inputRate()
+
+ @property
+ @since(2.1)
+ def processingRate(self):
+ """
+ Current rate (rows/sec) at which the query is processing data from all the sources.
+
+ >>> sqs.processingRate
+ 2.0
+ """
+ return self._jsqs.processingRate()
+
+ @property
+ @since(2.1)
+ def latency(self):
+ """
+ Current average latency between the data being available in source and the sink
+ writing the corresponding output.
+
+ >>> sqs.latency
+ 345.0
+ """
+ if (self._jsqs.latency().nonEmpty()):
+ return self._jsqs.latency().get()
+ else:
+ return None
+
+ @property
+ @since(2.1)
+ def sourceStatuses(self):
+ """
+ Current statuses of the sources.
--- End diff --
Returns the current statuses of the sources as a list?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]