Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/21571#discussion_r195878970
--- Diff: python/pyspark/sql/streaming.py ---
@@ -1016,6 +1018,35 @@ def func_with_open_process_close(partition_id, iterator):
        self._jwrite.foreach(jForeachWriter)
        return self
+    @since(2.4)
+    def foreachBatch(self, func):
+        """
+        Sets the output of the streaming query to be processed using the provided
+        function. This is supported only in the micro-batch execution modes (that is, when the
+        trigger is not continuous). In every micro-batch, the provided function will be called
+        with (i) the output rows as a DataFrame and (ii) the batch identifier.
+        The batchId can be used to deduplicate and transactionally write the output
+        (that is, the provided Dataset) to external systems. The output DataFrame is guaranteed
+        to be exactly the same for the same batchId (assuming all operations are deterministic
+        in the query).
+
+        .. note:: Evolving.
+
+        >>> def func(batch_df, batch_id):
+        ...     batch_df.collect()
+        ...
+        >>> writer = sdf.writeStream.foreachBatch(func)
+        """
+
+        from pyspark.java_gateway import ensure_callback_server_started
+        gw = self._spark._sc._gateway
+        java_import(gw.jvm, "org.apache.spark.sql.execution.streaming.sources.*")
+
+        wrapped_func = ForeachBatchFunction(self._spark, func)
+        gw.jvm.PythonForeachBatchHelper.callForeachBatch(self._jwrite, wrapped_func)
+        ensure_callback_server_started(gw)
--- End diff ---
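To make the docstring's point about batchId concrete, here is a minimal usage sketch; the rate source, the added batch_id column, and the output path are illustrative choices, not part of this patch:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

spark = SparkSession.builder.getOrCreate()
sdf = spark.readStream.format("rate").load()  # any streaming DataFrame

def write_batch(batch_df, batch_id):
    # Tag every row with the micro-batch identifier. If a micro-batch is
    # replayed after a failure, it arrives with the same batch_id and the
    # same rows, so the sink (or a downstream reader) can deduplicate on it.
    (batch_df
        .withColumn("batch_id", lit(batch_id))
        .write
        .mode("append")
        .parquet("/tmp/foreach_batch_output"))  # illustrative output path

query = sdf.writeStream.foreachBatch(write_batch).start()
```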
`ensure_callback_server_started(gw)` should be called above (before `callForeachBatch`); otherwise there is a race where the streaming query may call this Python function before the callback server is started.
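A sketch of the reordering suggested here, reusing the names from the diff above (`ForeachBatchFunction` and `PythonForeachBatchHelper` are introduced by the patch, so this fragment is illustrative rather than standalone):

```python
from py4j.java_gateway import java_import
from pyspark.java_gateway import ensure_callback_server_started

def foreachBatch(self, func):
    gw = self._spark._sc._gateway
    java_import(gw.jvm, "org.apache.spark.sql.execution.streaming.sources.*")

    # Start the Py4J callback server before registering the batch function
    # with the JVM: once callForeachBatch runs, the streaming query may call
    # back into Python at any time, so the server must already be listening.
    ensure_callback_server_started(gw)

    wrapped_func = ForeachBatchFunction(self._spark, func)
    gw.jvm.PythonForeachBatchHelper.callForeachBatch(self._jwrite, wrapped_func)
    return self
```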