[SPARK-16266][SQL][STREAMING] Moved DataStreamReader/Writer from pyspark.sql to pyspark.sql.streaming
## What changes were proposed in this pull request? - Moved DataStreamReader/Writer from pyspark.sql to pyspark.sql.streaming to make them consistent with scala packaging - Exposed the necessary classes in sql.streaming package so that they appear in the docs - Added pyspark.sql.streaming module to the docs ## How was this patch tested? - updated unit tests. - generated docs for testing visibility of pyspark.sql.streaming classes. Author: Tathagata Das <tathagata.das1...@gmail.com> Closes #13955 from tdas/SPARK-16266. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6650c053 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6650c053 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6650c053 Branch: refs/heads/branch-2.0 Commit: 6650c0533e5c60f8653d2e0a608a42d5838fa553 Parents: 345212b Author: Tathagata Das <tathagata.das1...@gmail.com> Authored: Tue Jun 28 22:07:11 2016 -0700 Committer: Shixiong Zhu <shixi...@databricks.com> Committed: Tue Jun 28 22:17:57 2016 -0700 ---------------------------------------------------------------------- python/docs/pyspark.sql.rst | 6 + python/pyspark/sql/context.py | 3 +- python/pyspark/sql/dataframe.py | 3 +- python/pyspark/sql/readwriter.py | 493 +-------------------------------- python/pyspark/sql/session.py | 3 +- python/pyspark/sql/streaming.py | 502 +++++++++++++++++++++++++++++++++- 6 files changed, 511 insertions(+), 499 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/6650c053/python/docs/pyspark.sql.rst ---------------------------------------------------------------------- diff --git a/python/docs/pyspark.sql.rst b/python/docs/pyspark.sql.rst index 6259379..3be9533 100644 --- a/python/docs/pyspark.sql.rst +++ b/python/docs/pyspark.sql.rst @@ -21,3 +21,9 @@ pyspark.sql.functions module .. automodule:: pyspark.sql.functions :members: :undoc-members: + +pyspark.sql.streaming module +---------------------------- +.. 
automodule:: pyspark.sql.streaming + :members: + :undoc-members: http://git-wip-us.apache.org/repos/asf/spark/blob/6650c053/python/pyspark/sql/context.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py index b5dde13..3503fb9 100644 --- a/python/pyspark/sql/context.py +++ b/python/pyspark/sql/context.py @@ -26,7 +26,8 @@ from pyspark import since from pyspark.rdd import ignore_unicode_prefix from pyspark.sql.session import _monkey_patch_RDD, SparkSession from pyspark.sql.dataframe import DataFrame -from pyspark.sql.readwriter import DataFrameReader, DataStreamReader +from pyspark.sql.readwriter import DataFrameReader +from pyspark.sql.streaming import DataStreamReader from pyspark.sql.types import Row, StringType from pyspark.sql.utils import install_exception_handler http://git-wip-us.apache.org/repos/asf/spark/blob/6650c053/python/pyspark/sql/dataframe.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index c8c8e7d..e6e7029 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -33,7 +33,8 @@ from pyspark.storagelevel import StorageLevel from pyspark.traceback_utils import SCCallSiteSync from pyspark.sql.types import _parse_datatype_json_string from pyspark.sql.column import Column, _to_seq, _to_list, _to_java_column -from pyspark.sql.readwriter import DataFrameWriter, DataStreamWriter +from pyspark.sql.readwriter import DataFrameWriter +from pyspark.sql.streaming import DataStreamWriter from pyspark.sql.types import * __all__ = ["DataFrame", "DataFrameNaFunctions", "DataFrameStatFunctions"] http://git-wip-us.apache.org/repos/asf/spark/blob/6650c053/python/pyspark/sql/readwriter.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 3f28d7a..10f307b 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -28,7 +28,7 @@ from pyspark.sql.column import _to_seq from pyspark.sql.types import * from pyspark.sql import utils -__all__ = ["DataFrameReader", "DataFrameWriter", "DataStreamReader", "DataStreamWriter"] +__all__ = ["DataFrameReader", "DataFrameWriter"] def to_str(value): @@ -724,494 +724,6 @@ class DataFrameWriter(OptionUtils): self._jwrite.mode(mode).jdbc(url, table, jprop) -class DataStreamReader(OptionUtils): - """ - Interface used to load a streaming :class:`DataFrame` from external storage systems - (e.g. file systems, key-value stores, etc). Use :func:`spark.readStream` - to access this. - - .. note:: Experimental. - - .. versionadded:: 2.0 - """ - - def __init__(self, spark): - self._jreader = spark._ssql_ctx.readStream() - self._spark = spark - - def _df(self, jdf): - from pyspark.sql.dataframe import DataFrame - return DataFrame(jdf, self._spark) - - @since(2.0) - def format(self, source): - """Specifies the input data source format. - - .. note:: Experimental. - - :param source: string, name of the data source, e.g. 'json', 'parquet'. - - >>> s = spark.readStream.format("text") - """ - self._jreader = self._jreader.format(source) - return self - - @since(2.0) - def schema(self, schema): - """Specifies the input schema. - - Some data sources (e.g. JSON) can infer the input schema automatically from data. 
- By specifying the schema here, the underlying data source can skip the schema - inference step, and thus speed up data loading. - - .. note:: Experimental. - - :param schema: a StructType object - - >>> s = spark.readStream.schema(sdf_schema) - """ - if not isinstance(schema, StructType): - raise TypeError("schema should be StructType") - jschema = self._spark._ssql_ctx.parseDataType(schema.json()) - self._jreader = self._jreader.schema(jschema) - return self - - @since(2.0) - def option(self, key, value): - """Adds an input option for the underlying data source. - - .. note:: Experimental. - - >>> s = spark.readStream.option("x", 1) - """ - self._jreader = self._jreader.option(key, to_str(value)) - return self - - @since(2.0) - def options(self, **options): - """Adds input options for the underlying data source. - - .. note:: Experimental. - - >>> s = spark.readStream.options(x="1", y=2) - """ - for k in options: - self._jreader = self._jreader.option(k, to_str(options[k])) - return self - - @since(2.0) - def load(self, path=None, format=None, schema=None, **options): - """Loads a data stream from a data source and returns it as a :class`DataFrame`. - - .. note:: Experimental. - - :param path: optional string for file-system backed data sources. - :param format: optional string for format of the data source. Default to 'parquet'. - :param schema: optional :class:`StructType` for the input schema. - :param options: all other string options - - >>> json_sdf = spark.readStream.format("json")\ - .schema(sdf_schema)\ - .load(os.path.join(tempfile.mkdtemp(),'data')) - >>> json_sdf.isStreaming - True - >>> json_sdf.schema == sdf_schema - True - """ - if format is not None: - self.format(format) - if schema is not None: - self.schema(schema) - self.options(**options) - if path is not None: - if type(path) != str or len(path.strip()) == 0: - raise ValueError("If the path is provided for stream, it needs to be a " + - "non-empty string. List of paths are not supported.") - return self._df(self._jreader.load(path)) - else: - return self._df(self._jreader.load()) - - @since(2.0) - def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, - allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None, - allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None, - mode=None, columnNameOfCorruptRecord=None): - """ - Loads a JSON file stream (one object per line) and returns a :class`DataFrame`. - - If the ``schema`` parameter is not specified, this function goes - through the input once to determine the input schema. - - .. note:: Experimental. - - :param path: string represents path to the JSON dataset, - or RDD of Strings storing JSON objects. - :param schema: an optional :class:`StructType` for the input schema. - :param primitivesAsString: infers all primitive values as a string type. If None is set, - it uses the default value, ``false``. - :param prefersDecimal: infers all floating-point values as a decimal type. If the values - do not fit in decimal, then it infers them as doubles. If None is - set, it uses the default value, ``false``. - :param allowComments: ignores Java/C++ style comment in JSON records. If None is set, - it uses the default value, ``false``. - :param allowUnquotedFieldNames: allows unquoted JSON field names. If None is set, - it uses the default value, ``false``. - :param allowSingleQuotes: allows single quotes in addition to double quotes. If None is - set, it uses the default value, ``true``. 
- :param allowNumericLeadingZero: allows leading zeros in numbers (e.g. 00012). If None is - set, it uses the default value, ``false``. - :param allowBackslashEscapingAnyCharacter: allows accepting quoting of all character - using backslash quoting mechanism. If None is - set, it uses the default value, ``false``. - :param mode: allows a mode for dealing with corrupt records during parsing. If None is - set, it uses the default value, ``PERMISSIVE``. - - * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \ - record and puts the malformed string into a new field configured by \ - ``columnNameOfCorruptRecord``. When a schema is set by user, it sets \ - ``null`` for extra fields. - * ``DROPMALFORMED`` : ignores the whole corrupted records. - * ``FAILFAST`` : throws an exception when it meets corrupted records. - - :param columnNameOfCorruptRecord: allows renaming the new field having malformed string - created by ``PERMISSIVE`` mode. This overrides - ``spark.sql.columnNameOfCorruptRecord``. If None is set, - it uses the value specified in - ``spark.sql.columnNameOfCorruptRecord``. - - >>> json_sdf = spark.readStream.json(os.path.join(tempfile.mkdtemp(), 'data'), \ - schema = sdf_schema) - >>> json_sdf.isStreaming - True - >>> json_sdf.schema == sdf_schema - True - """ - self._set_opts( - schema=schema, primitivesAsString=primitivesAsString, prefersDecimal=prefersDecimal, - allowComments=allowComments, allowUnquotedFieldNames=allowUnquotedFieldNames, - allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero, - allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter, - mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord) - if isinstance(path, basestring): - return self._df(self._jreader.json(path)) - else: - raise TypeError("path can be only a single string") - - @since(2.0) - def parquet(self, path): - """Loads a Parquet file stream, returning the result as a :class:`DataFrame`. - - You can set the following Parquet-specific option(s) for reading Parquet files: - * ``mergeSchema``: sets whether we should merge schemas collected from all \ - Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``. \ - The default value is specified in ``spark.sql.parquet.mergeSchema``. - - .. note:: Experimental. - - >>> parquet_sdf = spark.readStream.schema(sdf_schema)\ - .parquet(os.path.join(tempfile.mkdtemp())) - >>> parquet_sdf.isStreaming - True - >>> parquet_sdf.schema == sdf_schema - True - """ - if isinstance(path, basestring): - return self._df(self._jreader.parquet(path)) - else: - raise TypeError("path can be only a single string") - - @ignore_unicode_prefix - @since(2.0) - def text(self, path): - """ - Loads a text file stream and returns a :class:`DataFrame` whose schema starts with a - string column named "value", and followed by partitioned columns if there - are any. - - Each line in the text file is a new row in the resulting DataFrame. - - .. note:: Experimental. - - :param paths: string, or list of strings, for input path(s). 
- - >>> text_sdf = spark.readStream.text(os.path.join(tempfile.mkdtemp(), 'data')) - >>> text_sdf.isStreaming - True - >>> "value" in str(text_sdf.schema) - True - """ - if isinstance(path, basestring): - return self._df(self._jreader.text(path)) - else: - raise TypeError("path can be only a single string") - - @since(2.0) - def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None, - comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, - ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, - negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None, - maxMalformedLogPerPartition=None, mode=None): - """Loads a CSV file stream and returns the result as a :class:`DataFrame`. - - This function will go through the input once to determine the input schema if - ``inferSchema`` is enabled. To avoid going through the entire data once, disable - ``inferSchema`` option or specify the schema explicitly using ``schema``. - - .. note:: Experimental. - - :param path: string, or list of strings, for input path(s). - :param schema: an optional :class:`StructType` for the input schema. - :param sep: sets the single character as a separator for each field and value. - If None is set, it uses the default value, ``,``. - :param encoding: decodes the CSV files by the given encoding type. If None is set, - it uses the default value, ``UTF-8``. - :param quote: sets the single character used for escaping quoted values where the - separator can be part of the value. If None is set, it uses the default - value, ``"``. If you would like to turn off quotations, you need to set an - empty string. - :param escape: sets the single character used for escaping quotes inside an already - quoted value. If None is set, it uses the default value, ``\``. - :param comment: sets the single character used for skipping lines beginning with this - character. By default (None), it is disabled. - :param header: uses the first line as names of columns. If None is set, it uses the - default value, ``false``. - :param inferSchema: infers the input schema automatically from data. It requires one extra - pass over the data. If None is set, it uses the default value, ``false``. - :param ignoreLeadingWhiteSpace: defines whether or not leading whitespaces from values - being read should be skipped. If None is set, it uses - the default value, ``false``. - :param ignoreTrailingWhiteSpace: defines whether or not trailing whitespaces from values - being read should be skipped. If None is set, it uses - the default value, ``false``. - :param nullValue: sets the string representation of a null value. If None is set, it uses - the default value, empty string. - :param nanValue: sets the string representation of a non-number value. If None is set, it - uses the default value, ``NaN``. - :param positiveInf: sets the string representation of a positive infinity value. If None - is set, it uses the default value, ``Inf``. - :param negativeInf: sets the string representation of a negative infinity value. If None - is set, it uses the default value, ``Inf``. - :param dateFormat: sets the string that indicates a date format. Custom date formats - follow the formats at ``java.text.SimpleDateFormat``. This - applies to both date type and timestamp type. By default, it is None - which means trying to parse times and date by - ``java.sql.Timestamp.valueOf()`` and ``java.sql.Date.valueOf()``. 
- :param maxColumns: defines a hard limit of how many columns a record can have. If None is - set, it uses the default value, ``20480``. - :param maxCharsPerColumn: defines the maximum number of characters allowed for any given - value being read. If None is set, it uses the default value, - ``1000000``. - :param mode: allows a mode for dealing with corrupt records during parsing. If None is - set, it uses the default value, ``PERMISSIVE``. - - * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record. - When a schema is set by user, it sets ``null`` for extra fields. - * ``DROPMALFORMED`` : ignores the whole corrupted records. - * ``FAILFAST`` : throws an exception when it meets corrupted records. - - >>> csv_sdf = spark.readStream.csv(os.path.join(tempfile.mkdtemp(), 'data'), \ - schema = sdf_schema) - >>> csv_sdf.isStreaming - True - >>> csv_sdf.schema == sdf_schema - True - """ - self._set_opts( - schema=schema, sep=sep, encoding=encoding, quote=quote, escape=escape, comment=comment, - header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace, - ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue, - nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf, - dateFormat=dateFormat, maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn, - maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode) - if isinstance(path, basestring): - return self._df(self._jreader.csv(path)) - else: - raise TypeError("path can be only a single string") - - -class DataStreamWriter(object): - """ - Interface used to write a streaming :class:`DataFrame` to external storage systems - (e.g. file systems, key-value stores, etc). Use :func:`DataFrame.writeStream` - to access this. - - .. note:: Experimental. - - .. versionadded:: 2.0 - """ - - def __init__(self, df): - self._df = df - self._spark = df.sql_ctx - self._jwrite = df._jdf.writeStream() - - def _sq(self, jsq): - from pyspark.sql.streaming import StreamingQuery - return StreamingQuery(jsq) - - @since(2.0) - def outputMode(self, outputMode): - """Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink. - - Options include: - - * `append`:Only the new rows in the streaming DataFrame/Dataset will be written to - the sink - * `complete`:All the rows in the streaming DataFrame/Dataset will be written to the sink - every time these is some updates - - .. note:: Experimental. - - >>> writer = sdf.writeStream.outputMode('append') - """ - if not outputMode or type(outputMode) != str or len(outputMode.strip()) == 0: - raise ValueError('The output mode must be a non-empty string. Got: %s' % outputMode) - self._jwrite = self._jwrite.outputMode(outputMode) - return self - - @since(2.0) - def format(self, source): - """Specifies the underlying output data source. - - .. note:: Experimental. - - :param source: string, name of the data source, e.g. 'json', 'parquet'. - - >>> writer = sdf.writeStream.format('json') - """ - self._jwrite = self._jwrite.format(source) - return self - - @since(2.0) - def option(self, key, value): - """Adds an output option for the underlying data source. - - .. note:: Experimental. - """ - self._jwrite = self._jwrite.option(key, to_str(value)) - return self - - @since(2.0) - def options(self, **options): - """Adds output options for the underlying data source. - - .. note:: Experimental. 
- """ - for k in options: - self._jwrite = self._jwrite.option(k, to_str(options[k])) - return self - - @since(2.0) - def partitionBy(self, *cols): - """Partitions the output by the given columns on the file system. - - If specified, the output is laid out on the file system similar - to Hive's partitioning scheme. - - .. note:: Experimental. - - :param cols: name of columns - - """ - if len(cols) == 1 and isinstance(cols[0], (list, tuple)): - cols = cols[0] - self._jwrite = self._jwrite.partitionBy(_to_seq(self._spark._sc, cols)) - return self - - @since(2.0) - def queryName(self, queryName): - """Specifies the name of the :class:`StreamingQuery` that can be started with - :func:`start`. This name must be unique among all the currently active queries - in the associated SparkSession. - - .. note:: Experimental. - - :param queryName: unique name for the query - - >>> writer = sdf.writeStream.queryName('streaming_query') - """ - if not queryName or type(queryName) != str or len(queryName.strip()) == 0: - raise ValueError('The queryName must be a non-empty string. Got: %s' % queryName) - self._jwrite = self._jwrite.queryName(queryName) - return self - - @keyword_only - @since(2.0) - def trigger(self, processingTime=None): - """Set the trigger for the stream query. If this is not set it will run the query as fast - as possible, which is equivalent to setting the trigger to ``processingTime='0 seconds'``. - - .. note:: Experimental. - - :param processingTime: a processing time interval as a string, e.g. '5 seconds', '1 minute'. - - >>> # trigger the query for execution every 5 seconds - >>> writer = sdf.writeStream.trigger(processingTime='5 seconds') - """ - from pyspark.sql.streaming import ProcessingTime - trigger = None - if processingTime is not None: - if type(processingTime) != str or len(processingTime.strip()) == 0: - raise ValueError('The processing time must be a non empty string. Got: %s' % - processingTime) - trigger = ProcessingTime(processingTime) - if trigger is None: - raise ValueError('A trigger was not provided. Supported triggers: processingTime.') - self._jwrite = self._jwrite.trigger(trigger._to_java_trigger(self._spark)) - return self - - @ignore_unicode_prefix - @since(2.0) - def start(self, path=None, format=None, partitionBy=None, queryName=None, **options): - """Streams the contents of the :class:`DataFrame` to a data source. - - The data source is specified by the ``format`` and a set of ``options``. - If ``format`` is not specified, the default data source configured by - ``spark.sql.sources.default`` will be used. - - .. note:: Experimental. - - :param path: the path in a Hadoop supported file system - :param format: the format used to save - - * ``append``: Append contents of this :class:`DataFrame` to existing data. - * ``overwrite``: Overwrite existing data. - * ``ignore``: Silently ignore this operation if data already exists. - * ``error`` (default case): Throw an exception if data already exists. - :param partitionBy: names of partitioning columns - :param queryName: unique name for the query - :param options: All other string options. You may want to provide a `checkpointLocation` - for most streams, however it is not required for a `memory` stream. - - >>> sq = sdf.writeStream.format('memory').queryName('this_query').start() - >>> sq.isActive - True - >>> sq.name - u'this_query' - >>> sq.stop() - >>> sq.isActive - False - >>> sq = sdf.writeStream.trigger(processingTime='5 seconds').start( - ... 
queryName='that_query', format='memory') - >>> sq.name - u'that_query' - >>> sq.isActive - True - >>> sq.stop() - """ - self.options(**options) - if partitionBy is not None: - self.partitionBy(partitionBy) - if format is not None: - self.format(format) - if queryName is not None: - self.queryName(queryName) - if path is None: - return self._sq(self._jwrite.start()) - else: - return self._sq(self._jwrite.start(path)) - - def _test(): import doctest import os @@ -1235,9 +747,6 @@ def _test(): globs['sc'] = sc globs['spark'] = spark globs['df'] = spark.read.parquet('python/test_support/sql/parquet_partitioned') - globs['sdf'] = \ - spark.readStream.format('text').load('python/test_support/sql/streaming') - globs['sdf_schema'] = StructType([StructField("data", StringType(), False)]) (failure_count, test_count) = doctest.testmod( pyspark.sql.readwriter, globs=globs, optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF) http://git-wip-us.apache.org/repos/asf/spark/blob/6650c053/python/pyspark/sql/session.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py index b4152a3..55f86a1 100644 --- a/python/pyspark/sql/session.py +++ b/python/pyspark/sql/session.py @@ -31,7 +31,8 @@ from pyspark.rdd import RDD, ignore_unicode_prefix from pyspark.sql.catalog import Catalog from pyspark.sql.conf import RuntimeConfig from pyspark.sql.dataframe import DataFrame -from pyspark.sql.readwriter import DataFrameReader, DataStreamReader +from pyspark.sql.readwriter import DataFrameReader +from pyspark.sql.streaming import DataStreamReader from pyspark.sql.types import Row, DataType, StringType, StructType, _verify_type, \ _infer_schema, _has_nulltype, _merge_type, _create_converter, _parse_datatype_string from pyspark.sql.utils import install_exception_handler http://git-wip-us.apache.org/repos/asf/spark/blob/6650c053/python/pyspark/sql/streaming.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index ae45c99..8cf7098 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -18,15 +18,18 @@ import sys if sys.version >= '3': intlike = int + basestring = unicode = str else: intlike = (int, long) from abc import ABCMeta, abstractmethod -from pyspark import since +from pyspark import since, keyword_only from pyspark.rdd import ignore_unicode_prefix +from pyspark.sql.readwriter import OptionUtils, to_str +from pyspark.sql.types import * -__all__ = ["StreamingQuery"] +__all__ = ["StreamingQuery", "StreamingQueryManager", "DataStreamReader", "DataStreamWriter"] class StreamingQuery(object): @@ -118,7 +121,7 @@ class StreamingQueryManager(object): def active(self): """Returns a list of active queries associated with this SQLContext - >>> sq = df.writeStream.format('memory').queryName('this_query').start() + >>> sq = sdf.writeStream.format('memory').queryName('this_query').start() >>> sqm = spark.streams >>> # get the list of active streaming queries >>> [q.name for q in sqm.active] @@ -133,7 +136,7 @@ class StreamingQueryManager(object): """Returns an active query from this SQLContext or throws exception if an active query with this name doesn't exist. 
- >>> sq = df.writeStream.format('memory').queryName('this_query').start() + >>> sq = sdf.writeStream.format('memory').queryName('this_query').start() >>> sq.name u'this_query' >>> sq = spark.streams.get(sq.id) @@ -224,6 +227,494 @@ class ProcessingTime(Trigger): self.interval) +class DataStreamReader(OptionUtils): + """ + Interface used to load a streaming :class:`DataFrame` from external storage systems + (e.g. file systems, key-value stores, etc). Use :func:`spark.readStream` + to access this. + + .. note:: Experimental. + + .. versionadded:: 2.0 + """ + + def __init__(self, spark): + self._jreader = spark._ssql_ctx.readStream() + self._spark = spark + + def _df(self, jdf): + from pyspark.sql.dataframe import DataFrame + return DataFrame(jdf, self._spark) + + @since(2.0) + def format(self, source): + """Specifies the input data source format. + + .. note:: Experimental. + + :param source: string, name of the data source, e.g. 'json', 'parquet'. + + >>> s = spark.readStream.format("text") + """ + self._jreader = self._jreader.format(source) + return self + + @since(2.0) + def schema(self, schema): + """Specifies the input schema. + + Some data sources (e.g. JSON) can infer the input schema automatically from data. + By specifying the schema here, the underlying data source can skip the schema + inference step, and thus speed up data loading. + + .. note:: Experimental. + + :param schema: a StructType object + + >>> s = spark.readStream.schema(sdf_schema) + """ + if not isinstance(schema, StructType): + raise TypeError("schema should be StructType") + jschema = self._spark._ssql_ctx.parseDataType(schema.json()) + self._jreader = self._jreader.schema(jschema) + return self + + @since(2.0) + def option(self, key, value): + """Adds an input option for the underlying data source. + + .. note:: Experimental. + + >>> s = spark.readStream.option("x", 1) + """ + self._jreader = self._jreader.option(key, to_str(value)) + return self + + @since(2.0) + def options(self, **options): + """Adds input options for the underlying data source. + + .. note:: Experimental. + + >>> s = spark.readStream.options(x="1", y=2) + """ + for k in options: + self._jreader = self._jreader.option(k, to_str(options[k])) + return self + + @since(2.0) + def load(self, path=None, format=None, schema=None, **options): + """Loads a data stream from a data source and returns it as a :class`DataFrame`. + + .. note:: Experimental. + + :param path: optional string for file-system backed data sources. + :param format: optional string for format of the data source. Default to 'parquet'. + :param schema: optional :class:`StructType` for the input schema. + :param options: all other string options + + >>> json_sdf = spark.readStream.format("json")\ + .schema(sdf_schema)\ + .load(os.path.join(tempfile.mkdtemp(),'data')) + >>> json_sdf.isStreaming + True + >>> json_sdf.schema == sdf_schema + True + """ + if format is not None: + self.format(format) + if schema is not None: + self.schema(schema) + self.options(**options) + if path is not None: + if type(path) != str or len(path.strip()) == 0: + raise ValueError("If the path is provided for stream, it needs to be a " + + "non-empty string. 
List of paths are not supported.") + return self._df(self._jreader.load(path)) + else: + return self._df(self._jreader.load()) + + @since(2.0) + def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, + allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None, + allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None, + mode=None, columnNameOfCorruptRecord=None): + """ + Loads a JSON file stream (one object per line) and returns a :class`DataFrame`. + + If the ``schema`` parameter is not specified, this function goes + through the input once to determine the input schema. + + .. note:: Experimental. + + :param path: string represents path to the JSON dataset, + or RDD of Strings storing JSON objects. + :param schema: an optional :class:`StructType` for the input schema. + :param primitivesAsString: infers all primitive values as a string type. If None is set, + it uses the default value, ``false``. + :param prefersDecimal: infers all floating-point values as a decimal type. If the values + do not fit in decimal, then it infers them as doubles. If None is + set, it uses the default value, ``false``. + :param allowComments: ignores Java/C++ style comment in JSON records. If None is set, + it uses the default value, ``false``. + :param allowUnquotedFieldNames: allows unquoted JSON field names. If None is set, + it uses the default value, ``false``. + :param allowSingleQuotes: allows single quotes in addition to double quotes. If None is + set, it uses the default value, ``true``. + :param allowNumericLeadingZero: allows leading zeros in numbers (e.g. 00012). If None is + set, it uses the default value, ``false``. + :param allowBackslashEscapingAnyCharacter: allows accepting quoting of all character + using backslash quoting mechanism. If None is + set, it uses the default value, ``false``. + :param mode: allows a mode for dealing with corrupt records during parsing. If None is + set, it uses the default value, ``PERMISSIVE``. + + * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \ + record and puts the malformed string into a new field configured by \ + ``columnNameOfCorruptRecord``. When a schema is set by user, it sets \ + ``null`` for extra fields. + * ``DROPMALFORMED`` : ignores the whole corrupted records. + * ``FAILFAST`` : throws an exception when it meets corrupted records. + + :param columnNameOfCorruptRecord: allows renaming the new field having malformed string + created by ``PERMISSIVE`` mode. This overrides + ``spark.sql.columnNameOfCorruptRecord``. If None is set, + it uses the value specified in + ``spark.sql.columnNameOfCorruptRecord``. + + >>> json_sdf = spark.readStream.json(os.path.join(tempfile.mkdtemp(), 'data'), \ + schema = sdf_schema) + >>> json_sdf.isStreaming + True + >>> json_sdf.schema == sdf_schema + True + """ + self._set_opts( + schema=schema, primitivesAsString=primitivesAsString, prefersDecimal=prefersDecimal, + allowComments=allowComments, allowUnquotedFieldNames=allowUnquotedFieldNames, + allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero, + allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter, + mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord) + if isinstance(path, basestring): + return self._df(self._jreader.json(path)) + else: + raise TypeError("path can be only a single string") + + @since(2.0) + def parquet(self, path): + """Loads a Parquet file stream, returning the result as a :class:`DataFrame`. 
+ + You can set the following Parquet-specific option(s) for reading Parquet files: + * ``mergeSchema``: sets whether we should merge schemas collected from all \ + Parquet part-files. This will override ``spark.sql.parquet.mergeSchema``. \ + The default value is specified in ``spark.sql.parquet.mergeSchema``. + + .. note:: Experimental. + + >>> parquet_sdf = spark.readStream.schema(sdf_schema)\ + .parquet(os.path.join(tempfile.mkdtemp())) + >>> parquet_sdf.isStreaming + True + >>> parquet_sdf.schema == sdf_schema + True + """ + if isinstance(path, basestring): + return self._df(self._jreader.parquet(path)) + else: + raise TypeError("path can be only a single string") + + @ignore_unicode_prefix + @since(2.0) + def text(self, path): + """ + Loads a text file stream and returns a :class:`DataFrame` whose schema starts with a + string column named "value", and followed by partitioned columns if there + are any. + + Each line in the text file is a new row in the resulting DataFrame. + + .. note:: Experimental. + + :param paths: string, or list of strings, for input path(s). + + >>> text_sdf = spark.readStream.text(os.path.join(tempfile.mkdtemp(), 'data')) + >>> text_sdf.isStreaming + True + >>> "value" in str(text_sdf.schema) + True + """ + if isinstance(path, basestring): + return self._df(self._jreader.text(path)) + else: + raise TypeError("path can be only a single string") + + @since(2.0) + def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None, + comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None, + ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None, + negativeInf=None, dateFormat=None, maxColumns=None, maxCharsPerColumn=None, + maxMalformedLogPerPartition=None, mode=None): + """Loads a CSV file stream and returns the result as a :class:`DataFrame`. + + This function will go through the input once to determine the input schema if + ``inferSchema`` is enabled. To avoid going through the entire data once, disable + ``inferSchema`` option or specify the schema explicitly using ``schema``. + + .. note:: Experimental. + + :param path: string, or list of strings, for input path(s). + :param schema: an optional :class:`StructType` for the input schema. + :param sep: sets the single character as a separator for each field and value. + If None is set, it uses the default value, ``,``. + :param encoding: decodes the CSV files by the given encoding type. If None is set, + it uses the default value, ``UTF-8``. + :param quote: sets the single character used for escaping quoted values where the + separator can be part of the value. If None is set, it uses the default + value, ``"``. If you would like to turn off quotations, you need to set an + empty string. + :param escape: sets the single character used for escaping quotes inside an already + quoted value. If None is set, it uses the default value, ``\``. + :param comment: sets the single character used for skipping lines beginning with this + character. By default (None), it is disabled. + :param header: uses the first line as names of columns. If None is set, it uses the + default value, ``false``. + :param inferSchema: infers the input schema automatically from data. It requires one extra + pass over the data. If None is set, it uses the default value, ``false``. + :param ignoreLeadingWhiteSpace: defines whether or not leading whitespaces from values + being read should be skipped. If None is set, it uses + the default value, ``false``. 
+ :param ignoreTrailingWhiteSpace: defines whether or not trailing whitespaces from values + being read should be skipped. If None is set, it uses + the default value, ``false``. + :param nullValue: sets the string representation of a null value. If None is set, it uses + the default value, empty string. + :param nanValue: sets the string representation of a non-number value. If None is set, it + uses the default value, ``NaN``. + :param positiveInf: sets the string representation of a positive infinity value. If None + is set, it uses the default value, ``Inf``. + :param negativeInf: sets the string representation of a negative infinity value. If None + is set, it uses the default value, ``Inf``. + :param dateFormat: sets the string that indicates a date format. Custom date formats + follow the formats at ``java.text.SimpleDateFormat``. This + applies to both date type and timestamp type. By default, it is None + which means trying to parse times and date by + ``java.sql.Timestamp.valueOf()`` and ``java.sql.Date.valueOf()``. + :param maxColumns: defines a hard limit of how many columns a record can have. If None is + set, it uses the default value, ``20480``. + :param maxCharsPerColumn: defines the maximum number of characters allowed for any given + value being read. If None is set, it uses the default value, + ``1000000``. + :param mode: allows a mode for dealing with corrupt records during parsing. If None is + set, it uses the default value, ``PERMISSIVE``. + + * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted record. + When a schema is set by user, it sets ``null`` for extra fields. + * ``DROPMALFORMED`` : ignores the whole corrupted records. + * ``FAILFAST`` : throws an exception when it meets corrupted records. + + >>> csv_sdf = spark.readStream.csv(os.path.join(tempfile.mkdtemp(), 'data'), \ + schema = sdf_schema) + >>> csv_sdf.isStreaming + True + >>> csv_sdf.schema == sdf_schema + True + """ + self._set_opts( + schema=schema, sep=sep, encoding=encoding, quote=quote, escape=escape, comment=comment, + header=header, inferSchema=inferSchema, ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace, + ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace, nullValue=nullValue, + nanValue=nanValue, positiveInf=positiveInf, negativeInf=negativeInf, + dateFormat=dateFormat, maxColumns=maxColumns, maxCharsPerColumn=maxCharsPerColumn, + maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode) + if isinstance(path, basestring): + return self._df(self._jreader.csv(path)) + else: + raise TypeError("path can be only a single string") + + +class DataStreamWriter(object): + """ + Interface used to write a streaming :class:`DataFrame` to external storage systems + (e.g. file systems, key-value stores, etc). Use :func:`DataFrame.writeStream` + to access this. + + .. note:: Experimental. + + .. versionadded:: 2.0 + """ + + def __init__(self, df): + self._df = df + self._spark = df.sql_ctx + self._jwrite = df._jdf.writeStream() + + def _sq(self, jsq): + from pyspark.sql.streaming import StreamingQuery + return StreamingQuery(jsq) + + @since(2.0) + def outputMode(self, outputMode): + """Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink. + + Options include: + + * `append`:Only the new rows in the streaming DataFrame/Dataset will be written to + the sink + * `complete`:All the rows in the streaming DataFrame/Dataset will be written to the sink + every time these is some updates + + .. note:: Experimental. 
+ + >>> writer = sdf.writeStream.outputMode('append') + """ + if not outputMode or type(outputMode) != str or len(outputMode.strip()) == 0: + raise ValueError('The output mode must be a non-empty string. Got: %s' % outputMode) + self._jwrite = self._jwrite.outputMode(outputMode) + return self + + @since(2.0) + def format(self, source): + """Specifies the underlying output data source. + + .. note:: Experimental. + + :param source: string, name of the data source, e.g. 'json', 'parquet'. + + >>> writer = sdf.writeStream.format('json') + """ + self._jwrite = self._jwrite.format(source) + return self + + @since(2.0) + def option(self, key, value): + """Adds an output option for the underlying data source. + + .. note:: Experimental. + """ + self._jwrite = self._jwrite.option(key, to_str(value)) + return self + + @since(2.0) + def options(self, **options): + """Adds output options for the underlying data source. + + .. note:: Experimental. + """ + for k in options: + self._jwrite = self._jwrite.option(k, to_str(options[k])) + return self + + @since(2.0) + def partitionBy(self, *cols): + """Partitions the output by the given columns on the file system. + + If specified, the output is laid out on the file system similar + to Hive's partitioning scheme. + + .. note:: Experimental. + + :param cols: name of columns + + """ + if len(cols) == 1 and isinstance(cols[0], (list, tuple)): + cols = cols[0] + self._jwrite = self._jwrite.partitionBy(_to_seq(self._spark._sc, cols)) + return self + + @since(2.0) + def queryName(self, queryName): + """Specifies the name of the :class:`StreamingQuery` that can be started with + :func:`start`. This name must be unique among all the currently active queries + in the associated SparkSession. + + .. note:: Experimental. + + :param queryName: unique name for the query + + >>> writer = sdf.writeStream.queryName('streaming_query') + """ + if not queryName or type(queryName) != str or len(queryName.strip()) == 0: + raise ValueError('The queryName must be a non-empty string. Got: %s' % queryName) + self._jwrite = self._jwrite.queryName(queryName) + return self + + @keyword_only + @since(2.0) + def trigger(self, processingTime=None): + """Set the trigger for the stream query. If this is not set it will run the query as fast + as possible, which is equivalent to setting the trigger to ``processingTime='0 seconds'``. + + .. note:: Experimental. + + :param processingTime: a processing time interval as a string, e.g. '5 seconds', '1 minute'. + + >>> # trigger the query for execution every 5 seconds + >>> writer = sdf.writeStream.trigger(processingTime='5 seconds') + """ + from pyspark.sql.streaming import ProcessingTime + trigger = None + if processingTime is not None: + if type(processingTime) != str or len(processingTime.strip()) == 0: + raise ValueError('The processing time must be a non empty string. Got: %s' % + processingTime) + trigger = ProcessingTime(processingTime) + if trigger is None: + raise ValueError('A trigger was not provided. Supported triggers: processingTime.') + self._jwrite = self._jwrite.trigger(trigger._to_java_trigger(self._spark)) + return self + + @ignore_unicode_prefix + @since(2.0) + def start(self, path=None, format=None, partitionBy=None, queryName=None, **options): + """Streams the contents of the :class:`DataFrame` to a data source. + + The data source is specified by the ``format`` and a set of ``options``. + If ``format`` is not specified, the default data source configured by + ``spark.sql.sources.default`` will be used. + + .. 
note:: Experimental. + + :param path: the path in a Hadoop supported file system + :param format: the format used to save + + * ``append``: Append contents of this :class:`DataFrame` to existing data. + * ``overwrite``: Overwrite existing data. + * ``ignore``: Silently ignore this operation if data already exists. + * ``error`` (default case): Throw an exception if data already exists. + :param partitionBy: names of partitioning columns + :param queryName: unique name for the query + :param options: All other string options. You may want to provide a `checkpointLocation` + for most streams, however it is not required for a `memory` stream. + + >>> sq = sdf.writeStream.format('memory').queryName('this_query').start() + >>> sq.isActive + True + >>> sq.name + u'this_query' + >>> sq.stop() + >>> sq.isActive + False + >>> sq = sdf.writeStream.trigger(processingTime='5 seconds').start( + ... queryName='that_query', format='memory') + >>> sq.name + u'that_query' + >>> sq.isActive + True + >>> sq.stop() + """ + self.options(**options) + if partitionBy is not None: + self.partitionBy(partitionBy) + if format is not None: + self.format(format) + if queryName is not None: + self.queryName(queryName) + if path is None: + return self._sq(self._jwrite.start()) + else: + return self._sq(self._jwrite.start(path)) + + def _test(): import doctest import os @@ -243,6 +734,9 @@ def _test(): globs['os'] = os globs['spark'] = spark globs['sqlContext'] = SQLContext.getOrCreate(spark.sparkContext) + globs['sdf'] = \ + spark.readStream.format('text').load('python/test_support/sql/streaming') + globs['sdf_schema'] = StructType([StructField("data", StringType(), False)]) globs['df'] = \ globs['spark'].readStream.format('text').load('python/test_support/sql/streaming') --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
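For downstream code, the practical effect of this change is the new import location of `DataStreamReader` and `DataStreamWriter`. Below is a minimal usage sketch of the relocated classes, assuming a local SparkSession and the `python/test_support/sql/streaming` sample directory from the Spark source tree (the same path used in the doctest setup above); the app name and query name are illustrative.

```python
from pyspark.sql import SparkSession
# After this change the streaming reader/writer live in pyspark.sql.streaming,
# matching the Scala packaging.
from pyspark.sql.streaming import DataStreamReader, DataStreamWriter

spark = (SparkSession.builder
         .master("local[2]")
         .appName("spark-16266-demo")   # illustrative app name
         .getOrCreate())

# spark.readStream still returns a DataStreamReader; only its module changed.
reader = spark.readStream.format("text")
assert isinstance(reader, DataStreamReader)

sdf = reader.load("python/test_support/sql/streaming")  # sample data in the Spark repo
print(sdf.isStreaming)  # True

# df.writeStream returns a DataStreamWriter, now also in pyspark.sql.streaming.
writer = (sdf.writeStream
          .format("memory")
          .queryName("demo_query")      # illustrative query name
          .outputMode("append")
          .trigger(processingTime="5 seconds"))
assert isinstance(writer, DataStreamWriter)

query = writer.start()
print(query.isActive)  # True while the streaming query runs
query.stop()
spark.stop()
```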