GitHub user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21477#discussion_r193740695
  
    --- Diff: python/pyspark/sql/streaming.py ---
    @@ -843,6 +844,169 @@ def trigger(self, processingTime=None, once=None, continuous=None):
             self._jwrite = self._jwrite.trigger(jTrigger)
             return self
     
    +    def foreach(self, f):
    +        """
    +        Sets the output of the streaming query to be processed using the provided writer ``f``.
    +        This is often used to write the output of a streaming query to arbitrary storage systems.
    +        The processing logic can be specified in two ways.
    +
    +        #. A **function** that takes a row as input.
    +            This is a simple way to express your processing logic. Note that this does
    +            not allow you to deduplicate generated data when failures cause reprocessing of
    +            some input data. That would require you to specify the processing logic in the next
    +            way.
    +
    +        #. An **object** with a ``process`` method and optional ``open`` and ``close`` methods.
    +            The object can have the following methods.
    +
    +            * ``open(partition_id, epoch_id)``: *Optional* method that initializes the processing
    +                (for example, open a connection, start a transaction, etc.). Additionally, you can
    +                use the `partition_id` and `epoch_id` to deduplicate regenerated data
    +                (discussed later).
    +
    +            * ``process(row)``: *Non-optional* method that processes each :class:`Row`.
    +
    +            * ``close(error)``: *Optional* method that finalizes and cleans up (for example,
    +                close connection, commit transaction, etc.) after all rows have been processed.
    +
    +            The object will be used by Spark in the following way.
    +
    +            * A single copy of this object is responsible for all the data generated by a
    +                single task in a query. In other words, one instance is responsible for
    +                processing one partition of the data generated in a distributed manner.
    +
    +            * This object must be serializable because each task will get a fresh
    +                serialized-deserialized copy of the provided object. Hence, it is strongly
    +                recommended that any initialization for writing data (e.g. opening a
    +                connection or starting a transaction) be done after the `open(...)`
    +                method has been called, which signifies that the task is ready to generate data.
    +
    +            * The lifecycle of the methods is as follows.
    +
    +                For each partition with ``partition_id``:
    +
    +                ... For each batch/epoch of streaming data with ``epoch_id``:
    +
    +                ....... Method ``open(partition_id, epoch_id)`` is called.
    +
    +                ....... If ``open(...)`` returns true, for each row in the partition and
    +                        batch/epoch, method ``process(row)`` is called.
    +
    +                ....... Method ``close(error)`` is called with error (if any) seen while
    +                        processing rows.
    +
    +            Important points to note:
    +
    +            * The `partition_id` and `epoch_id` can be used to deduplicate generated data when
    +                failures cause reprocessing of some input data. This depends on the execution
    +                mode of the query. If the streaming query is being executed in the micro-batch
    +                mode, then every partition represented by a unique tuple (partition_id, epoch_id)
    +                is guaranteed to have the same data. Hence, (partition_id, epoch_id) can be used
    +                to deduplicate and/or transactionally commit data and achieve exactly-once
    +                guarantees. However, if the streaming query is being executed in the continuous
    +                mode, then this guarantee does not hold and therefore (partition_id, epoch_id)
    +                should not be used for deduplication.
    +
    +            * The ``close()`` method (if it exists) will be called if the `open()` method exists and
    +                returns successfully (irrespective of the return value), except if the Python
    +                process crashes in the middle.
    +
    +        .. note:: Evolving.
    +
    +        >>> # Print every row using a function
    +        >>> writer = sdf.writeStream.foreach(lambda x: print(x))
    +        >>> # Print every row using an object with process() method
    +        >>> class RowPrinter:
    +        ...     def open(self, partition_id, epoch_id):
    +        ...         print("Opened %d, %d" % (partition_id, epoch_id))
    +        ...         return True
    +        ...     def process(self, row):
    +        ...         print(row)
    +        ...     def close(self, error):
    +        ...         print("Closed with error: %s" % str(error))
    +        ...
    +        >>> writer = sdf.writeStream.foreach(RowPrinter())
    +        """
    +
    +        from pyspark.rdd import _wrap_function
    +        from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
    +        from pyspark.taskcontext import TaskContext
    +
    +        if callable(f):
    +            """
    +            The provided object is a callable function that is supposed to be called on each row.
    +            Construct a function that takes an iterator and calls the provided function on each row.
    +            """
    +            def func_without_process(_, iterator):
    +                for x in iterator:
    +                    f(x)
    +                return iter([])
    +
    +            func = func_without_process
    +
    +        else:
    +            """
    +            The provided object is not a callable function. Then it is expected to have a
    +            'process(row)' method, and optional 'open(partition_id, epoch_id)' and
    +            'close(error)' methods.
    +            """
    +
    +            if not hasattr(f, 'process'):
    +                raise Exception(
    +                    "Provided object is neither callable nor does it have a 'process' method")
    +
    +            if not callable(getattr(f, 'process')):
    +                raise Exception("Attribute 'process' in provided object is not callable")
    +
    +            open_exists = False
    +            if hasattr(f, 'open'):
    +                if not callable(getattr(f, 'open')):
    +                    raise Exception("Attribute 'open' in provided object is not callable")
    +                else:
    +                    open_exists = True
    +
    +            close_exists = False
    +            if hasattr(f, "close"):
    +                if not callable(getattr(f, 'close')):
    +                    raise Exception("Attribute 'close' in provided object is not callable")
    +                else:
    +                    close_exists = True
    +
    +            def func_with_open_process_close(partition_id, iterator):
    +                epoch_id = TaskContext.get().getLocalProperty('streaming.sql.batchId')
    +                if epoch_id:
    +                    epoch_id = int(epoch_id)
    +                else:
    +                    raise Exception("Could not get batch id from TaskContext")
    +
    +                should_process = True
    +                if open_exists:
    +                    should_process = f.open(partition_id, epoch_id)
    +
    +                def call_close_if_needed(error):
    +                    if open_exists and close_exists:
    +                        f.close(error)
    +                try:
    +                    if should_process:
    +                        for x in iterator:
    +                            f.process(x)
    +                except Exception as ex:
    +                    call_close_if_needed(ex)
    --- End diff --
    
    Either is fine for me.
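
    For reference, the open/process/close lifecycle and the (partition_id, epoch_id) deduplication described in the docstring can be sketched without Spark. This is only a rough sketch under stated assumptions: `DedupWriter`, `run_partition`, `COMMITTED` and `SINK` are hypothetical names for illustration and are not part of the PR. The quoted diff is cut off inside the `except` branch, so calling `close` exactly once and then re-raising is an assumption here, not a statement about what the PR does.

```python
# Hypothetical stand-ins for an external transactional store; a real sink
# would keep this state outside the Python worker process.
COMMITTED = set()   # (partition_id, epoch_id) pairs already written
SINK = []           # rows that reached the "storage system"


class DedupWriter(object):
    """Illustrative writer: buffers rows and commits each pair at most once."""

    def open(self, partition_id, epoch_id):
        self.key = (partition_id, epoch_id)
        self.buffer = []
        # A falsy return value tells the caller to skip process() entirely.
        return self.key not in COMMITTED

    def process(self, row):
        self.buffer.append(row)

    def close(self, error):
        # Commit only when no error was seen and the pair is still new.
        if error is None and self.key not in COMMITTED:
            SINK.extend(self.buffer)
            COMMITTED.add(self.key)


def run_partition(writer, partition_id, epoch_id, rows):
    """Drive one (partition, epoch) pair through open -> process -> close."""
    has_open = callable(getattr(writer, "open", None))
    has_close = callable(getattr(writer, "close", None))
    should_process = writer.open(partition_id, epoch_id) if has_open else True
    error = None
    try:
        if should_process:
            for row in rows:
                writer.process(row)
    except Exception as ex:
        error = ex
        raise  # assumption: the failure is propagated after close() runs
    finally:
        # As in the docstring, close() is only called when open() exists too.
        if has_open and has_close:
            writer.close(error)
```

    Calling `run_partition(DedupWriter(), 0, 1, rows)` twice with the same ids leaves a single copy of the rows in `SINK`, which is the micro-batch case the docstring describes. In continuous mode the same pair is not guaranteed to carry the same data, so this scheme would not be safe there.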

