Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/13653#discussion_r67080464
--- Diff: python/pyspark/sql/readwriter.py ---
@@ -905,6 +764,503 @@ def jdbc(self, url, table, mode=None,
properties=None):
self._jwrite.mode(mode).jdbc(url, table, jprop)
+class DataStreamReader(object):
+ """
+ Interface used to load a streaming :class:`DataFrame` from external
storage systems
+ (e.g. file systems, key-value stores, etc). Use
:func:`spark.readStream`
+ to access this.
+
+ .. note:: Experimental.
+
+ .. versionadded:: 2.0
+ """
+
+ def __init__(self, spark):
+ self._jreader = spark._ssql_ctx.readStream()
+ self._spark = spark
+
+ def _df(self, jdf):
+ from pyspark.sql.dataframe import DataFrame
+ return DataFrame(jdf, self._spark)
+
+ @since(2.0)
+ def format(self, source):
+ """Specifies the input data source format.
+
+ .. note:: Experimental.
+
+ :param source: string, name of the data source, e.g. 'json',
'parquet'.
+
+ """
+ self._jreader = self._jreader.format(source)
+ return self
+
+ @since(2.0)
+ def schema(self, schema):
+ """Specifies the input schema.
+
+ Some data sources (e.g. JSON) can infer the input schema
automatically from data.
+ By specifying the schema here, the underlying data source can skip
the schema
+ inference step, and thus speed up data loading.
+
+ .. note:: Experimental.
+
+ :param schema: a StructType object
+ """
+ if not isinstance(schema, StructType):
+ raise TypeError("schema should be StructType")
+ jschema = self._spark._ssql_ctx.parseDataType(schema.json())
+ self._jreader = self._jreader.schema(jschema)
+ return self
+
+ @since(2.0)
+ def option(self, key, value):
+ """Adds an input option for the underlying data source.
+
+ .. note:: Experimental.
+ """
+ self._jreader = self._jreader.option(key, to_str(value))
+ return self
+
+ @since(2.0)
+ def options(self, **options):
+ """Adds input options for the underlying data source.
+
+ .. note:: Experimental.
+ """
+ for k in options:
+ self._jreader = self._jreader.option(k, to_str(options[k]))
+ return self
+
+ @since(2.0)
+ def load(self, path=None, format=None, schema=None, **options):
+ """Loads a data stream from a data source and returns it as a
:class`DataFrame`.
+
+ .. note:: Experimental.
+
+ :param path: optional string for file-system backed data sources.
+ :param format: optional string for format of the data source.
Default to 'parquet'.
+ :param schema: optional :class:`StructType` for the input schema.
+ :param options: all other string options
+
+ """
+ if format is not None:
+ self.format(format)
+ if schema is not None:
+ self.schema(schema)
+ self.options(**options)
+ if path is not None:
+ if type(path) != str or len(path.strip()) == 0:
+ raise ValueError("If the path is provided for stream, it
needs to be a " +
+ "non-empty string. List of paths are not
supported.")
+ return self._df(self._jreader.load(path))
+ else:
+ return self._df(self._jreader.load())
+
+ @since(2.0)
+ def json(self, path, schema=None, primitivesAsString=None,
prefersDecimal=None,
+ allowComments=None, allowUnquotedFieldNames=None,
allowSingleQuotes=None,
+ allowNumericLeadingZero=None,
allowBackslashEscapingAnyCharacter=None,
+ mode=None, columnNameOfCorruptRecord=None):
+ """
+ Loads a JSON file stream (one object per line) and returns a
:class`DataFrame`.
+
+ If the ``schema`` parameter is not specified, this function goes
+ through the input once to determine the input schema.
+
+ .. note:: Experimental.
+
+ :param path: string represents path to the JSON dataset,
+ or RDD of Strings storing JSON objects.
+ :param schema: an optional :class:`StructType` for the input
schema.
+ :param primitivesAsString: infers all primitive values as a string
type. If None is set,
+ it uses the default value, ``false``.
+ :param prefersDecimal: infers all floating-point values as a
decimal type. If the values
+ do not fit in decimal, then it infers them
as doubles. If None is
+ set, it uses the default value, ``false``.
+ :param allowComments: ignores Java/C++ style comment in JSON
records. If None is set,
+ it uses the default value, ``false``.
+ :param allowUnquotedFieldNames: allows unquoted JSON field names.
If None is set,
+ it uses the default value,
``false``.
+ :param allowSingleQuotes: allows single quotes in addition to
double quotes. If None is
+ set, it uses the default value,
``true``.
+ :param allowNumericLeadingZero: allows leading zeros in numbers
(e.g. 00012). If None is
+ set, it uses the default value,
``false``.
+ :param allowBackslashEscapingAnyCharacter: allows accepting
quoting of all character
+ using backslash quoting
mechanism. If None is
+ set, it uses the
default value, ``false``.
+ :param mode: allows a mode for dealing with corrupt records during
parsing. If None is
+ set, it uses the default value, ``PERMISSIVE``.
+
+ * ``PERMISSIVE`` : sets other fields to ``null`` when it
meets a corrupted \
+ record and puts the malformed string into a new field
configured by \
+ ``columnNameOfCorruptRecord``. When a schema is set by
user, it sets \
+ ``null`` for extra fields.
+ * ``DROPMALFORMED`` : ignores the whole corrupted records.
+ * ``FAILFAST`` : throws an exception when it meets
corrupted records.
+
+ :param columnNameOfCorruptRecord: allows renaming the new field
having malformed string
+ created by ``PERMISSIVE`` mode.
This overrides
+
``spark.sql.columnNameOfCorruptRecord``. If None is set,
+ it uses the value specified in
+
``spark.sql.columnNameOfCorruptRecord``.
+
+ """
+ if schema is not None:
+ self.schema(schema)
+ if primitivesAsString is not None:
+ self.option("primitivesAsString", primitivesAsString)
+ if prefersDecimal is not None:
+ self.option("prefersDecimal", prefersDecimal)
+ if allowComments is not None:
+ self.option("allowComments", allowComments)
+ if allowUnquotedFieldNames is not None:
+ self.option("allowUnquotedFieldNames", allowUnquotedFieldNames)
+ if allowSingleQuotes is not None:
+ self.option("allowSingleQuotes", allowSingleQuotes)
+ if allowNumericLeadingZero is not None:
+ self.option("allowNumericLeadingZero", allowNumericLeadingZero)
+ if allowBackslashEscapingAnyCharacter is not None:
+ self.option("allowBackslashEscapingAnyCharacter",
allowBackslashEscapingAnyCharacter)
+ if mode is not None:
+ self.option("mode", mode)
+ if columnNameOfCorruptRecord is not None:
+ self.option("columnNameOfCorruptRecord",
columnNameOfCorruptRecord)
+ if isinstance(path, basestring):
+ path = [path]
+ return self._df(self._jreader.json(path))
+ else:
+ raise TypeError("path can be only a single string")
+
+ @since(2.0)
+ def parquet(self, path):
+ """Loads a Parquet file stream, returning the result as a
:class:`DataFrame`.
+
+ You can set the following Parquet-specific option(s) for reading
Parquet files:
+ * ``mergeSchema``: sets whether we should merge schemas
collected from all \
+ Parquet part-files. This will override
``spark.sql.parquet.mergeSchema``. \
+ The default value is specified in
``spark.sql.parquet.mergeSchema``.
+
+ .. note:: Experimental.
+
+ """
+ if isinstance(path, basestring):
+ path = [path]
+ return
self._df(self._jreader.parquet(self._spark._sc._jvm.PythonUtils.toSeq(path)))
+ else:
+ raise TypeError("path can be only a single string")
+
+ @ignore_unicode_prefix
+ @since(2.0)
+ def text(self, path):
+ """
+ Loads a text file stream and returns a :class:`DataFrame` whose
schema starts with a
+ string column named "value", and followed by partitioned columns
if there
+ are any.
+
+ Each line in the text file is a new row in the resulting DataFrame.
+
+ .. note:: Experimental.
+
+ :param paths: string, or list of strings, for input path(s).
+
+ """
+ if isinstance(path, basestring):
+ path = [path]
+ return
self._df(self._jreader.text(self._spark._sc._jvm.PythonUtils.toSeq(path)))
+ else:
+ raise TypeError("path can be only a single string")
+
+ @since(2.0)
+ def csv(self, path, schema=None, sep=None, encoding=None, quote=None,
escape=None,
--- End diff --
I would like to do that in a future PR. I would like to get this in
as fast as possible.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]