Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/spark/pull/21477#discussion_r193284293
--- Diff: python/pyspark/sql/streaming.py ---
@@ -843,6 +844,169 @@ def trigger(self, processingTime=None, once=None, continuous=None):
             self._jwrite = self._jwrite.trigger(jTrigger)
             return self

    +    def foreach(self, f):
    +        """
    +        Sets the output of the streaming query to be processed using the provided writer ``f``.
    +        This is often used to write the output of a streaming query to arbitrary storage systems.
    +        The processing logic can be specified in two ways.
    +
    +        #. A **function** that takes a row as input.
    +            This is a simple way to express your processing logic. Note that this does
    +            not allow you to deduplicate generated data when failures cause reprocessing of
    +            some input data. That would require you to specify the processing logic in the next
    +            way.
    +
    +        #. An **object** with a ``process`` method and optional ``open`` and ``close`` methods.
    +            The object can have the following methods.
    +
    +            * ``open(partition_id, epoch_id)``: *Optional* method that initializes the processing
    +                (for example, open a connection, start a transaction, etc). Additionally, you can
    +                use the `partition_id` and `epoch_id` to deduplicate regenerated data
    +                (discussed later).
    +
    +            * ``process(row)``: *Non-optional* method that processes each :class:`Row`.
    +
    +            * ``close(error)``: *Optional* method that finalizes and cleans up (for example,
    +                close connection, commit transaction, etc.) after all rows have been processed.
    +
    +            The object will be used by Spark in the following way.
    +
    +            * A single copy of this object is responsible for all the data generated by a
    +                single task in a query. In other words, one instance is responsible for
    +                processing one partition of the data generated in a distributed manner.
    +
    +            * This object must be serializable because each task will get a fresh
    +                serialized-deserializedcopy of the provided object. Hence, it is strongly
--- End diff --
    nit: `deserializedcopy` should be `deserialized copy` (missing space)
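
    For readers skimming the thread, here is a minimal usage sketch of the
    ``foreach`` API described in the diff above. It assumes an active
    SparkSession; the socket source, host/port values, and the `RowPrinter`
    class are illustrative only, not part of the patch.

        from pyspark.sql import SparkSession

        spark = SparkSession.builder.appName("foreach-sketch").getOrCreate()

        # Illustrative streaming source; any streaming DataFrame works here.
        sdf = spark.readStream.format("socket") \
            .option("host", "localhost") \
            .option("port", 9999) \
            .load()

        # Way 1: a plain function. Simple, but cannot deduplicate rows that
        # are regenerated when a failure causes input to be reprocessed.
        def process_row(row):
            print(row)

        query1 = sdf.writeStream.foreach(process_row).start()

        # Way 2: an object with open/process/close. partition_id and
        # epoch_id identify the chunk of data being (re)generated, so a
        # sink can skip duplicates after a failure.
        class RowPrinter:
            def open(self, partition_id, epoch_id):
                # e.g. open a connection or start a transaction here
                print("Opened partition %d, epoch %d" % (partition_id, epoch_id))
                return True  # False tells Spark to skip this partition

            def process(self, row):
                print(row)

            def close(self, error):
                # e.g. commit the transaction and close the connection here
                print("Closed with error: %s" % str(error))

        query2 = sdf.writeStream.foreach(RowPrinter()).start()

    Note the serializability point from the docstring: Spark pickles the
    writer and ships a fresh copy to every task, so per-task resources
    (connections, transactions) belong in ``open(...)``, not in the
    constructor.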