HeartSaVioR commented on a change in pull request #31296:
URL: https://github.com/apache/spark/pull/31296#discussion_r563573369



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
##########
@@ -2897,14 +2897,22 @@ class Dataset[T] private[sql](
    * each line of stdout resulting in one element of the output partition. A process is invoked
    * even for empty partitions.
    *
-   * @param command command to run in forked process.
+   * Note that for micro-batch streaming Dataset, the effect of pipe is only per micro-batch, not

Review comment:
       I'd kindly explain the case where they need to be careful, e.g. `If your external process does aggregation on inputs, the aggregation is applied per partition in each micro-batch. You may want to aggregate these outputs after calling pipe to get a global aggregation across partitions and also across micro-batches.`
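
       A minimal sketch of that caveat, using plain Scala collections rather than Spark. The `sumProcess` function, the object name, and the sample data are all hypothetical stand-ins: `sumProcess` plays the role of an external command that sums its input lines and prints one line, and nested `Seq`s play the role of micro-batches and partitions.

```scala
object PipeAggregationSketch {
  // Hypothetical stand-in for an external process: sums the input lines
  // it receives and emits a single output line per invocation.
  def sumProcess(lines: Seq[String]): Seq[String] =
    Seq(lines.map(_.toLong).sum.toString)

  // Made-up data: two micro-batches, each holding two partitions.
  val microBatches: Seq[Seq[Seq[String]]] = Seq(
    Seq(Seq("1", "2"), Seq("3")),
    Seq(Seq("4"), Seq("5", "6"))
  )

  // Pipe-like behaviour: the process runs once per partition of each
  // micro-batch, so each output line is only a partial sum.
  val piped: Seq[Seq[String]] = microBatches.map(_.flatMap(sumProcess))

  // A further aggregation over the piped output is needed to obtain the
  // total across partitions and across micro-batches.
  val globalSum: Long = piped.flatten.map(_.toLong).sum
}
```

       Here `piped` holds one partial sum per partition, and only the final step over all piped output yields the overall total. In a real streaming query that last step would presumably be a separate aggregation applied after `pipe`, not something the external process can do on its own.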

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
##########
@@ -2897,14 +2897,22 @@ class Dataset[T] private[sql](
    * each line of stdout resulting in one element of the output partition. A process is invoked
    * even for empty partitions.
    *
-   * @param command command to run in forked process.
+   * Note that for micro-batch streaming Dataset, the effect of pipe is only per micro-batch, not
+   * cross entire stream.
    *
+   * @param command command to run in forked process.
+   * @param printElement Use this function to customize how to pipe elements. This function
+   *                     will be called with each Dataset element as the 1st parameter, and the
+   *                     print line function (like out.println()) as the 2nd parameter.
    * @group typedrel
    * @since 3.2.0
    */
-  def pipe(command: String): Dataset[String] = {
+  def pipe(command: String, printElement: (T, String => Unit) => Unit): Dataset[String] = {

Review comment:
       I see all the examples simply call the print function with a converted string. Could we simply take a serializer function like `serializeFn: (T => String)` instead, or provide two overloaded methods covering both cases if we are unsure whether printElement is necessary? This should simplify the test code and actual user code (as `_.toString` would simply work).
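
       A rough sketch of how the two overloads could sit side by side. This is not the PR's code: `MiniDataset` and `stdinLines` are hypothetical names, no process is forked, and the methods only return the lines that would be written to the external process's stdin. The multi-line `custom` call is an assumed example of a case where `printElement` might still be useful.

```scala
import scala.collection.mutable.ArrayBuffer

object PipeSerializerSketch {
  // Hypothetical miniature of Dataset[T]; it only records the lines that
  // would be sent to the external process.
  final class MiniDataset[T](elems: Seq[T]) {
    // General form: the caller decides how many lines each element prints.
    def stdinLines(printElement: (T, String => Unit) => Unit): Seq[String] = {
      val out = ArrayBuffer.empty[String]
      elems.foreach(e => printElement(e, line => { out += line; () }))
      out.toSeq
    }

    // Simple form: one line per element, delegating to the general form.
    def stdinLines(serializeFn: T => String): Seq[String] =
      stdinLines((e: T, printLine: String => Unit) => printLine(serializeFn(e)))
  }

  val ds = new MiniDataset(Seq(1, 2, 3))

  // The serializer overload keeps the common case short.
  val simple: Seq[String] = ds.stdinLines(_.toString)

  // The printElement overload covers elements that expand to several lines.
  val custom: Seq[String] =
    ds.stdinLines((e, printLine) => { printLine(s"id=$e"); printLine("--") })
}
```

       Because the two parameter types differ in arity, the compiler can pick the overload from the shape of the lambda, so `_.toString` resolves to the serializer form without annotations.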






