[GitHub] [spark] HyukjinKwon commented on a change in pull request #24095: [SPARK-27163][PYTHON] Cleanup and consolidate Pandas UDF functionality
HyukjinKwon commented on a change in pull request #24095: [SPARK-27163][PYTHON] Cleanup and consolidate Pandas UDF functionality
URL: https://github.com/apache/spark/pull/24095#discussion_r267659568

## File path: python/pyspark/sql/session.py ##
@@ -530,15 +530,29 @@ def _create_from_pandas_with_arrow(self, pdf, schema, timezone):
         to Arrow data, then sending to the JVM to parallelize. If a schema is passed in, the
         data types will be used to coerce the data in Pandas to Arrow conversion.
         """
-        from pyspark.serializers import ArrowStreamSerializer, _create_batch
-        from pyspark.sql.types import from_arrow_schema, to_arrow_type, TimestampType
+        from distutils.version import LooseVersion
+        from pyspark.serializers import ArrowStreamPandasSerializer
+        from pyspark.sql.types import from_arrow_type, to_arrow_type, TimestampType
         from pyspark.sql.utils import require_minimum_pandas_version, \
             require_minimum_pyarrow_version

         require_minimum_pandas_version()
         require_minimum_pyarrow_version()

         from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
+        import pyarrow as pa
+
+        # Create the Spark schema from list of names passed in with Arrow types
+        if isinstance(schema, (list, tuple)):
+            if LooseVersion(pa.__version__) < LooseVersion("0.12.0"):
+                temp_batch = pa.RecordBatch.from_pandas(pdf[0:100], preserve_index=False)

Review comment:
Okie

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
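The version gate in the hunk above compares parsed versions rather than raw strings. A minimal sketch of why that matters follows. It deliberately avoids `distutils` (which newer Python releases no longer ship), and `parse_version` and `is_older_than` are hypothetical helpers written for this illustration, not part of PySpark or pyarrow.

```python
import re


def parse_version(version):
    """Turn a dotted version string into a tuple of ints.

    Hypothetical helper: only the leading digits of each component are kept,
    and parsing stops at the first component without digits (e.g. "dev0").
    """
    parts = []
    for piece in version.split("."):
        match = re.match(r"\d+", piece)
        if match is None:
            break
        parts.append(int(match.group()))
    return tuple(parts)


def is_older_than(installed, threshold):
    """Return True when `installed` is strictly older than `threshold`."""
    return parse_version(installed) < parse_version(threshold)


# A plain string comparison gets this wrong, because "9" sorts after "1".
print("0.9.0" < "0.12.0")                 # string comparison
print(is_older_than("0.9.0", "0.12.0"))   # numeric comparison
```

With tuple comparison, `0.9.0` is correctly treated as older than `0.12.0`, which is the behaviour the `LooseVersion` check in the diff relies on.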
[GitHub] [spark] HyukjinKwon commented on a change in pull request #24095: [SPARK-27163][PYTHON] Cleanup and consolidate Pandas UDF functionality
HyukjinKwon commented on a change in pull request #24095: [SPARK-27163][PYTHON] Cleanup and consolidate Pandas UDF functionality
URL: https://github.com/apache/spark/pull/24095#discussion_r267178326

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala ##
@@ -145,7 +146,15 @@ case class FlatMapGroupsInPandasExec(
         sessionLocalTimeZone,
         pythonRunnerConf).compute(grouped, context.partitionId(), context)

-      columnarBatchIter.flatMap(_.rowIterator.asScala).map(UnsafeProjection.create(output, output))
+      columnarBatchIter.flatMap { batch =>
+        // Grouped Map UDF returns a StructType column in ColumnarBatch, select the children here
+        // TODO: ColumnVector getChild is protected, so use ArrowColumnVector which is public
+        val structVector = batch.column(0).asInstanceOf[ArrowColumnVector]
+        val outputVectors = output.indices.map(structVector.getChild(_).asInstanceOf[ColumnVector])
+        val flattenedBatch = new ColumnarBatch(outputVectors.toArray)
+        flattenedBatch.setNumRows(batch.numRows())
+        flattenedBatch.rowIterator.asScala
+      }.map(UnsafeProjection.create(output, output))

Review comment:
Oh, also, let's make a separate variable for `UnsafeProjection.create(output, output)`. I think we talked about this before in my PR 😉
[GitHub] [spark] HyukjinKwon commented on a change in pull request #24095: [SPARK-27163][PYTHON] Cleanup and consolidate Pandas UDF functionality
HyukjinKwon commented on a change in pull request #24095: [SPARK-27163][PYTHON] Cleanup and consolidate Pandas UDF functionality
URL: https://github.com/apache/spark/pull/24095#discussion_r265815684

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala ##
@@ -145,7 +146,15 @@ case class FlatMapGroupsInPandasExec(
         sessionLocalTimeZone,
         pythonRunnerConf).compute(grouped, context.partitionId(), context)

-      columnarBatchIter.flatMap(_.rowIterator.asScala).map(UnsafeProjection.create(output, output))
+      columnarBatchIter.flatMap { batch =>
+        // Grouped Map UDF returns a StructType column in ColumnarBatch, select the children here
+        // TODO: ColumnVector getChild is protected, so use ArrowColumnVector which is public
+        val structVector = batch.column(0).asInstanceOf[ArrowColumnVector]
+        val outputVectors = output.indices.map(structVector.getChild(_).asInstanceOf[ColumnVector])

Review comment:
Another concern, though: I think all of the Arrow implementations (including the SparkR ones) don't modify the batch's outputs but use the batch as is.
[GitHub] [spark] HyukjinKwon commented on a change in pull request #24095: [SPARK-27163][PYTHON] Cleanup and consolidate Pandas UDF functionality
HyukjinKwon commented on a change in pull request #24095: [SPARK-27163][PYTHON] Cleanup and consolidate Pandas UDF functionality
URL: https://github.com/apache/spark/pull/24095#discussion_r265815239

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala ##
@@ -145,7 +146,15 @@ case class FlatMapGroupsInPandasExec(
         sessionLocalTimeZone,
         pythonRunnerConf).compute(grouped, context.partitionId(), context)

-      columnarBatchIter.flatMap(_.rowIterator.asScala).map(UnsafeProjection.create(output, output))
+      columnarBatchIter.flatMap { batch =>
+        // Grouped Map UDF returns a StructType column in ColumnarBatch, select the children here
+        // TODO: ColumnVector getChild is protected, so use ArrowColumnVector which is public
+        val structVector = batch.column(0).asInstanceOf[ArrowColumnVector]
+        val outputVectors = output.indices.map(structVector.getChild(_).asInstanceOf[ColumnVector])

Review comment:
I think the logic itself is fine. But doesn't this mean we cannot support nested structs in grouped map Pandas UDFs?
[GitHub] [spark] HyukjinKwon commented on a change in pull request #24095: [SPARK-27163][PYTHON] Cleanup and consolidate Pandas UDF functionality
HyukjinKwon commented on a change in pull request #24095: [SPARK-27163][PYTHON] Cleanup and consolidate Pandas UDF functionality
URL: https://github.com/apache/spark/pull/24095#discussion_r265812341

## File path: python/pyspark/worker.py ##
@@ -123,15 +120,9 @@ def wrapped(key_series, value_series):
                 "Number of columns of the returned pandas.DataFrame "
                 "doesn't match specified schema. "
                 "Expected: {} Actual: {}".format(len(return_type), len(result.columns)))
+        return result
-        # Assign result columns by schema name if user labeled with strings, else use position
-        if assign_cols_by_name and any(isinstance(name, basestring) for name in result.columns):

Review comment:
Oops, okie. The logic was actually duplicated with `_create_batch`.
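For readers following the thread, the removed comment describes a rule: match the returned columns to the schema by name when the user labeled them with strings, otherwise by position. A minimal pure-Python sketch of that rule is below. `order_columns` and its `(label, values)` pair representation are hypothetical and stand in for the pandas.DataFrame handling in PySpark; `str` replaces the Python 2 `basestring` seen in the diff.

```python
def order_columns(columns, schema_names, assign_cols_by_name=True):
    """Order column values to line up with the schema.

    columns: list of (label, values) pairs in the order the user returned them
             (hypothetical stand-in for a pandas.DataFrame's columns).
    schema_names: field names of the declared return schema, in schema order.
    """
    labels = [label for label, _ in columns]
    if assign_cols_by_name and any(isinstance(label, str) for label in labels):
        # String labels present: look each schema field up by name.
        by_label = dict(columns)
        return [by_label[name] for name in schema_names]
    # No string labels (or name matching disabled): trust the position.
    return [values for _, values in columns]


schema = ["id", "score"]

# Labeled with strings but returned in a different order than the schema.
labeled = [("score", [0.5, 0.7]), ("id", [1, 2])]
print(order_columns(labeled, schema))

# Integer labels: the first column is taken as "id", the second as "score".
positional = [(0, [1, 2]), (1, [0.5, 0.7])]
print(order_columns(positional, schema))
```

Keeping this decision in one place, as the comment above suggests was already the case with `_create_batch`, avoids two code paths drifting apart.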
[GitHub] [spark] HyukjinKwon commented on a change in pull request #24095: [SPARK-27163][PYTHON] Cleanup and consolidate Pandas UDF functionality
HyukjinKwon commented on a change in pull request #24095: [SPARK-27163][PYTHON] Cleanup and consolidate Pandas UDF functionality
URL: https://github.com/apache/spark/pull/24095#discussion_r265810895

## File path: python/pyspark/serializers.py ##
@@ -352,28 +364,17 @@ def dump_stream(self, iterator, stream):
         Make ArrowRecordBatches from Pandas Series and serialize. Input is a single series or
         a list of series accompanied by an optional pyarrow type to coerce the data to.
         """
-        import pyarrow as pa
-        writer = None
-        try:
-            for series in iterator:
-                batch = _create_batch(series, self._timezone, self._safecheck,
-                                      self._assign_cols_by_name)
-                if writer is None:
-                    write_int(SpecialLengths.START_ARROW_STREAM, stream)
-                    writer = pa.RecordBatchStreamWriter(stream, batch.schema)
-                writer.write_batch(batch)
-        finally:
-            if writer is not None:
-                writer.close()
+        batches = (_create_batch(series, self._timezone, self._safecheck, self._assign_cols_by_name)
+                   for series in iterator)
+        super(ArrowStreamPandasSerializer, self).dump_stream(batches, stream)

     def load_stream(self, stream):
         """
         Deserialize ArrowRecordBatches to an Arrow table and return as a list of pandas.Series.
         """
+        batch_iter = super(ArrowStreamPandasSerializer, self).load_stream(stream)

Review comment:
Not a big deal, but I would name it `batches`.
[GitHub] [spark] HyukjinKwon commented on a change in pull request #24095: [SPARK-27163][PYTHON] Cleanup and consolidate Pandas UDF functionality
HyukjinKwon commented on a change in pull request #24095: [SPARK-27163][PYTHON] Cleanup and consolidate Pandas UDF functionality
URL: https://github.com/apache/spark/pull/24095#discussion_r265810573

## File path: python/pyspark/serializers.py ##
@@ -352,28 +364,17 @@ def dump_stream(self, iterator, stream):
         Make ArrowRecordBatches from Pandas Series and serialize. Input is a single series or
         a list of series accompanied by an optional pyarrow type to coerce the data to.
         """
-        import pyarrow as pa
-        writer = None
-        try:
-            for series in iterator:
-                batch = _create_batch(series, self._timezone, self._safecheck,
-                                      self._assign_cols_by_name)
-                if writer is None:
-                    write_int(SpecialLengths.START_ARROW_STREAM, stream)
-                    writer = pa.RecordBatchStreamWriter(stream, batch.schema)
-                writer.write_batch(batch)
-        finally:
-            if writer is not None:
-                writer.close()
+        batches = (_create_batch(series, self._timezone, self._safecheck, self._assign_cols_by_name)
+                   for series in iterator)

Review comment:
Hm ... @BryanCutler, it seems `_init_dump_stream` was added to handle this `write_int(SpecialLengths.START_ARROW_STREAM, stream)` case alone, TBH. Could we just pull `write_int(SpecialLengths.START_ARROW_STREAM, stream)` out into here and remove `_init_dump_stream`, so that this logic is isolated here? It looks tricky to do since it's all generators. I thought we could at least do something like:

```python
batches = (_create_batch(series, self._timezone, self._safecheck, self._assign_cols_by_name)
           for series in iterator)

def arrow_start_written_batches():
    # Write the start marker lazily, right before the first batch is yielded
    should_write_start_length = True
    for batch in batches:
        if should_write_start_length:
            write_int(SpecialLengths.START_ARROW_STREAM, stream)
            should_write_start_length = False
        yield batch

super(ArrowStreamPandasSerializer, self).dump_stream(arrow_start_written_batches(), stream)
```
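The generator wrapper suggested above can be tried outside Spark. The sketch below is a self-contained illustration under stated assumptions: `START_MARKER`, `write_int`, `with_start_marker` and `dump_stream` are hypothetical stand-ins defined here (the marker value is a placeholder, and `dump_stream` simply writes length-prefixed bytes rather than Arrow record batches). It shows that the marker is written once, immediately before the first batch, and not at all when there are no batches.

```python
import io
import struct

START_MARKER = -6  # placeholder standing in for SpecialLengths.START_ARROW_STREAM


def write_int(value, stream):
    """Write a 4-byte big-endian signed integer to the stream."""
    stream.write(struct.pack("!i", value))


def with_start_marker(batches, stream):
    """Yield batches unchanged, writing the marker just before the first one."""
    wrote_marker = False
    for batch in batches:
        if not wrote_marker:
            write_int(START_MARKER, stream)
            wrote_marker = True
        yield batch


def dump_stream(batches, stream):
    """Stand-in for the parent serializer: length-prefix and write each batch."""
    for batch in batches:
        write_int(len(batch), stream)
        stream.write(batch)


out = io.BytesIO()
dump_stream(with_start_marker(iter([b"ab", b"c"]), out), out)
print(out.getvalue())

empty = io.BytesIO()
dump_stream(with_start_marker(iter([]), empty), empty)
print(empty.getvalue())
```

Because the marker is written inside the generator, its position in the stream depends on when the consumer pulls the first batch, which is presumably the "tricky" part the comment refers to.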
[GitHub] [spark] HyukjinKwon commented on a change in pull request #24095: [SPARK-27163][PYTHON] Cleanup and consolidate Pandas UDF functionality
HyukjinKwon commented on a change in pull request #24095: [SPARK-27163][PYTHON] Cleanup and consolidate Pandas UDF functionality
URL: https://github.com/apache/spark/pull/24095#discussion_r265806346

## File path: python/pyspark/worker.py ##
@@ -123,15 +120,9 @@ def wrapped(key_series, value_series):
                 "Number of columns of the returned pandas.DataFrame "
                 "doesn't match specified schema. "
                 "Expected: {} Actual: {}".format(len(return_type), len(result.columns)))
+        return result
-        # Assign result columns by schema name if user labeled with strings, else use position
-        if assign_cols_by_name and any(isinstance(name, basestring) for name in result.columns):

Review comment:
Eh, @BryanCutler, sorry if I rushed through reading this, but where did this logic go?