HeartSaVioR commented on a change in pull request #31296:
URL: https://github.com/apache/spark/pull/31296#discussion_r563573369
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
##########
@@ -2897,14 +2897,22 @@ class Dataset[T] private[sql](
* each line of stdout resulting in one element of the output partition. A process is invoked
* even for empty partitions.
*
- * @param command command to run in forked process.
+ * Note that for micro-batch streaming Dataset, the effect of pipe is only per micro-batch, not
Review comment:
I'd kindly suggest explaining the cases where users need to be careful, e.g. `If your external process aggregates its inputs, the aggregation is applied per partition within a micro-batch. You may want to aggregate these outputs after calling pipe to get a global aggregation across partitions and also across micro-batches.`
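
To illustrate the caveat, here is a rough sketch against the API proposed in this PR; `events` (a streaming `Dataset[Long]`) and `sum.sh` (a script that sums the lines it reads from stdin) are hypothetical names used only for illustration:

```scala
import org.apache.spark.sql.functions.{col, sum}

// Each ./sum.sh invocation only sees one partition of one micro-batch,
// so its output is just a partial sum for that partition.
val partials = events.pipe("./sum.sh",
  (n: Long, printFunc: String => Unit) => printFunc(n.toString))

// Re-aggregate the partial sums to get the global total across partitions and
// across micro-batches (for streaming this needs "update"/"complete" output mode).
val globalSum = partials
  .select(col("value").cast("long").as("partial"))
  .agg(sum(col("partial")).as("total"))
```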
##########
File path: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
##########
@@ -2897,14 +2897,22 @@ class Dataset[T] private[sql](
* each line of stdout resulting in one element of the output partition. A process is invoked
* even for empty partitions.
*
- * @param command command to run in forked process.
+ * Note that for micro-batch streaming Dataset, the effect of pipe is only per micro-batch, not
+ * cross entire stream.
*
+ * @param command command to run in forked process.
+ * @param printElement Use this function to customize how to pipe elements. This function
+ * will be called with each Dataset element as the 1st parameter, and the
+ * print line function (like out.println()) as the 2nd parameter.
* @group typedrel
* @since 3.2.0
*/
- def pipe(command: String): Dataset[String] = {
+ def pipe(command: String, printElement: (T, String => Unit) => Unit): Dataset[String] = {
Review comment:
I see that all the examples simply call the print function with a converted string. Could we take a serializer function like `serializeFn: T => String` instead, or provide two overloaded methods covering both cases if we are unsure whether `printElement` might be necessary in some cases? This would simplify the test code and actual user code (as `_.toString` would simply work).
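
One possible shape for the suggested overloads, sketched inside `Dataset[T]` (illustrative only, not the PR's actual code): the simple serializer-based variant could just delegate to the `printElement`-based one from this PR.

```scala
// Simple variant: callers pass a plain serializer, so `_.toString` works directly
// (the different arities should let overload resolution pick the right method).
def pipe(command: String, serializeFn: T => String): Dataset[String] =
  pipe(command, (elem: T, printFunc: String => Unit) => printFunc(serializeFn(elem)))

// Usage with the simple overload:
//   nums.pipe("cat", _.toString)
```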
##########
File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
##########
@@ -1264,6 +1264,20 @@ class StreamSuite extends StreamTest {
}
}
}
+
+ test("SPARK-34205: Pipe Streaming Dataset") {
+ assume(TestUtils.testCommandAvailable("cat"))
+
+ val inputData = MemoryStream[Int]
+ val piped = inputData.toDS()
+ .pipe("cat", (n, printFunc) => printFunc(n.toString)).toDF
+
+ testStream(piped)(
+ AddData(inputData, 1, 2, 3),
+ CheckAnswer(Row("1"), Row("2"), Row("3")),
+ AddData(inputData, 4),
+ CheckAnswer(Row("1"), Row("2"), Row("3"), Row("4")))
Review comment:
I'd rather check `CheckNewAnswer(Row("4"))` to ensure that inputs from the previous batch do not leak into the next batch.
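
For example, the tail of the test could look like this (sketch; only the last check changes from the PR's version):

```scala
testStream(piped)(
  AddData(inputData, 1, 2, 3),
  CheckAnswer(Row("1"), Row("2"), Row("3")),
  AddData(inputData, 4),
  // Only rows emitted by the latest micro-batch should show up here,
  // proving the previous batch's input isn't piped again.
  CheckNewAnswer(Row("4")))
```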
##########
File path: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
##########
@@ -2007,6 +2007,54 @@ class DatasetSuite extends QueryTest
checkAnswer(withUDF, Row(Row(1), null, null) :: Row(Row(1), null, null) :: Nil)
}
+
+ test("SPARK-34205: Pipe Dataset") {
+ assume(TestUtils.testCommandAvailable("cat"))
+
+ val nums = spark.range(4)
+ val piped = nums.pipe("cat", (l, printFunc) => printFunc(l.toString)).toDF
Review comment:
Great point! I don't know how exhaustively Spark implements Hive's transform feature, but the description of transform in Hive's manual looks quite powerful, going well beyond what we plan to provide with pipe.
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Transform#LanguageManualTransform-Transform/Map-ReduceSyntax
~Looks like the reason for the absence of pipe in DataFrame is obvious - transform simply replaced it.~ (Not valid, as transform is only available with Hive support.) Transform appears to be available only in SQL statements, so we still need DSL support to use it in SS.
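
For reference, a minimal sketch of the SQL-only transform usage (assuming a Hive-enabled session; `src` is a hypothetical two-column table):

```scala
// TRANSFORM is only reachable through a SQL statement, e.g.:
val transformed = spark.sql(
  """SELECT TRANSFORM (key, value)
    |USING 'cat'
    |AS (key, value)
    |FROM src""".stripMargin)
```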
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]