Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/12673#discussion_r61150611
  
    --- Diff: python/pyspark/sql/streaming.py ---
    @@ -87,6 +87,86 @@ def stop(self):
             self._jcq.stop()
     
     
    +class ContinuousQueryManager(object):
    +    """A class to manage all the :class:`ContinuousQuery` 
ContinuousQueries active
    +    on a :class:`SQLContext`.
    +
    +    .. note:: Experimental
    +
    +    .. versionadded:: 2.0
    +    """
    +
    +    def __init__(self, jcqm):
    +        self._jcqm = jcqm
    +
    +    @property
    +    @ignore_unicode_prefix
    +    @since(2.0)
    +    def active(self):
    +        """Returns a list of active queries associated with this SQLContext
    +
    +        >>> cq = 
df.write.format('memory').queryName('this_query').startStream()
    +        >>> cqm = sqlContext.streams
    +        >>> # get the list of active continuous queries
    +        >>> [q.name for q in cqm.active]
    +        [u'this_query']
    +        >>> cq.stop()
    +        """
    +        return [ContinuousQuery(jcq) for jcq in self._jcqm.active()]
    +
    +    @since(2.0)
    +    def get(self, name):
    +        """Returns an active query from this SQLContext or throws 
exception if an active query
    +        with this name doesn't exist.
    +
    +        >>> df.write.format('memory').queryName('this_query').startStream()
    +        >>> cq = sqlContext.streams.get('this_query')
    +        >>> cq.isActive
    +        True
    +        >>> cq.stop()
    +        """
    +        if type(name) != str or len(name.strip()) == 0:
    +            raise ValueError("The name for the query must be a non-empty 
string. Got: %s" % name)
    +        return ContinuousQuery(self._jcqm.get(name))
    +
    +    @since(2.0)
    +    def awaitAnyTermination(self, timeout=None):
    +        """Wait until any of the queries on the associated SQLContext has 
terminated since the
    +        creation of the context, or since :func:`resetTerminated()` was 
called. If any query was
    +        terminated with an exception, then the exception will be thrown.
    +        If `timeout` is set, it returns whether the query has terminated 
or not within the
    +        `timeout` seconds.
    +
    +        If a query has terminated, then subsequent calls to 
:func:`awaitAnyTermination()` will
    +        either return immediately (if the query was terminated by 
:func:`query.stop()`),
    +        or throw the exception immediately (if the query was terminated 
with exception). Use
    +        :func:`resetTerminated()` to clear past terminations and wait for 
new terminations.
    +
    +        In the case where multiple queries have terminated since 
:func:`resetTermination()`
    +        was called, if any query has terminated with exception, then 
:func:`awaitAnyTermination()`
    +        will throw any of the exception. For correctly documenting 
exceptions across multiple
    +        queries, users need to stop all of them after any of them 
terminates with exception, and
    +        then check the `query.exception()` for each query.
    +
    +        throws :class:`ContinuousQueryException`, if `this` query has 
terminated with an exception
    +        """
    +        if timeout is not None:
    +            if type(timeout) != int or timeout < 0:
    +                raise ValueError("timeout must be a positive integer. Got 
%s" % timeout)
    +            return self._jcqm.awaitAnyTermination(timeout)
    --- End diff --
    
    Convert seconds to milli-seconds here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to