[GitHub] [spark] alex-balikov commented on a diff in pull request #37893: [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark

2022-09-21 Thread GitBox


alex-balikov commented on code in PR #37893:
URL: https://github.com/apache/spark/pull/37893#discussion_r976868040


##
python/pyspark/sql/pandas/group_ops.py:
##
@@ -216,6 +218,125 @@ def applyInPandas(
 jdf = self._jgd.flatMapGroupsInPandas(udf_column._jc.expr())
 return DataFrame(jdf, self.session)
 
+def applyInPandasWithState(
+self,
+func: "PandasGroupedMapFunctionWithState",
+outputStructType: Union[StructType, str],
+stateStructType: Union[StructType, str],
+outputMode: str,
+timeoutConf: str,
+) -> DataFrame:
+"""
+Applies the given function to each group of data, while maintaining a user-defined
+per-group state. The result Dataset will represent the flattened record returned by the
+function.
+
+For a streaming Dataset, the function will be invoked first for all input groups and then
+for all timed out states where the input data is set to be empty. Updates to each group's
+state will be saved across invocations.
+
+The function should take parameters (key, Iterator[`pandas.DataFrame`], state) and
+returns another Iterator[`pandas.DataFrame`]. The grouping key(s) will be passed as a tuple

Review Comment:
   return another ...
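
   For reference, a minimal usage sketch matching the documented signature (the streaming DataFrame `df`, the column names, and the schema strings below are illustrative assumptions, not taken from the PR; the state/timeout attribute names follow the GroupState API referenced in the docstring):

   import pandas as pd
   from pyspark.sql.streaming.state import GroupStateTimeout

   def count_fn(key, pdf_iter, state):
       # key is a tuple of numpy values; state is the per-group state object
       running = state.get[0] if state.exists else 0
       for pdf in pdf_iter:      # iterate through every chunk for the group
           running += len(pdf)
       state.update((running,))  # update with a tuple matching stateStructType
       yield pd.DataFrame({"id": [key[0]], "count": [running]})

   df.groupBy("id").applyInPandasWithState(
       count_fn,
       outputStructType="id long, count long",
       stateStructType="count long",
       outputMode="Update",
       timeoutConf=GroupStateTimeout.NoTimeout,
   )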



##
python/pyspark/worker.py:
##
@@ -207,6 +209,89 @@ def wrapped(key_series, value_series):
 return lambda k, v: [(wrapped(k, v), to_arrow_type(return_type))]
 
 
+def wrap_grouped_map_pandas_udf_with_state(f, return_type):
+"""
+Provides a new lambda instance wrapping user function of applyInPandasWithState.
+
+The lambda instance receives (key series, iterator of value series, state) and performs
+some conversion to be adapted with the signature of user function.
+
+See the function doc of inner function `wrapped` for more details on what adapter does.
+See the function doc of `mapper` function for
+`eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE` for more details on
+the input parameters of lambda function.
+
+Along with the returned iterator, the lambda instance will also produce the return_type as
+converted to the arrow schema.
+"""
+
+def wrapped(key_series, value_series_gen, state):
+"""
+Provide an adapter of the user function performing below:
+
+- Extract the first value of all columns in key series and produce as a tuple.
+- If the state has timed out, call the user function with empty pandas DataFrame.
+- If not, construct a new generator which converts each element of value series to
+  pandas DataFrame (lazy evaluation), and call the user function with the generator
+- Verify each element of returned iterator to check the schema of pandas DataFrame.
+"""
+import pandas as pd
+
+key = tuple(s[0] for s in key_series)
+
+if state.hasTimedOut:
+# Timeout processing pass empty iterator. Here we return an empty DataFrame instead.
+values = [
+pd.DataFrame(columns=pd.concat(next(value_series_gen), axis=1).columns),
+]
+else:
+values = (pd.concat(x, axis=1) for x in value_series_gen)
+
+result_iter = f(key, values, state)
+
+def verify_element(result):
+if not isinstance(result, pd.DataFrame):
+raise TypeError(
+"The type of element in return iterator of the user-defined function "
+"should be pandas.DataFrame, but is {}".format(type(result))
+)
+# the number of columns of result have to match the return type
+# but it is fine for result to have no columns at all if it is empty
+if not (
+len(result.columns) == len(return_type) or len(result.columns) == 0 and result.empty

Review Comment:
   Maybe it is just me, but I would suggest adding parentheses so we do not rely on and/or precedence.
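
   i.e., something along the lines of (a sketch of the suggested parenthesization, not the PR's final wording):

   if not (
       len(result.columns) == len(return_type)
       or (len(result.columns) == 0 and result.empty)
   ):
       ...  # raise the schema-mismatch error as in the PR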



##
python/pyspark/sql/pandas/serializers.py:
##
@@ -371,3 +373,354 @@ def load_stream(self, stream):
 raise ValueError(
 "Invalid number of pandas.DataFrames in group {0}".format(dataframes_in_group)
 )
+
+
+class ApplyInPandasWithStateSerializer(ArrowStreamPandasUDFSerializer):
+"""
+Serializer used by Python worker to evaluate UDF for applyInPandasWithState.
+
+Parameters
+----------
+timezone : str
+A timezone to respect when handling timestamp values
+safecheck : bool
+If True, conversion from Arrow to Pandas checks for overflow/truncation
+assign_cols_by_name : bool
+If True, then Pandas DataFrames will get columns by name
+state_object_schema : StructType
+The type of state object represented as Spark SQL type
+arrow_max_records_per_batch : int
+Limit of 

[GitHub] [spark] alex-balikov commented on a diff in pull request #37893: [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark

2022-09-20 Thread GitBox


alex-balikov commented on code in PR #37893:
URL: https://github.com/apache/spark/pull/37893#discussion_r975687838


##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasWithStateExec.scala:
##
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import org.apache.spark.TaskContext
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.{EventTimeTimeout, ProcessingTimeTimeout}
+import org.apache.spark.sql.catalyst.plans.physical.Distribution
+import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.python.PandasGroupUtils.resolveArgOffsets
+import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.sql.execution.streaming.GroupStateImpl.NO_TIMESTAMP
+import org.apache.spark.sql.execution.streaming.state.FlatMapGroupsWithStateExecHelper.StateData
+import org.apache.spark.sql.execution.streaming.state.StateStore
+import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.ArrowUtils
+import org.apache.spark.util.CompletionIterator
+
+/**
+ * Physical operator for executing
+ * [[org.apache.spark.sql.catalyst.plans.logical.FlatMapGroupsInPandasWithState]]
+ *
+ * @param functionExpr function called on each group
+ * @param groupingAttributes used to group the data
+ * @param outAttributes used to define the output rows
+ * @param stateType used to serialize/deserialize state before calling `functionExpr`
+ * @param stateInfo `StatefulOperatorStateInfo` to identify the state store for a given operator.
+ * @param stateFormatVersion the version of state format.
+ * @param outputMode the output mode of `functionExpr`
+ * @param timeoutConf used to timeout groups that have not received data in a while
+ * @param batchTimestampMs processing timestamp of the current batch.
+ * @param eventTimeWatermark event time watermark for the current batch
+ * @param child logical plan of the underlying data
+ */
+case class FlatMapGroupsInPandasWithStateExec(

Review Comment:
   I wonder if this can be merged with the regular FlatMapGroupsWithStateExec. 
Maybe as a followup cleanup.



##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasWithStateExec.scala:
##
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import org.apache.spark.TaskContext
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.{EventTimeTimeout, ProcessingTimeTimeout}
+import org.apache.spark.sql.catalyst.plans.physical.Distribution
+import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode}
+import 

[GitHub] [spark] alex-balikov commented on a diff in pull request #37893: [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark

2022-09-19 Thread GitBox


alex-balikov commented on code in PR #37893:
URL: https://github.com/apache/spark/pull/37893#discussion_r974707164


##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala:
##
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io._
+
+import scala.collection.JavaConverters._
+
+import org.apache.arrow.vector.VectorSchemaRoot
+import org.apache.arrow.vector.ipc.ArrowStreamWriter
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.api.python._
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.api.python.PythonSQLUtils
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.python.ApplyInPandasWithStatePythonRunner.{InType, OutType, OutTypeForState, STATE_METADATA_SCHEMA_FROM_PYTHON_WORKER}
+import org.apache.spark.sql.execution.python.ApplyInPandasWithStateWriter.STATE_METADATA_SCHEMA
+import org.apache.spark.sql.execution.streaming.GroupStateImpl
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch}
+
+
+/**
+ * A variant implementation of [[ArrowPythonRunner]] to serve the operation
+ * applyInPandasWithState.
+ */
+class ApplyInPandasWithStatePythonRunner(
+funcs: Seq[ChainedPythonFunctions],
+evalType: Int,
+argOffsets: Array[Array[Int]],
+inputSchema: StructType,
+override protected val timeZoneId: String,
+initialWorkerConf: Map[String, String],
+stateEncoder: ExpressionEncoder[Row],
+keySchema: StructType,
+valueSchema: StructType,
+stateValueSchema: StructType,
+softLimitBytesPerBatch: Long,
+minDataCountForSample: Int,
+softTimeoutMillsPurgeBatch: Long)
+  extends BasePythonRunner[InType, OutType](funcs, evalType, argOffsets)
+  with PythonArrowInput[InType]
+  with PythonArrowOutput[OutType] {
+
+  override protected val schema: StructType = inputSchema.add("__state", STATE_METADATA_SCHEMA)
+
+  override val simplifiedTraceback: Boolean = SQLConf.get.pysparkSimplifiedTraceback
+
+  override val bufferSize: Int = SQLConf.get.pandasUDFBufferSize
+  require(
+bufferSize >= 4,
+"Pandas execution requires more than 4 bytes. Please set higher buffer. " +
+  s"Please change '${SQLConf.PANDAS_UDF_BUFFER_SIZE.key}'.")
+
+  // applyInPandasWithState has its own mechanism to construct the Arrow RecordBatch instance.
+  // Configurations are both applied to executor and Python worker, set them to the worker conf
+  // to let Python worker read the config properly.
+  override protected val workerConf: Map[String, String] = initialWorkerConf +
+(SQLConf.MAP_PANDAS_UDF_WITH_STATE_SOFT_LIMIT_SIZE_PER_BATCH.key ->
+  softLimitBytesPerBatch.toString) +
+(SQLConf.MAP_PANDAS_UDF_WITH_STATE_MIN_DATA_COUNT_FOR_SAMPLE.key ->
+  minDataCountForSample.toString) +
+(SQLConf.MAP_PANDAS_UDF_WITH_STATE_SOFT_TIMEOUT_PURGE_BATCH.key ->
+  softTimeoutMillsPurgeBatch.toString)
+
+  private val stateRowDeserializer = stateEncoder.createDeserializer()
+
+  override protected def handleMetadataBeforeExec(stream: DataOutputStream): Unit = {
+super.handleMetadataBeforeExec(stream)
+// Also write the schema for state value
+PythonRDD.writeUTF(stateValueSchema.json, stream)
+  }
+
+  protected def writeIteratorToArrowStream(
+  root: VectorSchemaRoot,
+  writer: ArrowStreamWriter,
+  dataOut: DataOutputStream,
+  inputIterator: Iterator[InType]): Unit = {
+val w = new ApplyInPandasWithStateWriter(root, writer, softLimitBytesPerBatch,
+  minDataCountForSample, softTimeoutMillsPurgeBatch)
+
+while (inputIterator.hasNext) {
+  val (keyRow, groupState, dataIter) = inputIterator.next()
+  assert(dataIter.hasNext, "should have at least one data row!")
+  w.startNewGroup(keyRow, groupState)
+
+  while (dataIter.hasNext) {
+val dataRow = dataIter.next()
+w.writeRow(dataRow)

[GitHub] [spark] alex-balikov commented on a diff in pull request #37893: [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark

2022-09-19 Thread GitBox


alex-balikov commented on code in PR #37893:
URL: https://github.com/apache/spark/pull/37893#discussion_r974680745


##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala:
##
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io._
+
+import scala.collection.JavaConverters._
+
+import org.apache.arrow.vector.VectorSchemaRoot
+import org.apache.arrow.vector.ipc.ArrowStreamWriter
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.api.python._
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.api.python.PythonSQLUtils
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.python.ApplyInPandasWithStatePythonRunner.{InType, OutType, OutTypeForState, STATE_METADATA_SCHEMA_FROM_PYTHON_WORKER}
+import org.apache.spark.sql.execution.python.ApplyInPandasWithStateWriter.STATE_METADATA_SCHEMA
+import org.apache.spark.sql.execution.streaming.GroupStateImpl
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch}
+
+
+/**
+ * A variant implementation of [[ArrowPythonRunner]] to serve the operation
+ * applyInPandasWithState.
+ */
+class ApplyInPandasWithStatePythonRunner(
+funcs: Seq[ChainedPythonFunctions],
+evalType: Int,
+argOffsets: Array[Array[Int]],
+inputSchema: StructType,
+override protected val timeZoneId: String,
+initialWorkerConf: Map[String, String],
+stateEncoder: ExpressionEncoder[Row],
+keySchema: StructType,
+valueSchema: StructType,
+stateValueSchema: StructType,
+softLimitBytesPerBatch: Long,
+minDataCountForSample: Int,
+softTimeoutMillsPurgeBatch: Long)
+  extends BasePythonRunner[InType, OutType](funcs, evalType, argOffsets)
+  with PythonArrowInput[InType]
+  with PythonArrowOutput[OutType] {
+
+  override protected val schema: StructType = inputSchema.add("__state", STATE_METADATA_SCHEMA)
+
+  override val simplifiedTraceback: Boolean = SQLConf.get.pysparkSimplifiedTraceback
+
+  override val bufferSize: Int = SQLConf.get.pandasUDFBufferSize
+  require(
+bufferSize >= 4,

Review Comment:
   Why do we care to throw this exception? Maybe just ensure that the buffer size is big enough:
   
   override val bufferSize: Int = max(4, SQLConf.get.pandasUDFBufferSize)



##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ApplyInPandasWithStatePythonRunner.scala:
##
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io._
+
+import scala.collection.JavaConverters._
+
+import org.apache.arrow.vector.VectorSchemaRoot
+import org.apache.arrow.vector.ipc.ArrowStreamWriter
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.api.python._
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.api.python.PythonSQLUtils
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import 

[GitHub] [spark] alex-balikov commented on a diff in pull request #37893: [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark

2022-09-19 Thread GitBox


alex-balikov commented on code in PR #37893:
URL: https://github.com/apache/spark/pull/37893#discussion_r974672806


##
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##
@@ -2705,6 +2705,44 @@ object SQLConf {
   .booleanConf
   .createWithDefault(false)
 
+  val MAP_PANDAS_UDF_WITH_STATE_SOFT_LIMIT_SIZE_PER_BATCH =
+buildConf("spark.sql.execution.applyInPandasWithState.softLimitSizePerBatch")
+  .internal()
+  .doc("When using applyInPandasWithState, set a soft limit of the accumulated size of " +
+"records that can be written to a single ArrowRecordBatch in memory. This is used to " +
+"restrict the amount of memory being used to materialize the data in both executor and " +
+"Python worker. The accumulated size of records are calculated via sampling a set of " +
+"records. Splitting the ArrowRecordBatch is performed per record, so unless a record " +
+"is quite huge, the size of constructed ArrowRecordBatch will be around the " +
+"configured value.")
+  .version("3.4.0")
+  .bytesConf(ByteUnit.BYTE)
+  .createWithDefaultString("64MB")
+
+  val MAP_PANDAS_UDF_WITH_STATE_MIN_DATA_COUNT_FOR_SAMPLE =

Review Comment:
   I wonder if we really care to have this param. Ultimately if the sizing 
estimate works badly, the users can just set a lower value for the batch size 
limit. I do not think it is useful to let them tune this parameter.



##
sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala:
##
@@ -620,6 +622,35 @@ class RelationalGroupedDataset protected[sql](
 Dataset.ofRows(df.sparkSession, plan)
   }
 
+  private[sql] def applyInPandasWithState(

Review Comment:
   method level comment



##
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##
@@ -2705,6 +2705,44 @@ object SQLConf {
   .booleanConf
   .createWithDefault(false)
 
+  val MAP_PANDAS_UDF_WITH_STATE_SOFT_LIMIT_SIZE_PER_BATCH =
+buildConf("spark.sql.execution.applyInPandasWithState.softLimitSizePerBatch")
+  .internal()
+  .doc("When using applyInPandasWithState, set a soft limit of the accumulated size of " +
+"records that can be written to a single ArrowRecordBatch in memory. This is used to " +
+"restrict the amount of memory being used to materialize the data in both executor and " +
+"Python worker. The accumulated size of records are calculated via sampling a set of " +
+"records. Splitting the ArrowRecordBatch is performed per record, so unless a record " +
+"is quite huge, the size of constructed ArrowRecordBatch will be around the " +
+"configured value.")
+  .version("3.4.0")
+  .bytesConf(ByteUnit.BYTE)
+  .createWithDefaultString("64MB")

Review Comment:
   I agree that expressing the limit in terms of bytes is more meaningful than records; however, we can only estimate the byte size. Specifically, here I would rename 'softLimitSizePerBatch' by removing 'soft' (we can clarify that in the comment) and including 'Bytes' - 'batchSizeLimitBytes'. Also, I wonder if we should have the property specific to applyInPandasWithState or make it general - remove the applyInPandasWithState scoping even if we do not support this limit elsewhere initially; it seems generally meaningful and we can follow up fixing the other places as a bug.



##
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##
@@ -2705,6 +2705,44 @@ object SQLConf {
   .booleanConf
   .createWithDefault(false)
 
+  val MAP_PANDAS_UDF_WITH_STATE_SOFT_LIMIT_SIZE_PER_BATCH =
+buildConf("spark.sql.execution.applyInPandasWithState.softLimitSizePerBatch")
+  .internal()
+  .doc("When using applyInPandasWithState, set a soft limit of the accumulated size of " +
+"records that can be written to a single ArrowRecordBatch in memory. This is used to " +
+"restrict the amount of memory being used to materialize the data in both executor and " +
+"Python worker. The accumulated size of records are calculated via sampling a set of " +
+"records. Splitting the ArrowRecordBatch is performed per record, so unless a record " +
+"is quite huge, the size of constructed ArrowRecordBatch will be around the " +
+"configured value.")
+  .version("3.4.0")
+  .bytesConf(ByteUnit.BYTE)
+  .createWithDefaultString("64MB")
+
+  val MAP_PANDAS_UDF_WITH_STATE_MIN_DATA_COUNT_FOR_SAMPLE =
+buildConf("spark.sql.execution.applyInPandasWithState.minDataCountForSample")
+  .internal()
+  .doc("When using applyInPandasWithState, specify the minimum number of records to sample " +
+"the size of record. The size being retrieved from sampling will be used to estimate " +
+"the accumulated size of records. Note that limiting by size does not 

[GitHub] [spark] alex-balikov commented on a diff in pull request #37893: [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark

2022-09-19 Thread GitBox


alex-balikov commented on code in PR #37893:
URL: https://github.com/apache/spark/pull/37893#discussion_r974517188


##
python/pyspark/worker.py:
##
@@ -361,6 +429,32 @@ def read_udfs(pickleSer, infile, eval_type):
 
 if eval_type == PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF:
 ser = CogroupUDFSerializer(timezone, safecheck, assign_cols_by_name)
+elif eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE:
+soft_limit_bytes_per_batch = runner_conf.get(
+"spark.sql.execution.applyInPandasWithState.softLimitSizePerBatch",
+(64 * 1024 * 1024),
+)
+soft_limit_bytes_per_batch = int(soft_limit_bytes_per_batch)
+
+min_data_count_for_sample = runner_conf.get(
+"spark.sql.execution.applyInPandasWithState.minDataCountForSample", 100

Review Comment:
   Similar comment about the property names and default values here and everywhere else - can they be defined in a more prominent place?



##
python/pyspark/worker.py:
##
@@ -361,6 +429,32 @@ def read_udfs(pickleSer, infile, eval_type):
 
 if eval_type == PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF:
 ser = CogroupUDFSerializer(timezone, safecheck, assign_cols_by_name)
+elif eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE:
+soft_limit_bytes_per_batch = runner_conf.get(
+"spark.sql.execution.applyInPandasWithState.softLimitSizePerBatch",
+(64 * 1024 * 1024),

Review Comment:
   Can the default value be defined in some more prominent place? Also the property names.
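
   For example, one possible shape (hypothetical constant names, only to illustrate centralizing the keys and defaults in one place):

   # e.g. module-level constants shared by worker.py and the serializer
   SOFT_LIMIT_SIZE_PER_BATCH_KEY = "spark.sql.execution.applyInPandasWithState.softLimitSizePerBatch"
   SOFT_LIMIT_SIZE_PER_BATCH_DEFAULT = 64 * 1024 * 1024  # keep in sync with the 64MB default in SQLConf

   soft_limit_bytes_per_batch = int(
       runner_conf.get(SOFT_LIMIT_SIZE_PER_BATCH_KEY, SOFT_LIMIT_SIZE_PER_BATCH_DEFAULT)
   )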



##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala:
##
@@ -311,6 +323,56 @@ object UnsupportedOperationChecker extends Logging {
 }
   }
 
+// applyInPandasWithState
+case m: FlatMapGroupsInPandasWithState if m.isStreaming =>
+  // Check compatibility with output modes and aggregations in query
+  val aggsInQuery = collectStreamingAggregates(plan)
+
+  if (aggsInQuery.isEmpty) {
+// applyInPandasWithState without aggregation: operation's output mode must

Review Comment:
   Why do we even have an operation output mode? We are defining a new API - can we just drop this parameter from the API if we are going to enforce that it match the query output mode?



##
python/pyspark/worker.py:
##
@@ -361,6 +429,32 @@ def read_udfs(pickleSer, infile, eval_type):
 
 if eval_type == PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF:
 ser = CogroupUDFSerializer(timezone, safecheck, assign_cols_by_name)
+elif eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE:
+soft_limit_bytes_per_batch = runner_conf.get(
+"spark.sql.execution.applyInPandasWithState.softLimitSizePerBatch",
+(64 * 1024 * 1024),
+)
+soft_limit_bytes_per_batch = int(soft_limit_bytes_per_batch)
+
+min_data_count_for_sample = runner_conf.get(
+"spark.sql.execution.applyInPandasWithState.minDataCountForSample", 100
+)
+min_data_count_for_sample = int(min_data_count_for_sample)
+
+soft_timeout_millis_purge_batch = runner_conf.get(
+"spark.sql.execution.applyInPandasWithState.softTimeoutPurgeBatch", 100

Review Comment:
   same



##
python/pyspark/worker.py:
##
@@ -207,6 +209,65 @@ def wrapped(key_series, value_series):
 return lambda k, v: [(wrapped(k, v), to_arrow_type(return_type))]
 
 
+def wrap_grouped_map_pandas_udf_with_state(f, return_type):
+def wrapped(key_series, value_series_gen, state):
+import pandas as pd
+
+key = tuple(s[0] for s in key_series)
+
+if state.hasTimedOut:
+# Timeout processing pass empty iterator. Here we return an empty DataFrame instead.
+values = [
+pd.DataFrame(columns=pd.concat(next(value_series_gen), axis=1).columns),
+]
+else:
+values = (pd.concat(x, axis=1) for x in value_series_gen)
+
+result_iter = f(key, values, state)
+
+def verify_element(result):
+if not isinstance(result, pd.DataFrame):
+raise TypeError(
+"The type of element in return iterator of the user-defined function "
+"should be pandas.DataFrame, but is {}".format(type(result))
+)
+# the number of columns of result have to match the return type
+# but it is fine for result to have no columns at all if it is empty
+if not (

Review Comment:
   if not ... ?



##
python/pyspark/worker.py:
##
@@ -486,6 +580,35 @@ def mapper(a):
 vals = [a[o] for o in parsed_offsets[0][1]]

[GitHub] [spark] alex-balikov commented on a diff in pull request #37893: [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark

2022-09-18 Thread GitBox


alex-balikov commented on code in PR #37893:
URL: https://github.com/apache/spark/pull/37893#discussion_r973844357


##
python/pyspark/sql/pandas/serializers.py:
##
@@ -371,3 +375,292 @@ def load_stream(self, stream):
 raise ValueError(
 "Invalid number of pandas.DataFrames in group {0}".format(dataframes_in_group)
 )
+
+
+class ApplyInPandasWithStateSerializer(ArrowStreamPandasUDFSerializer):

Review Comment:
   can we have some class and method level comments here.



##
python/pyspark/sql/pandas/serializers.py:
##
@@ -371,3 +375,292 @@ def load_stream(self, stream):
 raise ValueError(
 "Invalid number of pandas.DataFrames in group {0}".format(dataframes_in_group)
 )
+
+
+class ApplyInPandasWithStateSerializer(ArrowStreamPandasUDFSerializer):
+def __init__(
+self,
+timezone,
+safecheck,
+assign_cols_by_name,
+state_object_schema,
+soft_limit_bytes_per_batch,
+min_data_count_for_sample,
+soft_timeout_millis_purge_batch,
+):
+super(ApplyInPandasWithStateSerializer, self).__init__(
+timezone, safecheck, assign_cols_by_name
+)
+self.pickleSer = CPickleSerializer()
+self.utf8_deserializer = UTF8Deserializer()
+self.state_object_schema = state_object_schema
+
+self.result_state_df_type = StructType(
+[
+StructField("properties", StringType()),
+StructField("keyRowAsUnsafe", BinaryType()),
+StructField("object", BinaryType()),
+StructField("oldTimeoutTimestamp", LongType()),
+]
+)
+
+self.result_state_pdf_arrow_type = to_arrow_type(self.result_state_df_type)
+self.soft_limit_bytes_per_batch = soft_limit_bytes_per_batch
+self.min_data_count_for_sample = min_data_count_for_sample
+self.soft_timeout_millis_purge_batch = soft_timeout_millis_purge_batch
+
+def load_stream(self, stream):

Review Comment:
   Method-level comments here and everywhere else. Specifically, since the parameters are untyped, a comment about the parameter types will be helpful.
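
   For example, a numpy-style docstring sketch for `load_stream` (the parameter description is my reading of the surrounding code, not the PR's final wording):

   def load_stream(self, stream):
       """
       Read ArrowRecordBatches from the stream, deserialize them into per-group
       data and state, and generate the inputs for the mapper.

       Parameters
       ----------
       stream : file-like object
           Input stream from the JVM side carrying Arrow record batches together
           with the serialized state metadata for each group.
       """
       ...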





[GitHub] [spark] alex-balikov commented on a diff in pull request #37893: [SPARK-40434][SS][PYTHON] Implement applyInPandasWithState in PySpark

2022-09-16 Thread GitBox


alex-balikov commented on code in PR #37893:
URL: https://github.com/apache/spark/pull/37893#discussion_r973478091


##
python/pyspark/sql/pandas/group_ops.py:
##
@@ -216,6 +218,105 @@ def applyInPandas(
 jdf = self._jgd.flatMapGroupsInPandas(udf_column._jc.expr())
 return DataFrame(jdf, self.session)
 
+def applyInPandasWithState(
+self,
+func: "PandasGroupedMapFunctionWithState",
+outputStructType: Union[StructType, str],
+stateStructType: Union[StructType, str],
+outputMode: str,
+timeoutConf: str,
+) -> DataFrame:
+"""
+Applies the given function to each group of data, while maintaining a user-defined
+per-group state. The result Dataset will represent the flattened record returned by the
+function.
+
+For a streaming Dataset, the function will be invoked for each group repeatedly in every
+trigger, and updates to each group's state will be saved across invocations. The function
+will also be invoked for each timed-out state repeatedly. The sequence of the invocation
+will be input data -> state timeout. When the function is invoked for state timeout, there
+will be no data being presented.
+
+The function should takes parameters (key, Iterator[`pandas.DataFrame`], state) and
+returns another Iterator[`pandas.DataFrame`]. The grouping key(s) will be passed as a tuple
+of numpy data types, e.g., `numpy.int32` and `numpy.float64`. The state will be passed as
+:class:`pyspark.sql.streaming.state.GroupStateImpl`.
+
+For each group, all columns are passed together as `pandas.DataFrame` to the user-function,
+and the returned `pandas.DataFrame` across all invocations are combined as a
+:class:`DataFrame`. Note that the user function should loop through and process all
+elements in the iterator. The user function should not make a guess of the number of
+elements in the iterator.
+
+The `outputStructType` should be a :class:`StructType` describing the schema of all
+elements in returned value, `pandas.DataFrame`. The column labels of all elements in
+returned value, `pandas.DataFrame` must either match the field names in the defined
+schema if specified as strings, or match the field data types by position if not strings,
+e.g. integer indices.
+
+The `stateStructType` should be :class:`StructType` describing the schema of user-defined
+state. The value of state will be presented as a tuple, as well as the update should be
+performed with the tuple. User defined types e.g. native Python class types are not
+supported. Alternatively, you can pickle the data and produce the data as BinaryType, but

Review Comment:
   'Alternatively, you can pickle the data ...' - instead say:
   
   'For such cases, the user should pickle the data into BinaryType. Note that this approach may be sensitive to backward and forward compatibility issues of Python pickle, and Spark cannot guarantee compatibility.'
   
   Though I think you could drop the note, as that is orthogonal to Spark.
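
   A minimal sketch of that pickling approach (the schema string, class, and helper methods below are hypothetical, only to illustrate storing pickled state in a BinaryType field):

   import pickle
   import pandas as pd

   # stateStructType="blob binary" -- a single BinaryType field holding the pickled object
   def fn(key, pdf_iter, state):
       obj = pickle.loads(state.get[0]) if state.exists else MyState()  # MyState: hypothetical user class
       for pdf in pdf_iter:
           obj.consume(pdf)                                             # hypothetical update logic
       state.update((pickle.dumps(obj),))                               # store the pickled bytes back
       yield obj.to_pandas()                                            # hypothetical output DataFrame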



##
python/pyspark/sql/pandas/group_ops.py:
##
@@ -216,6 +218,105 @@ def applyInPandas(
 jdf = self._jgd.flatMapGroupsInPandas(udf_column._jc.expr())
 return DataFrame(jdf, self.session)
 
+def applyInPandasWithState(
+self,
+func: "PandasGroupedMapFunctionWithState",
+outputStructType: Union[StructType, str],
+stateStructType: Union[StructType, str],
+outputMode: str,
+timeoutConf: str,
+) -> DataFrame:
+"""
+Applies the given function to each group of data, while maintaining a user-defined
+per-group state. The result Dataset will represent the flattened record returned by the
+function.
+
+For a streaming Dataset, the function will be invoked for each group repeatedly in every
+trigger, and updates to each group's state will be saved across invocations. The function
+will also be invoked for each timed-out state repeatedly. The sequence of the invocation
+will be input data -> state timeout. When the function is invoked for state timeout, there
+will be no data being presented.
+
+The function should takes parameters (key, Iterator[`pandas.DataFrame`], state) and
+returns another Iterator[`pandas.DataFrame`]. The grouping key(s) will be passed as a tuple
+of numpy data types, e.g., `numpy.int32` and `numpy.float64`. The state will be passed as
+:class:`pyspark.sql.streaming.state.GroupStateImpl`.
+
+For each group, all columns are passed together as `pandas.DataFrame` to the user-function,
+and the returned `pandas.DataFrame` across all invocations are combined as a
+:class:`DataFrame`. Note that