[GitHub] [spark] HyukjinKwon commented on a change in pull request #24095: [SPARK-27163][PYTHON] Cleanup and consolidate Pandas UDF functionality

2019-03-21 Thread GitBox
HyukjinKwon commented on a change in pull request #24095: [SPARK-27163][PYTHON] 
Cleanup and consolidate Pandas UDF functionality
URL: https://github.com/apache/spark/pull/24095#discussion_r267659568
 
 

 ##
 File path: python/pyspark/sql/session.py
 ##
 @@ -530,15 +530,29 @@ def _create_from_pandas_with_arrow(self, pdf, schema, timezone):
         to Arrow data, then sending to the JVM to parallelize. If a schema is passed in, the
         data types will be used to coerce the data in Pandas to Arrow conversion.
         """
-        from pyspark.serializers import ArrowStreamSerializer, _create_batch
-        from pyspark.sql.types import from_arrow_schema, to_arrow_type, TimestampType
+        from distutils.version import LooseVersion
+        from pyspark.serializers import ArrowStreamPandasSerializer
+        from pyspark.sql.types import from_arrow_type, to_arrow_type, TimestampType
         from pyspark.sql.utils import require_minimum_pandas_version, \
             require_minimum_pyarrow_version
 
         require_minimum_pandas_version()
         require_minimum_pyarrow_version()
 
         from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
+        import pyarrow as pa
+
+        # Create the Spark schema from list of names passed in with Arrow types
+        if isinstance(schema, (list, tuple)):
+            if LooseVersion(pa.__version__) < LooseVersion("0.12.0"):
+                temp_batch = pa.RecordBatch.from_pandas(pdf[0:100], preserve_index=False)
 Review comment:
   Okie
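  For context, a minimal standalone sketch of the version-gated schema inference the diff above performs (`infer_arrow_schema` is a hypothetical helper, not the PR's code; the `pdf[0:100]` sample and the 0.12.0 cutoff come from the diff, and the newer-pyarrow branch assumes `pa.Schema.from_pandas`, which 0.12.0+ provides):

  ```python
  from distutils.version import LooseVersion

  import pandas as pd
  import pyarrow as pa

  def infer_arrow_schema(pdf):
      # Old pyarrow (< 0.12.0) lacks Schema.from_pandas, so infer the schema
      # by materializing a RecordBatch from a small sample of the DataFrame.
      if LooseVersion(pa.__version__) < LooseVersion("0.12.0"):
          temp_batch = pa.RecordBatch.from_pandas(pdf[0:100], preserve_index=False)
          return temp_batch.schema
      return pa.Schema.from_pandas(pdf, preserve_index=False)

  print(infer_arrow_schema(pd.DataFrame({"a": [1, 2], "b": ["x", "y"]})))
  ```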





[GitHub] [spark] HyukjinKwon commented on a change in pull request #24095: [SPARK-27163][PYTHON] Cleanup and consolidate Pandas UDF functionality

2019-03-21 Thread GitBox
HyukjinKwon commented on a change in pull request #24095: [SPARK-27163][PYTHON] 
Cleanup and consolidate Pandas UDF functionality
URL: https://github.com/apache/spark/pull/24095#discussion_r267178326
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
 ##
 @@ -145,7 +146,15 @@ case class FlatMapGroupsInPandasExec(
         sessionLocalTimeZone,
         pythonRunnerConf).compute(grouped, context.partitionId(), context)
 
-      columnarBatchIter.flatMap(_.rowIterator.asScala).map(UnsafeProjection.create(output, output))
+      columnarBatchIter.flatMap { batch =>
+        // Grouped Map UDF returns a StructType column in ColumnarBatch, select the children here
+        // TODO: ColumnVector getChild is protected, so use ArrowColumnVector which is public
+        val structVector = batch.column(0).asInstanceOf[ArrowColumnVector]
+        val outputVectors = output.indices.map(structVector.getChild(_).asInstanceOf[ColumnVector])
+        val flattenedBatch = new ColumnarBatch(outputVectors.toArray)
+        flattenedBatch.setNumRows(batch.numRows())
+        flattenedBatch.rowIterator.asScala
+      }.map(UnsafeProjection.create(output, output))
 
 Review comment:
  Oh, also, let's make a separate variable for `UnsafeProjection.create(output, output)`. I think we talked about this before in my PR 😉


[GitHub] [spark] HyukjinKwon commented on a change in pull request #24095: [SPARK-27163][PYTHON] Cleanup and consolidate Pandas UDF functionality

2019-03-14 Thread GitBox
HyukjinKwon commented on a change in pull request #24095: [SPARK-27163][PYTHON] 
Cleanup and consolidate Pandas UDF functionality
URL: https://github.com/apache/spark/pull/24095#discussion_r265815684
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
 ##
 @@ -145,7 +146,15 @@ case class FlatMapGroupsInPandasExec(
         sessionLocalTimeZone,
         pythonRunnerConf).compute(grouped, context.partitionId(), context)
 
-      columnarBatchIter.flatMap(_.rowIterator.asScala).map(UnsafeProjection.create(output, output))
+      columnarBatchIter.flatMap { batch =>
+        // Grouped Map UDF returns a StructType column in ColumnarBatch, select the children here
+        // TODO: ColumnVector getChild is protected, so use ArrowColumnVector which is public
+        val structVector = batch.column(0).asInstanceOf[ArrowColumnVector]
+        val outputVectors = output.indices.map(structVector.getChild(_).asInstanceOf[ColumnVector])
 
 Review comment:
  Another concern, though: I think all of the Arrow implementations (including the SparkR ones) don't modify the batch's outputs but use the batch as is.





[GitHub] [spark] HyukjinKwon commented on a change in pull request #24095: [SPARK-27163][PYTHON] Cleanup and consolidate Pandas UDF functionality

2019-03-14 Thread GitBox
HyukjinKwon commented on a change in pull request #24095: [SPARK-27163][PYTHON] 
Cleanup and consolidate Pandas UDF functionality
URL: https://github.com/apache/spark/pull/24095#discussion_r265815239
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
 ##
 @@ -145,7 +146,15 @@ case class FlatMapGroupsInPandasExec(
         sessionLocalTimeZone,
         pythonRunnerConf).compute(grouped, context.partitionId(), context)
 
-      columnarBatchIter.flatMap(_.rowIterator.asScala).map(UnsafeProjection.create(output, output))
+      columnarBatchIter.flatMap { batch =>
+        // Grouped Map UDF returns a StructType column in ColumnarBatch, select the children here
+        // TODO: ColumnVector getChild is protected, so use ArrowColumnVector which is public
+        val structVector = batch.column(0).asInstanceOf[ArrowColumnVector]
+        val outputVectors = output.indices.map(structVector.getChild(_).asInstanceOf[ColumnVector])
 
 Review comment:
   I think the logic itself is fine. But doesn't this mean we cannot support 
nested structs in grouped map Pandas UDFs?
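  A toy pyarrow illustration of the shape in question (standalone, not the Spark code path; the column names are made up): the batch arrives with a single top-level struct column, and the flattening selects that struct's children as the output columns, so the open question is how a child that is itself a struct would fare:

  ```python
  import pyarrow as pa

  # The Python side hands back one top-level struct column wrapping the outputs.
  struct_array = pa.StructArray.from_arrays(
      [pa.array([1, 2]), pa.array(["x", "y"])], names=["id", "name"])
  incoming = pa.RecordBatch.from_arrays([struct_array], names=["_0"])

  # Flattening: select the struct's children as the real output columns.
  struct_col = incoming.column(0)
  children = [struct_col.field(i) for i in range(struct_col.type.num_fields)]
  flattened = pa.RecordBatch.from_arrays(children, names=["id", "name"])
  print(flattened.to_pandas())
  ```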





[GitHub] [spark] HyukjinKwon commented on a change in pull request #24095: [SPARK-27163][PYTHON] Cleanup and consolidate Pandas UDF functionality

2019-03-14 Thread GitBox
HyukjinKwon commented on a change in pull request #24095: [SPARK-27163][PYTHON] 
Cleanup and consolidate Pandas UDF functionality
URL: https://github.com/apache/spark/pull/24095#discussion_r265812341
 
 

 ##
 File path: python/pyspark/worker.py
 ##
 @@ -123,15 +120,9 @@ def wrapped(key_series, value_series):
                 "Number of columns of the returned pandas.DataFrame "
                 "doesn't match specified schema. "
                 "Expected: {} Actual: {}".format(len(return_type), len(result.columns)))
+        return result
 
-        # Assign result columns by schema name if user labeled with strings, else use position
-        if assign_cols_by_name and any(isinstance(name, basestring) for name in result.columns):
 
 Review comment:
  Oops, okie. The logic was actually duplicated in `_create_batch`.
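  For reference, a minimal standalone sketch of what that assign-by-name step does (`assign_result_columns` is a hypothetical helper mirroring the deleted comment's description; the real logic now lives in `_create_batch`):

  ```python
  import pandas as pd

  def assign_result_columns(result, field_names, assign_cols_by_name=True):
      # If the user labeled the returned columns with strings, match them to
      # the schema fields by name; otherwise fall back to position.
      if assign_cols_by_name and any(isinstance(name, str) for name in result.columns):
          return result[field_names]
      return result

  df = pd.DataFrame({"b": [2], "a": [1]})
  print(assign_result_columns(df, ["a", "b"]))  # columns reordered to a, b
  ```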





[GitHub] [spark] HyukjinKwon commented on a change in pull request #24095: [SPARK-27163][PYTHON] Cleanup and consolidate Pandas UDF functionality

2019-03-14 Thread GitBox
HyukjinKwon commented on a change in pull request #24095: [SPARK-27163][PYTHON] 
Cleanup and consolidate Pandas UDF functionality
URL: https://github.com/apache/spark/pull/24095#discussion_r265810895
 
 

 ##
 File path: python/pyspark/serializers.py
 ##
 @@ -352,28 +364,17 @@ def dump_stream(self, iterator, stream):
         Make ArrowRecordBatches from Pandas Series and serialize. Input is a single series or
         a list of series accompanied by an optional pyarrow type to coerce the data to.
         """
-        import pyarrow as pa
-        writer = None
-        try:
-            for series in iterator:
-                batch = _create_batch(series, self._timezone, self._safecheck,
-                                      self._assign_cols_by_name)
-                if writer is None:
-                    write_int(SpecialLengths.START_ARROW_STREAM, stream)
-                    writer = pa.RecordBatchStreamWriter(stream, batch.schema)
-                writer.write_batch(batch)
-        finally:
-            if writer is not None:
-                writer.close()
+        batches = (_create_batch(series, self._timezone, self._safecheck, self._assign_cols_by_name)
+                   for series in iterator)
+        super(ArrowStreamPandasSerializer, self).dump_stream(batches, stream)
 
     def load_stream(self, stream):
         """
         Deserialize ArrowRecordBatches to an Arrow table and return as a list of pandas.Series.
         """
+        batch_iter = super(ArrowStreamPandasSerializer, self).load_stream(stream)
 
 Review comment:
   not a big deal but I would name it `batches`





[GitHub] [spark] HyukjinKwon commented on a change in pull request #24095: [SPARK-27163][PYTHON] Cleanup and consolidate Pandas UDF functionality

2019-03-14 Thread GitBox
HyukjinKwon commented on a change in pull request #24095: [SPARK-27163][PYTHON] 
Cleanup and consolidate Pandas UDF functionality
URL: https://github.com/apache/spark/pull/24095#discussion_r265810573
 
 

 ##
 File path: python/pyspark/serializers.py
 ##
 @@ -352,28 +364,17 @@ def dump_stream(self, iterator, stream):
         Make ArrowRecordBatches from Pandas Series and serialize. Input is a single series or
         a list of series accompanied by an optional pyarrow type to coerce the data to.
         """
-        import pyarrow as pa
-        writer = None
-        try:
-            for series in iterator:
-                batch = _create_batch(series, self._timezone, self._safecheck,
-                                      self._assign_cols_by_name)
-                if writer is None:
-                    write_int(SpecialLengths.START_ARROW_STREAM, stream)
-                    writer = pa.RecordBatchStreamWriter(stream, batch.schema)
-                writer.write_batch(batch)
-        finally:
-            if writer is not None:
-                writer.close()
+        batches = (_create_batch(series, self._timezone, self._safecheck, self._assign_cols_by_name)
+                   for series in iterator)
 
 Review comment:
  Hm ... @BryanCutler, it seems `_init_dump_stream` was added solely to handle the `write_int(SpecialLengths.START_ARROW_STREAM, stream)` case, TBH. Could we pull `write_int(SpecialLengths.START_ARROW_STREAM, stream)` out into here and remove `_init_dump_stream`, so this logic stays isolated here?

  It looks tricky to do since it's all generators, but I thought we could at least do something like:

  ```python
  batches = (_create_batch(series, self._timezone, self._safecheck, self._assign_cols_by_name)
             for series in iterator)

  def arrow_start_written_batches():
      should_write_start_length = True
      for batch in batches:
          if should_write_start_length:
              write_int(SpecialLengths.START_ARROW_STREAM, stream)
              should_write_start_length = False
          yield batch

  super(ArrowStreamPandasSerializer, self).dump_stream(arrow_start_written_batches(), stream)
  ```
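  A self-contained illustration of the same pattern with generic names (nothing here is Spark's actual API; `START_MARKER` merely stands in for `SpecialLengths.START_ARROW_STREAM`): defer writing a stream header until the first element actually arrives, so an empty iterator writes nothing at all.

  ```python
  import io
  import struct

  START_MARKER = -6  # stand-in for SpecialLengths.START_ARROW_STREAM

  def write_int(value, stream):
      stream.write(struct.pack("!i", value))

  def with_lazy_header(items, stream):
      # Write the header lazily, just before the first item is yielded.
      wrote_header = False
      for item in items:
          if not wrote_header:
              write_int(START_MARKER, stream)
              wrote_header = True
          yield item

  buf = io.BytesIO()
  print(list(with_lazy_header(iter([b"a", b"b"]), buf)), buf.getvalue())

  empty = io.BytesIO()
  list(with_lazy_header(iter([]), empty))
  print(empty.getvalue())  # b'': no header was written for an empty stream
  ```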





[GitHub] [spark] HyukjinKwon commented on a change in pull request #24095: [SPARK-27163][PYTHON] Cleanup and consolidate Pandas UDF functionality

2019-03-14 Thread GitBox
HyukjinKwon commented on a change in pull request #24095: [SPARK-27163][PYTHON] 
Cleanup and consolidate Pandas UDF functionality
URL: https://github.com/apache/spark/pull/24095#discussion_r265806346
 
 

 ##
 File path: python/pyspark/worker.py
 ##
 @@ -123,15 +120,9 @@ def wrapped(key_series, value_series):
                 "Number of columns of the returned pandas.DataFrame "
                 "doesn't match specified schema. "
                 "Expected: {} Actual: {}".format(len(return_type), len(result.columns)))
+        return result
 
-        # Assign result columns by schema name if user labeled with strings, else use position
-        if assign_cols_by_name and any(isinstance(name, basestring) for name in result.columns):
 
 Review comment:
  Eh, @BryanCutler, sorry if I rushed through reading this, but where did this logic go?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org