[GitHub] [spark] HyukjinKwon commented on a diff in pull request #37893: [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark

2022-09-21 Thread GitBox


HyukjinKwon commented on code in PR #37893:
URL: https://github.com/apache/spark/pull/37893#discussion_r977150660


##
python/pyspark/sql/pandas/group_ops.py:
##
@@ -216,6 +218,125 @@ def applyInPandas(
 jdf = self._jgd.flatMapGroupsInPandas(udf_column._jc.expr())
 return DataFrame(jdf, self.session)
 
+def applyInPandasWithState(
+self,
+func: "PandasGroupedMapFunctionWithState",
+outputStructType: Union[StructType, str],
+stateStructType: Union[StructType, str],
+outputMode: str,
+timeoutConf: str,
+) -> DataFrame:
+"""
+Applies the given function to each group of data, while maintaining a user-defined
+per-group state. The result Dataset will represent the flattened record returned by the
+function.
+
+For a streaming Dataset, the function will be invoked first for all input groups and then
+for all timed out states where the input data is set to be empty. Updates to each group's
+state will be saved across invocations.

Review Comment:
   ```suggestion
   For a streaming :class:`DataFrame`, the function will be invoked first for all input groups
   and then for all timed out states where the input data is set to be empty. Updates to
   each group's state will be saved across invocations.
   ```
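   For readers following the thread, a minimal sketch of how the `applyInPandasWithState` API under review might be used; the streaming DataFrame `df`, the schemas, the counting logic, and the timeout choice below are illustrative assumptions, not code from the PR.

```python
# A sketch only: counts records per key on a streaming DataFrame `df` that is
# assumed to have a string column "user"; schemas and timeout are illustrative.
from typing import Any, Iterator, Tuple

import pandas as pd

from pyspark.sql.streaming.state import GroupState, GroupStateTimeout


def count_per_user(
    key: Tuple[Any, ...],
    pdfs: Iterator[pd.DataFrame],
    state: GroupState,
) -> Iterator[pd.DataFrame]:
    # State is read and updated as a tuple matching stateStructType ("count long").
    (count,) = state.get if state.exists else (0,)
    for pdf in pdfs:
        count += len(pdf)
    state.update((count,))
    yield pd.DataFrame({"user": [key[0]], "count": [count]})


result = df.groupBy("user").applyInPandasWithState(
    count_per_user,
    outputStructType="user string, count long",
    stateStructType="count long",
    outputMode="update",
    timeoutConf=GroupStateTimeout.NoTimeout,
)
```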



##
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala:
##
@@ -98,6 +98,16 @@ class ArrowWriter(val root: VectorSchemaRoot, fields: Array[ArrowFieldWriter]) {
 count += 1
   }
 
+  def sizeInBytes(): Int = {

Review Comment:
   I think we don't need `sizeInBytes` and `getSizeInBytes` anymore



##
python/pyspark/sql/pandas/group_ops.py:
##
@@ -216,6 +218,125 @@ def applyInPandas(
 jdf = self._jgd.flatMapGroupsInPandas(udf_column._jc.expr())
 return DataFrame(jdf, self.session)
 
+def applyInPandasWithState(
+self,
+func: "PandasGroupedMapFunctionWithState",
+outputStructType: Union[StructType, str],
+stateStructType: Union[StructType, str],
+outputMode: str,
+timeoutConf: str,
+) -> DataFrame:
+"""
+Applies the given function to each group of data, while maintaining a user-defined
+per-group state. The result Dataset will represent the flattened record returned by the
+function.
+
+For a streaming Dataset, the function will be invoked first for all input groups and then
+for all timed out states where the input data is set to be empty. Updates to each group's
+state will be saved across invocations.
+
+The function should take parameters (key, Iterator[`pandas.DataFrame`], state) and
+return another Iterator[`pandas.DataFrame`]. The grouping key(s) will be passed as a tuple
+of numpy data types, e.g., `numpy.int32` and `numpy.float64`. The state will be passed as
+:class:`pyspark.sql.streaming.state.GroupState`.
+
+For each group, all columns are passed together as `pandas.DataFrame` to the user-function,
+and the returned `pandas.DataFrame` across all invocations are combined as a
+:class:`DataFrame`. Note that the user function should not make a guess of the number of
+elements in the iterator. To process all data, the user function needs to iterate all
+elements and process them. On the other hand, the user function is not strictly required to
+iterate through all elements in the iterator if it intends to read a part of data.
+
+The `outputStructType` should be a :class:`StructType` describing the schema of all
+elements in the returned value, `pandas.DataFrame`. The column labels of all elements in
+returned `pandas.DataFrame` must either match the field names in the defined schema if
+specified as strings, or match the field data types by position if not strings,
+e.g. integer indices.
+
+The `stateStructType` should be :class:`StructType` describing the schema of the
+user-defined state. The value of the state will be presented as a tuple, as well as the
+update should be performed with the tuple. The corresponding Python types for
+:class:`DataType` are supported. Please refer to the page
+https://spark.apache.org/docs/latest/sql-ref-datatypes.html (python tab).
+
+The size of each DataFrame in both the input and output can be arbitrary. The number of
+DataFrames in both the input and output can also be arbitrary.
+
+.. versionadded:: 3.4.0
+
+Parameters
+----------
+func : function
+a Python native function to be called on every group. It should take parameters
+(key, Iterator[`pandas.DataFrame`], state) and return Iterator[`pandas.DataFrame`].
+Note that the type of the key 

[GitHub] [spark] HyukjinKwon commented on a diff in pull request #37893: [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark

2022-09-19 Thread GitBox


HyukjinKwon commented on code in PR #37893:
URL: https://github.com/apache/spark/pull/37893#discussion_r974870249


##
python/pyspark/sql/pandas/group_ops.py:
##
@@ -216,6 +218,105 @@ def applyInPandas(
 jdf = self._jgd.flatMapGroupsInPandas(udf_column._jc.expr())
 return DataFrame(jdf, self.session)
 
+def applyInPandasWithState(
+self,
+func: "PandasGroupedMapFunctionWithState",
+outputStructType: Union[StructType, str],
+stateStructType: Union[StructType, str],
+outputMode: str,
+timeoutConf: str,
+) -> DataFrame:
+"""
+Applies the given function to each group of data, while maintaining a user-defined
+per-group state. The result Dataset will represent the flattened record returned by the
+function.
+
+For a streaming Dataset, the function will be invoked for each group repeatedly in every
+trigger, and updates to each group's state will be saved across invocations. The function
+will also be invoked for each timed-out state repeatedly. The sequence of the invocation
+will be input data -> state timeout. When the function is invoked for state timeout, there
+will be no data being presented.
+
+The function should takes parameters (key, Iterator[`pandas.DataFrame`], state) and
+returns another Iterator[`pandas.DataFrame`]. The grouping key(s) will be passed as a tuple
+of numpy data types, e.g., `numpy.int32` and `numpy.float64`. The state will be passed as
+:class:`pyspark.sql.streaming.state.GroupStateImpl`.
+
+For each group, all columns are passed together as `pandas.DataFrame` to the user-function,
+and the returned `pandas.DataFrame` across all invocations are combined as a
+:class:`DataFrame`. Note that the user function should loop through and process all
+elements in the iterator. The user function should not make a guess of the number of
+elements in the iterator.
+
+The `outputStructType` should be a :class:`StructType` describing the schema of all
+elements in returned value, `pandas.DataFrame`. The column labels of all elements in
+returned value, `pandas.DataFrame` must either match the field names in the defined
+schema if specified as strings, or match the field data types by position if not strings,
+e.g. integer indices.
+
+The `stateStructType` should be :class:`StructType` describing the schema of user-defined
+state. The value of state will be presented as a tuple, as well as the update should be
+performed with the tuple. User defined types e.g. native Python class types are not

Review Comment:
   I think we can just say that "the corresponding Python types for :class:`DataType` are supported". Documented here https://spark.apache.org/docs/latest/sql-ref-datatypes.html (click python tab)
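   As a small illustration of that mapping, a sketch of reading and updating tuple-valued state inside the user function; the state schema "count long, last_seen timestamp" and the helper name are assumptions for the example, not code from the PR.

```python
# Assuming stateStructType "count long, last_seen timestamp": LongType surfaces as a
# Python int and TimestampType as datetime.datetime, per the linked type-mapping page.
import datetime


def touch_state(state):  # `state` is the GroupState object passed to the user function
    count, last_seen = state.get if state.exists else (0, None)
    # The update must also be a plain tuple in the declared field order.
    state.update((count + 1, datetime.datetime.now()))
```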






[GitHub] [spark] HyukjinKwon commented on a diff in pull request #37893: [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark

2022-09-18 Thread GitBox


HyukjinKwon commented on code in PR #37893:
URL: https://github.com/apache/spark/pull/37893#discussion_r973844153


##
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##
@@ -2705,6 +2705,44 @@ object SQLConf {
   .booleanConf
   .createWithDefault(false)
 
+  val MAP_PANDAS_UDF_WITH_STATE_SOFT_LIMIT_SIZE_PER_BATCH =
+buildConf("spark.sql.execution.applyInPandasWithState.softLimitSizePerBatch")
+  .internal()
+  .doc("When using applyInPandasWithState, set a soft limit of the accumulated size of " +
+"records that can be written to a single ArrowRecordBatch in memory. This is used to " +
+"restrict the amount of memory being used to materialize the data in both executor and " +
+"Python worker. The accumulated size of records are calculated via sampling a set of " +
+"records. Splitting the ArrowRecordBatch is performed per record, so unless a record " +
+"is quite huge, the size of constructed ArrowRecordBatch will be around the " +
+"configured value.")
+  .version("3.4.0")
+  .bytesConf(ByteUnit.BYTE)
+  .createWithDefaultString("64MB")
+
+  val MAP_PANDAS_UDF_WITH_STATE_MIN_DATA_COUNT_FOR_SAMPLE =
+buildConf("spark.sql.execution.applyInPandasWithState.minDataCountForSample")
+  .internal()
+  .doc("When using applyInPandasWithState, specify the minimum number of records to sample " +
+"the size of record. The size being retrieved from sampling will be used to estimate " +
+"the accumulated size of records. Note that limiting by size does not work if the " +
+"number of records are less than the configured value. For such case, ArrowRecordBatch " +
+"will only be split for soft timeout.")
+  .version("3.4.0")
+  .intConf
+  .createWithDefault(100)
+
+  val MAP_PANDAS_UDF_WITH_STATE_SOFT_TIMEOUT_PURGE_BATCH =
+buildConf("spark.sql.execution.applyInPandasWithState.softTimeoutPurgeBatch")
+  .internal()
+  .doc("When using applyInPandasWithState, specify the soft timeout for purging the " +
+"ArrowRecordBatch. If batching records exceeds the timeout, Spark will force splitting " +
+"the ArrowRecordBatch regardless of estimated size. This config ensures the receiver " +
+"of data (both executor and Python worker) to not wait indefinitely for sender to " +
+"complete the ArrowRecordBatch, which may hurt both throughput and latency.")
+  .version("3.4.0")
+  .timeConf(TimeUnit.MILLISECONDS)
+  .createWithDefaultString("100ms")

Review Comment:
   For this, can we just leverage `spark.sql.execution.pandas.udf.buffer.size` (the feature this PR adds already respects it) if the flush time matters? That configuration is for this purpose.
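   For context, a sketch of setting that existing configuration from PySpark; the value is an arbitrary assumption, not a recommendation from this thread.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# spark.sql.execution.pandas.udf.buffer.size overrides spark.buffer.size for Pandas UDF
# execution only; 65536 bytes here is purely illustrative.
spark.conf.set("spark.sql.execution.pandas.udf.buffer.size", 65536)
```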



##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##
@@ -142,6 +143,17 @@ object UnsupportedOperationChecker extends Logging {
   " or the output mode is not append on a streaming DataFrames/Datasets")(plan)
 }
 
+val applyInPandasWithStates = plan.collect {
+  case f: FlatMapGroupsInPandasWithState if f.isStreaming => f
+}
+
+// Disallow multiple `applyInPandasWithState`s.
+if (applyInPandasWithStates.size >= 2) {

Review Comment:
   no biggie but .. 
   
   ```suggestion
   if (applyInPandasWithStates.size > 1) {
   ```
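   For context, a sketch of the query shape this check rejects: two `applyInPandasWithState` operations in one streaming query. The DataFrame `df`, function names, and schemas below are placeholders, not code from the PR.

```python
# Hypothetical: both stages use applyInPandasWithState, which the analyzer disallows
# within a single streaming query; `dedup_func` and `count_func` are placeholders.
stage1 = df.groupBy("user").applyInPandasWithState(
    dedup_func, "user string, event string", "seen long", "update", "NoTimeout")
stage2 = stage1.groupBy("user").applyInPandasWithState(
    count_func, "user string, count long", "count long", "update", "NoTimeout")
# Starting a streaming query over `stage2` would fail analysis because of this check.
```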
   
   



##
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##
@@ -2705,6 +2705,44 @@ object SQLConf {
   .booleanConf
   .createWithDefault(false)
 
+  val MAP_PANDAS_UDF_WITH_STATE_SOFT_LIMIT_SIZE_PER_BATCH =
+buildConf("spark.sql.execution.applyInPandasWithState.softLimitSizePerBatch")
+  .internal()
+  .doc("When using applyInPandasWithState, set a soft limit of the accumulated size of " +
+"records that can be written to a single ArrowRecordBatch in memory. This is used to " +
+"restrict the amount of memory being used to materialize the data in both executor and " +
+"Python worker. The accumulated size of records are calculated via sampling a set of " +
+"records. Splitting the ArrowRecordBatch is performed per record, so unless a record " +
+"is quite huge, the size of constructed ArrowRecordBatch will be around the " +
+"configured value.")
+  .version("3.4.0")
+  .bytesConf(ByteUnit.BYTE)
+  .createWithDefaultString("64MB")

Review Comment:
   I think we should have a general configuration for this later that applies to all Arrow batches (SPARK-23258). I think we should reuse `spark.sql.execution.arrow.maxRecordsPerBatch` for the time being.
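   For reference, a sketch of setting that existing batch-size knob from PySpark, assuming an active session bound to `spark`; 10000 is that configuration's default value.

```python
# Cap the number of rows per Arrow record batch exchanged with the Python worker;
# 10000 is the default value of spark.sql.execution.arrow.maxRecordsPerBatch.
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 10000)
```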
   



##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala:
##
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * 

[GitHub] [spark] HyukjinKwon commented on a diff in pull request #37893: [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark

2022-09-18 Thread GitBox


HyukjinKwon commented on code in PR #37893:
URL: https://github.com/apache/spark/pull/37893#discussion_r973840041


##
python/pyspark/sql/pandas/_typing/__init__.pyi:
##
@@ -256,6 +258,10 @@ PandasGroupedMapFunction = Union[
 Callable[[Any, DataFrameLike], DataFrameLike],
 ]
 
+PandasGroupedMapFunctionWithState = Callable[
+[Any, Iterable[DataFrameLike], GroupStateImpl], Iterable[DataFrameLike]

Review Comment:
   One concern is that we might happen to have a different implementation of `GroupState` in the far future. But the type is dynamic anyway, so I don't worry too much.






[GitHub] [spark] HyukjinKwon commented on a diff in pull request #37893: [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark

2022-09-18 Thread GitBox


HyukjinKwon commented on code in PR #37893:
URL: https://github.com/apache/spark/pull/37893#discussion_r973839939


##
python/pyspark/sql/pandas/_typing/__init__.pyi:
##
@@ -256,6 +258,10 @@ PandasGroupedMapFunction = Union[
 Callable[[Any, DataFrameLike], DataFrameLike],
 ]
 
+PandasGroupedMapFunctionWithState = Callable[
+[Any, Iterable[DataFrameLike], GroupStateImpl], Iterable[DataFrameLike]

Review Comment:
   I am fine either way too. Users aren't able to create this instance directly anyway.


