[GitHub] spark pull request #21427: [SPARK-24324][PYTHON] Pandas Grouped Map UDF shou...

2018-06-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21427


---




[GitHub] spark pull request #21427: [SPARK-24324][PYTHON] Pandas Grouped Map UDF shou...

2018-06-22 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21427#discussion_r197527013
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala ---
@@ -120,4 +121,19 @@ object ArrowUtils {
   StructField(field.getName, dt, field.isNullable)
 })
   }
+
+  /** Return Map with conf settings to be used in ArrowPythonRunner */
+  def getPythonRunnerConfMap(conf: SQLConf): Map[String, String] = {
+val timeZoneConf = if (conf.pandasRespectSessionTimeZone) {
+  Seq(SQLConf.SESSION_LOCAL_TIMEZONE.key -> conf.sessionLocalTimeZone)
+} else {
+  Nil
+}
+val pandasColsByPosition = if (conf.pandasGroupedMapAssignColumnsByPosition) {
--- End diff --

I am sorry, can you explain why it's easier to process in the worker?

Isn't it just removing the default value here:

https://github.com/apache/spark/pull/21427/files#diff-d33eea00c68dfd120f4ceae6381f34cdR99

Also, one thing that's not great about omitting the conf for the default case is that you need to put the default value in two places (both Python and Java).
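
For context, the duplication concern looks like this on the worker side. A minimal sketch, assuming a `runner_conf` dict holds the entries sent from the JVM; if the JVM omits the key for the default case, the same default string has to be hard-coded here as well:

```
# Sketch only: the worker must repeat the default ("false") that the JVM
# side also knows about, because an omitted key is indistinguishable from
# an explicitly-default one.
assign_cols_by_pos = runner_conf.get(
    "spark.sql.execution.pandas.groupedMap.assignColumnsByPosition",
    "false").lower() == "true"
```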


---




[GitHub] spark pull request #21427: [SPARK-24324][PYTHON] Pandas Grouped Map UDF shou...

2018-06-22 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21427#discussion_r197525704
  
--- Diff: python/pyspark/worker.py ---
@@ -110,9 +116,20 @@ def wrapped(key_series, value_series):
 "Number of columns of the returned pandas.DataFrame "
 "doesn't match specified schema. "
 "Expected: {} Actual: {}".format(len(return_type), 
len(result.columns)))
-arrow_return_types = (to_arrow_type(field.dataType) for field in 
return_type)
-return [(result[result.columns[i]], arrow_type)
-for i, arrow_type in enumerate(arrow_return_types)]
+
+if not assign_cols_by_pos:
+try:
+# Assign result columns by schema name
+return [(result[field.name], to_arrow_type(field.dataType))
+for field in return_type]
+except KeyError:
--- End diff --

I think `result.iloc[:, i]` and `result[result.columns[i]]` are the same; you don't have to change it if you prefer `result.columns[i]`.

I agree `to_arrow_type` doesn't throw `KeyError`, but in general I feel it's more robust not to assume the implementation details of `to_arrow_type`. I think the code is more concise and readable with if/else too (compared to except KeyError).
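
A minimal sketch of the if/else structure being described, assuming `result` is the returned pandas.DataFrame, `return_type` is the StructType, and Python 2's `basestring` (an illustration, not the committed code):

```
# Branch on the label type up front instead of attempting a name lookup
# and falling back on KeyError.
if any(isinstance(name, basestring) for name in result.columns):
    # String labels: assign result columns by schema name.
    series = [(result[field.name], to_arrow_type(field.dataType))
              for field in return_type]
else:
    # Non-string labels: assign result columns by position.
    series = [(result.iloc[:, i], to_arrow_type(field.dataType))
              for i, field in enumerate(return_type)]
```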


---




[GitHub] spark pull request #21427: [SPARK-24324][PYTHON] Pandas Grouped Map UDF shou...

2018-06-22 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21427#discussion_r197524629
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala ---
@@ -58,18 +58,18 @@ class ArrowPythonRunner(
 new WriterThread(env, worker, inputIterator, partitionIndex, context) {
 
   protected override def writeCommand(dataOut: DataOutputStream): Unit = {
-PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets)
-if (respectTimeZone) {
-  PythonRDD.writeUTF(timeZoneId, dataOut)
-} else {
-  dataOut.writeInt(SpecialLengths.NULL)
+dataOut.writeInt(conf.size)
--- End diff --

Ok, SGTM.


---




[GitHub] spark pull request #21427: [SPARK-24324][PYTHON] Pandas Grouped Map UDF shou...

2018-06-22 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/21427#discussion_r197510171
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala ---
@@ -58,18 +58,18 @@ class ArrowPythonRunner(
 new WriterThread(env, worker, inputIterator, partitionIndex, context) {
 
   protected override def writeCommand(dataOut: DataOutputStream): Unit = {
-PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets)
-if (respectTimeZone) {
-  PythonRDD.writeUTF(timeZoneId, dataOut)
-} else {
-  dataOut.writeInt(SpecialLengths.NULL)
+dataOut.writeInt(conf.size)
--- End diff --

I think it's fine, but I will add some comments


---




[GitHub] spark pull request #21427: [SPARK-24324][PYTHON] Pandas Grouped Map UDF shou...

2018-06-22 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/21427#discussion_r197509839
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala ---
@@ -63,7 +64,7 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi
 
   private val batchSize = conf.arrowMaxRecordsPerBatch
   private val sessionLocalTimeZone = conf.sessionLocalTimeZone
-  private val pandasRespectSessionTimeZone = conf.pandasRespectSessionTimeZone
+  private val runnerConf = ArrowUtils.getPythonRunnerConfMap(conf)
--- End diff --

ok


---




[GitHub] spark pull request #21427: [SPARK-24324][PYTHON] Pandas Grouped Map UDF shou...

2018-06-22 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/21427#discussion_r197509567
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala ---
@@ -120,4 +121,19 @@ object ArrowUtils {
   StructField(field.getName, dt, field.isNullable)
 })
   }
+
+  /** Return Map with conf settings to be used in ArrowPythonRunner */
+  def getPythonRunnerConfMap(conf: SQLConf): Map[String, String] = {
+val timeZoneConf = if (conf.pandasRespectSessionTimeZone) {
+  Seq(SQLConf.SESSION_LOCAL_TIMEZONE.key -> conf.sessionLocalTimeZone)
+} else {
+  Nil
+}
+val pandasColsByPosition = if (conf.pandasGroupedMapAssignColumnsByPosition) {
--- End diff --

I think it's better to just omit the config for the default case; that way it's easier to process in the worker.


---




[GitHub] spark pull request #21427: [SPARK-24324][PYTHON] Pandas Grouped Map UDF shou...

2018-06-22 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/21427#discussion_r197508262
  
--- Diff: python/pyspark/worker.py ---
@@ -110,9 +116,20 @@ def wrapped(key_series, value_series):
 "Number of columns of the returned pandas.DataFrame "
 "doesn't match specified schema. "
 "Expected: {} Actual: {}".format(len(return_type), 
len(result.columns)))
-arrow_return_types = (to_arrow_type(field.dataType) for field in 
return_type)
-return [(result[result.columns[i]], arrow_type)
-for i, arrow_type in enumerate(arrow_return_types)]
+
+if not assign_cols_by_pos:
+try:
+# Assign result columns by schema name
+return [(result[field.name], to_arrow_type(field.dataType))
+for field in return_type]
+except KeyError:
--- End diff --

This seems ok to me since it's basically the same, but I don't think we need to worry about `to_arrow_type` throwing a `KeyError`.  Is there any particular reason you suggested handling position like this?

```
[(result.iloc[:, i], to_arrow_type(field.dataType)) for i, field in enumerate(return_type)]
```

To me it seems better to look up by column labels, as it is currently:

```
[(result[result.columns[i]], to_arrow_type(field.dataType))
for i, field in enumerate(return_type)]
```


---




[GitHub] spark pull request #21427: [SPARK-24324][PYTHON] Pandas Grouped Map UDF shou...

2018-06-19 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21427#discussion_r196457433
  
--- Diff: python/pyspark/worker.py ---
@@ -110,9 +116,20 @@ def wrapped(key_series, value_series):
 "Number of columns of the returned pandas.DataFrame "
 "doesn't match specified schema. "
 "Expected: {} Actual: {}".format(len(return_type), 
len(result.columns)))
-arrow_return_types = (to_arrow_type(field.dataType) for field in 
return_type)
-return [(result[result.columns[i]], arrow_type)
-for i, arrow_type in enumerate(arrow_return_types)]
+
+if not assign_cols_by_pos:
+try:
+# Assign result columns by schema name
+return [(result[field.name], to_arrow_type(field.dataType))
+for field in return_type]
+except KeyError:
--- End diff --

I think we want to be a little more careful here; for example, a `KeyError` in `to_arrow_type` could lead to unexpected behavior.

How about something like this:
```
if any(isinstance(name, basestring) for name in result.columns):
    return [(result[field.name], to_arrow_type(field.dataType)) for field in return_type]
else:
    return [(result.iloc[:, i], to_arrow_type(field.dataType)) for i, field in enumerate(return_type)]
```





---




[GitHub] spark pull request #21427: [SPARK-24324][PYTHON] Pandas Grouped Map UDF shou...

2018-06-19 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21427#discussion_r196442152
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala ---
@@ -120,4 +121,19 @@ object ArrowUtils {
   StructField(field.getName, dt, field.isNullable)
 })
   }
+
+  /** Return Map with conf settings to be used in ArrowPythonRunner */
+  def getPythonRunnerConfMap(conf: SQLConf): Map[String, String] = {
+val timeZoneConf = if (conf.pandasRespectSessionTimeZone) {
+  Seq(SQLConf.SESSION_LOCAL_TIMEZONE.key -> conf.sessionLocalTimeZone)
+} else {
+  Nil
+}
+val pandasColsByPosition = if (conf.pandasGroupedMapAssignColumnsByPosition) {
--- End diff --

Can we do:
```
val pandasColByPosition = Seq(
  SQLConf.PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_POSITION.key ->
    conf.pandasGroupedMapAssignColumnsByPosition.toString)
```


---




[GitHub] spark pull request #21427: [SPARK-24324][PYTHON] Pandas Grouped Map UDF shou...

2018-06-19 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21427#discussion_r196438132
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1161,6 +1161,16 @@ object SQLConf {
   .booleanConf
   .createWithDefault(true)
 
+  val PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_POSITION =
+buildConf("spark.sql.execution.pandas.groupedMap.assignColumnsByPosition")
+  .internal()
+  .doc("When true, a grouped map Pandas UDF will assign columns from the returned " +
+"Pandas DataFrame based on position, regardless of column label type. When false, " +
+"columns will be looked up by name if labeled with a string and fallback to use" +
--- End diff --

Yup, I think so.


---




[GitHub] spark pull request #21427: [SPARK-24324][PYTHON] Pandas Grouped Map UDF shou...

2018-06-19 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21427#discussion_r196437909
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala ---
@@ -97,7 +98,7 @@ case class WindowInPandasExec(
 val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536)
 val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true)
 val sessionLocalTimeZone = conf.sessionLocalTimeZone
-val pandasRespectSessionTimeZone = conf.pandasRespectSessionTimeZone
+val runnerConf = ArrowUtils.getPythonRunnerConfMap(conf)
--- End diff --

nit: pythonRunnerConf?


---




[GitHub] spark pull request #21427: [SPARK-24324][PYTHON] Pandas Grouped Map UDF shou...

2018-06-19 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21427#discussion_r196437623
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala ---
@@ -77,7 +78,7 @@ case class FlatMapGroupsInPandasExec(
 val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true)
 val chainedFunc = Seq(ChainedPythonFunctions(Seq(pandasFunction)))
 val sessionLocalTimeZone = conf.sessionLocalTimeZone
-val pandasRespectSessionTimeZone = conf.pandasRespectSessionTimeZone
+val runnerConf = ArrowUtils.getPythonRunnerConfMap(conf)
--- End diff --

nit: pythonRunnerConf?


---




[GitHub] spark pull request #21427: [SPARK-24324][PYTHON] Pandas Grouped Map UDF shou...

2018-06-19 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21427#discussion_r196437348
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala ---
@@ -58,18 +58,18 @@ class ArrowPythonRunner(
 new WriterThread(env, worker, inputIterator, partitionIndex, context) {
 
   protected override def writeCommand(dataOut: DataOutputStream): Unit = {
-PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets)
-if (respectTimeZone) {
-  PythonRDD.writeUTF(timeZoneId, dataOut)
-} else {
-  dataOut.writeInt(SpecialLengths.NULL)
+dataOut.writeInt(conf.size)
--- End diff --

Maybe put this in a `writeConf` method to be more explicit?
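
Whatever the JVM writes here, the Python worker has to mirror when it reads the command. As a sketch of the counterpart to the `(size, key/value UTF pairs)` layout in the diff above; `read_runner_conf` is a made-up helper name, while `read_int` and `UTF8Deserializer` are real pyspark.serializers utilities:

```
from pyspark.serializers import read_int, UTF8Deserializer

utf8_deserializer = UTF8Deserializer()

def read_runner_conf(infile):
    # Mirrors writeCommand: an int count, then UTF-8 key/value pairs.
    runner_conf = {}
    for _ in range(read_int(infile)):
        key = utf8_deserializer.loads(infile)
        value = utf8_deserializer.loads(infile)
        runner_conf[key] = value
    return runner_conf
```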


---




[GitHub] spark pull request #21427: [SPARK-24324][PYTHON] Pandas Grouped Map UDF shou...

2018-06-19 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21427#discussion_r196436658
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala ---
@@ -63,7 +64,7 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi
 
   private val batchSize = conf.arrowMaxRecordsPerBatch
   private val sessionLocalTimeZone = conf.sessionLocalTimeZone
-  private val pandasRespectSessionTimeZone = conf.pandasRespectSessionTimeZone
+  private val runnerConf = ArrowUtils.getPythonRunnerConfMap(conf)
--- End diff --

nit: runnerConf  -> pythonRunnerConf?


---




[GitHub] spark pull request #21427: [SPARK-24324][PYTHON] Pandas Grouped Map UDF shou...

2018-06-19 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21427#discussion_r196435526
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala ---
@@ -120,4 +121,19 @@ object ArrowUtils {
   StructField(field.getName, dt, field.isNullable)
 })
   }
+
+  /** Return Map with conf settings to be used in ArrowPythonRunner */
+  def getPythonRunnerConfMap(conf: SQLConf): Map[String, String] = {
--- End diff --

Maybe move this function out of `ArrowUtils`? It doesn't seem to be Arrow-specific.


---




[GitHub] spark pull request #21427: [SPARK-24324][PYTHON] Pandas Grouped Map UDF shou...

2018-06-18 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/21427#discussion_r196243235
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1161,6 +1161,16 @@ object SQLConf {
   .booleanConf
   .createWithDefault(true)
 
+  val PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_POSITION =
+buildConf("spark.sql.execution.pandas.groupedMap.assignColumnsByPosition")
+  .internal()
+  .doc("When true, a grouped map Pandas UDF will assign columns from the returned " +
+"Pandas DataFrame based on position, regardless of column label type. When false, " +
+"columns will be looked up by name if labeled with a string and fallback to use" +
--- End diff --

This can also be marked as deprecated, right?


---




[GitHub] spark pull request #21427: [SPARK-24324][PYTHON] Pandas Grouped Map UDF shou...

2018-06-18 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/21427#discussion_r196242012
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala ---
@@ -58,18 +58,18 @@ class ArrowPythonRunner(
 new WriterThread(env, worker, inputIterator, partitionIndex, context) {
 
   protected override def writeCommand(dataOut: DataOutputStream): Unit = {
-PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets)
-if (respectTimeZone) {
-  PythonRDD.writeUTF(timeZoneId, dataOut)
-} else {
-  dataOut.writeInt(SpecialLengths.NULL)
+dataOut.writeInt(conf.size)
+for ((k, v) <- conf) {
+  PythonRDD.writeUTF(k, dataOut)
+  PythonRDD.writeUTF(v, dataOut)
 }
+PythonUDFRunner.writeUDFs(dataOut, funcs, argOffsets)
   }
 
   protected override def writeIteratorToStream(dataOut: DataOutputStream): Unit = {
-val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId)
 val allocator = ArrowUtils.rootAllocator.newChildAllocator(
   s"stdout writer for $pythonExec", 0, Long.MaxValue)
+val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId)
--- End diff --

change this back, accidental


---




[GitHub] spark pull request #21427: [SPARK-24324][PYTHON] Pandas Grouped Map UDF shou...

2018-06-18 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/21427#discussion_r196241595
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1161,6 +1161,16 @@ object SQLConf {
   .booleanConf
   .createWithDefault(true)
 
+  val PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_POSITION =
+buildConf("spark.sql.execution.pandas.groupedMap.assignColumnsByPosition")
+  .internal()
+  .doc("When true, a grouped map Pandas UDF will assign columns from the returned " +
+"Pandas DataFrame based on position, regardless of column label type. When false, " +
+"columns will be looked up by name if labeled with a string and fallback to use" +
--- End diff --

need a space at end


---




[GitHub] spark pull request #21427: [SPARK-24324][PYTHON] Pandas Grouped Map UDF shou...

2018-05-29 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21427#discussion_r191596459
  
--- Diff: python/pyspark/worker.py ---
@@ -111,9 +114,16 @@ def wrapped(key_series, value_series):
 "Number of columns of the returned pandas.DataFrame "
 "doesn't match specified schema. "
 "Expected: {} Actual: {}".format(len(return_type), 
len(result.columns)))
-arrow_return_types = (to_arrow_type(field.dataType) for field in 
return_type)
-return [(result[result.columns[i]], arrow_type)
-for i, arrow_type in enumerate(arrow_return_types)]
+try:
+# Assign result columns by schema name
+return [(result[field.name], to_arrow_type(field.dataType)) 
for field in return_type]
+except KeyError:
+if all(not isinstance(name, basestring) for name in 
result.columns):
+# Assign result columns by position if they are not named 
with strings
+return [(result[result.columns[i]], 
to_arrow_type(field.dataType))
+for i, field in enumerate(return_type)]
+else:
+raise
--- End diff --

Ah, I see you added documentation for this behavior. Looks good.


---




[GitHub] spark pull request #21427: [SPARK-24324][PYTHON] Pandas Grouped Map UDF shou...

2018-05-29 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/21427#discussion_r191511343
  
--- Diff: python/pyspark/worker.py ---
@@ -111,9 +114,16 @@ def wrapped(key_series, value_series):
 "Number of columns of the returned pandas.DataFrame "
 "doesn't match specified schema. "
 "Expected: {} Actual: {}".format(len(return_type), 
len(result.columns)))
-arrow_return_types = (to_arrow_type(field.dataType) for field in 
return_type)
-return [(result[result.columns[i]], arrow_type)
-for i, arrow_type in enumerate(arrow_return_types)]
+try:
+# Assign result columns by schema name
+return [(result[field.name], to_arrow_type(field.dataType)) 
for field in return_type]
+except KeyError:
+if all(not isinstance(name, basestring) for name in 
result.columns):
+# Assign result columns by position if they are not named 
with strings
+return [(result[result.columns[i]], 
to_arrow_type(field.dataType))
+for i, field in enumerate(return_type)]
+else:
+raise
--- End diff --

@viirya I think it's just that it is very common for users to create a DataFrame with a dict, using names as keys, and not know that this can change the order of the columns.  So even if the field types all match (in the case of this JIRA they were all StringTypes), there could be a mix-up between the data and column names.  This is really weird and hard to figure out from the user's perspective.

When defining the pandas_udf, the return type requires the field names, so if the returned DataFrame has columns indexed by strings, I think it's fair to assume that if they do not match it was a mistake.  If the user wants to use positional columns, they can index by integers - and I'll add this to the docs.

That being said, I do suppose this slightly changes the behavior if by chance the user had gone out of their way to make a pandas_udf that returns columns with different names than the return type schema, but still with the same field type order.  That seems pretty unlikely to me though.
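
To make the dict-ordering pitfall concrete, a tiny illustration with made-up data (before Python 3.6, plain-dict key order was not guaranteed, so a dict-built DataFrame's column order could differ from the insertion order):

```
import pandas as pd

# Hypothetical grouped-map result built from a plain dict: on older Pythons
# the columns may come out in any order, e.g. ('u', 'id', 'v').
pdf = pd.DataFrame({'id': [1, 2], 'u': [2, 4], 'v': [1, 2]})

# With positional assignment against a schema of 'id long, u int, v int',
# a reordering silently relabels the data; selecting by name avoids that.
by_name = [pdf[name] for name in ('id', 'u', 'v')]
```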


---




[GitHub] spark pull request #21427: [SPARK-24324][PYTHON] Pandas Grouped Map UDF shou...

2018-05-29 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21427#discussion_r191503646
  
--- Diff: python/pyspark/worker.py ---
@@ -111,9 +114,16 @@ def wrapped(key_series, value_series):
 "Number of columns of the returned pandas.DataFrame "
 "doesn't match specified schema. "
 "Expected: {} Actual: {}".format(len(return_type), 
len(result.columns)))
-arrow_return_types = (to_arrow_type(field.dataType) for field in 
return_type)
-return [(result[result.columns[i]], arrow_type)
-for i, arrow_type in enumerate(arrow_return_types)]
+try:
+# Assign result columns by schema name
+return [(result[field.name], to_arrow_type(field.dataType)) 
for field in return_type]
+except KeyError:
+if all(not isinstance(name, basestring) for name in 
result.columns):
+# Assign result columns by position if they are not named 
with strings
+return [(result[result.columns[i]], 
to_arrow_type(field.dataType))
+for i, field in enumerate(return_type)]
+else:
+raise
--- End diff --

I think when the user specifies column names explicitly on the returned pd.DataFrame and they don't match the schema, it's most likely a bug / typo, so throwing an exception makes sense to me.


---




[GitHub] spark pull request #21427: [SPARK-24324][PYTHON] Pandas Grouped Map UDF shou...

2018-05-29 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/21427#discussion_r191502476
  
--- Diff: python/pyspark/worker.py ---
@@ -111,9 +114,16 @@ def wrapped(key_series, value_series):
 "Number of columns of the returned pandas.DataFrame "
 "doesn't match specified schema. "
 "Expected: {} Actual: {}".format(len(return_type), 
len(result.columns)))
-arrow_return_types = (to_arrow_type(field.dataType) for field in 
return_type)
-return [(result[result.columns[i]], arrow_type)
-for i, arrow_type in enumerate(arrow_return_types)]
+try:
+# Assign result columns by schema name
+return [(result[field.name], to_arrow_type(field.dataType)) 
for field in return_type]
+except KeyError:
+if all(not isinstance(name, basestring) for name in 
result.columns):
--- End diff --

Yeah, we still need to check for the possibility that Python 2 uses unicode.


---




[GitHub] spark pull request #21427: [SPARK-24324][PYTHON] Pandas Grouped Map UDF shou...

2018-05-29 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/21427#discussion_r191502180
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -4931,6 +4931,63 @@ def foo3(key, pdf):
 expected4 = udf3.func((), pdf)
 self.assertPandasEqual(expected4, result4)
 
+def test_column_order(self):
+import pandas as pd
+from pyspark.sql.functions import pandas_udf, PandasUDFType
+df = self.data
+
+# Function returns a pdf with required column names, but order could be arbitrary using dict
+def change_col_order(pdf):
+# Constructing a DataFrame from a dict should result in the same order,
+# but use from_items to ensure the pdf column order is different than schema
+return pd.DataFrame.from_items([
+('id', pdf.id),
+('u', pdf.v * 2),
+('v', pdf.v)])
+
+ordered_udf = pandas_udf(
+change_col_order,
+'id long, v int, u int',
+PandasUDFType.GROUPED_MAP
+)
+
+def positional_col_order(pdf):
--- End diff --

Yeah, I'll add a test for an integer index. I don't think we need to explicitly support only string or int; only if the index is not string-based will position be used.


---




[GitHub] spark pull request #21427: [SPARK-24324][PYTHON] Pandas Grouped Map UDF shou...

2018-05-27 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21427#discussion_r191076477
  
--- Diff: python/pyspark/worker.py ---
@@ -111,9 +114,16 @@ def wrapped(key_series, value_series):
 "Number of columns of the returned pandas.DataFrame "
 "doesn't match specified schema. "
 "Expected: {} Actual: {}".format(len(return_type), 
len(result.columns)))
-arrow_return_types = (to_arrow_type(field.dataType) for field in 
return_type)
-return [(result[result.columns[i]], arrow_type)
-for i, arrow_type in enumerate(arrow_return_types)]
+try:
+# Assign result columns by schema name
+return [(result[field.name], to_arrow_type(field.dataType)) 
for field in return_type]
+except KeyError:
+if all(not isinstance(name, basestring) for name in 
result.columns):
--- End diff --

Ah, I see. 


---




[GitHub] spark pull request #21427: [SPARK-24324][PYTHON] Pandas Grouped Map UDF shou...

2018-05-27 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21427#discussion_r191070228
  
--- Diff: python/pyspark/worker.py ---
@@ -111,9 +114,16 @@ def wrapped(key_series, value_series):
 "Number of columns of the returned pandas.DataFrame "
 "doesn't match specified schema. "
 "Expected: {} Actual: {}".format(len(return_type), 
len(result.columns)))
-arrow_return_types = (to_arrow_type(field.dataType) for field in 
return_type)
-return [(result[result.columns[i]], arrow_type)
-for i, arrow_type in enumerate(arrow_return_types)]
+try:
+# Assign result columns by schema name
+return [(result[field.name], to_arrow_type(field.dataType)) 
for field in return_type]
+except KeyError:
+if all(not isinstance(name, basestring) for name in 
result.columns):
--- End diff --

I believe he's trying to deal with the `unicode` case in Python 2 too; `isinstance(name, basestring)` should be safer.
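
A quick sketch of the Python 2 semantics under discussion (this only runs on Python 2, where `basestring` and `unicode` exist):

```
# Under Python 2, pandas column labels are often unicode, e.g. u'id'.
name = u'id'
print(isinstance(name, str))         # False: unicode is not str in Python 2
print(isinstance(name, basestring))  # True: basestring covers str and unicode
```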


---




[GitHub] spark pull request #21427: [SPARK-24324][PYTHON] Pandas Grouped Map UDF shou...

2018-05-25 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/21427#discussion_r191040210
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -4931,6 +4931,63 @@ def foo3(key, pdf):
 expected4 = udf3.func((), pdf)
 self.assertPandasEqual(expected4, result4)
 
+def test_column_order(self):
+import pandas as pd
+from pyspark.sql.functions import pandas_udf, PandasUDFType
+df = self.data
+
+# Function returns a pdf with required column names, but order could be arbitrary using dict
+def change_col_order(pdf):
+# Constructing a DataFrame from a dict should result in the same order,
+# but use from_items to ensure the pdf column order is different than schema
+return pd.DataFrame.from_items([
+('id', pdf.id),
+('u', pdf.v * 2),
+('v', pdf.v)])
+
+ordered_udf = pandas_udf(
+change_col_order,
+'id long, v int, u int',
+PandasUDFType.GROUPED_MAP
+)
+
+def positional_col_order(pdf):
--- End diff --

Would it be more natural/common to `zip(range(3))` the columns, or just name them one by one explicitly?


---




[GitHub] spark pull request #21427: [SPARK-24324][PYTHON] Pandas Grouped Map UDF shou...

2018-05-25 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/21427#discussion_r191037717
  
--- Diff: python/pyspark/worker.py ---
@@ -111,9 +114,16 @@ def wrapped(key_series, value_series):
 "Number of columns of the returned pandas.DataFrame "
 "doesn't match specified schema. "
 "Expected: {} Actual: {}".format(len(return_type), 
len(result.columns)))
-arrow_return_types = (to_arrow_type(field.dataType) for field in 
return_type)
-return [(result[result.columns[i]], arrow_type)
-for i, arrow_type in enumerate(arrow_return_types)]
+try:
+# Assign result columns by schema name
+return [(result[field.name], to_arrow_type(field.dataType)) 
for field in return_type]
+except KeyError:
+if all(not isinstance(name, basestring) for name in 
result.columns):
+# Assign result columns by position if they are not named 
with strings
+return [(result[result.columns[i]], 
to_arrow_type(field.dataType))
+for i, field in enumerate(return_type)]
+else:
+raise
--- End diff --

Why do we limit this to just result columns not named with strings?

In the case where we return a pd.DataFrame with matching field types but non-matching field names, do we not want to allow it?

If the returned pd.DataFrame doesn't match return_type's column names, shouldn't we follow the current behavior?


---




[GitHub] spark pull request #21427: [SPARK-24324][PYTHON] Pandas Grouped Map UDF shou...

2018-05-25 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21427#discussion_r191016970
  
--- Diff: python/pyspark/worker.py ---
@@ -111,9 +114,16 @@ def wrapped(key_series, value_series):
 "Number of columns of the returned pandas.DataFrame "
 "doesn't match specified schema. "
 "Expected: {} Actual: {}".format(len(return_type), 
len(result.columns)))
-arrow_return_types = (to_arrow_type(field.dataType) for field in 
return_type)
-return [(result[result.columns[i]], arrow_type)
-for i, arrow_type in enumerate(arrow_return_types)]
+try:
+# Assign result columns by schema name
+return [(result[field.name], to_arrow_type(field.dataType)) 
for field in return_type]
+except KeyError:
--- End diff --

One potential issue: if `to_arrow_type(field.dataType)` ever throws a KeyError, this can lead to unintended behavior. If we want to use KeyError, maybe limit the try block?
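
A minimal sketch of the narrower try block being suggested (names follow the surrounding diff; this is an illustration, not the committed code):

```
# Resolve the Arrow types outside the try so a KeyError raised inside
# to_arrow_type() cannot be mistaken for a missing column name.
arrow_types = [to_arrow_type(field.dataType) for field in return_type]
try:
    columns = [result[field.name] for field in return_type]
except KeyError:
    columns = [result[result.columns[i]] for i in range(len(return_type))]
return list(zip(columns, arrow_types))
```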


---




[GitHub] spark pull request #21427: [SPARK-24324][PYTHON] Pandas Grouped Map UDF shou...

2018-05-25 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21427#discussion_r191015609
  
--- Diff: python/pyspark/worker.py ---
@@ -111,9 +114,16 @@ def wrapped(key_series, value_series):
 "Number of columns of the returned pandas.DataFrame "
 "doesn't match specified schema. "
 "Expected: {} Actual: {}".format(len(return_type), 
len(result.columns)))
-arrow_return_types = (to_arrow_type(field.dataType) for field in 
return_type)
-return [(result[result.columns[i]], arrow_type)
-for i, arrow_type in enumerate(arrow_return_types)]
+try:
+# Assign result columns by schema name
+return [(result[field.name], to_arrow_type(field.dataType)) 
for field in return_type]
+except KeyError:
+if all(not isinstance(name, basestring) for name in 
result.columns):
--- End diff --

Can we just do `isinstance(name, str)` here to deal with python2/3?


---




[GitHub] spark pull request #21427: [SPARK-24324][PYTHON] Pandas Grouped Map UDF shou...

2018-05-25 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21427#discussion_r191015105
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -4931,6 +4931,63 @@ def foo3(key, pdf):
 expected4 = udf3.func((), pdf)
 self.assertPandasEqual(expected4, result4)
 
+def test_column_order(self):
+import pandas as pd
+from pyspark.sql.functions import pandas_udf, PandasUDFType
+df = self.data
+
+# Function returns a pdf with required column names, but order could be arbitrary using dict
+def change_col_order(pdf):
+# Constructing a DataFrame from a dict should result in the same order,
+# but use from_items to ensure the pdf column order is different than schema
+return pd.DataFrame.from_items([
+('id', pdf.id),
+('u', pdf.v * 2),
+('v', pdf.v)])
+
+ordered_udf = pandas_udf(
+change_col_order,
+'id long, v int, u int',
+PandasUDFType.GROUPED_MAP
+)
+
+def positional_col_order(pdf):
--- End diff --

Can we test these two cases too (using an integer index)?

This should work:
```
pd.DataFrame(OrderedDict(zip(range(3), [pdf.id, pdf.v * 3, pdf.v])))
```

This should not be supported (we should only support string and int indexes):
```
pd.DataFrame(OrderedDict(zip(np.arange(3.0), [pdf.id, pdf.v * 3, pdf.v])))
```



---




[GitHub] spark pull request #21427: [SPARK-24324][PYTHON] Pandas Grouped Map UDF shou...

2018-05-25 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/21427#discussion_r191006873
  
--- Diff: python/pyspark/worker.py ---
@@ -111,9 +114,16 @@ def wrapped(key_series, value_series):
 "Number of columns of the returned pandas.DataFrame "
 "doesn't match specified schema. "
 "Expected: {} Actual: {}".format(len(return_type), 
len(result.columns)))
-arrow_return_types = (to_arrow_type(field.dataType) for field in 
return_type)
-return [(result[result.columns[i]], arrow_type)
-for i, arrow_type in enumerate(arrow_return_types)]
+try:
+# Assign result columns by schema name
+return [(result[field.name], to_arrow_type(field.dataType)) 
for field in return_type]
+except KeyError:
--- End diff --

I think it's possible for the column index to be many things; the user could even assign it themselves with `pdf.columns = ...`, right?

As far as I can tell, using a string as a key should always result in a KeyError if it's not there.  If a MultiIndex is involved it's a little more complicated, but I don't think that's allowed anyway.


---




[GitHub] spark pull request #21427: [SPARK-24324][PYTHON] Pandas Grouped Map UDF shou...

2018-05-25 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/21427#discussion_r191004141
  
--- Diff: python/pyspark/worker.py ---
@@ -111,9 +114,16 @@ def wrapped(key_series, value_series):
 "Number of columns of the returned pandas.DataFrame "
 "doesn't match specified schema. "
 "Expected: {} Actual: {}".format(len(return_type), 
len(result.columns)))
-arrow_return_types = (to_arrow_type(field.dataType) for field in 
return_type)
-return [(result[result.columns[i]], arrow_type)
-for i, arrow_type in enumerate(arrow_return_types)]
+try:
+# Assign result columns by schema name
+return [(result[field.name], to_arrow_type(field.dataType)) 
for field in return_type]
+except KeyError:
--- End diff --

I wonder whether we should rely on KeyError or the type of the column index?



---




[GitHub] spark pull request #21427: [SPARK-24324][PYTHON] Pandas Grouped Map UDF shou...

2018-05-24 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21427#discussion_r190793613
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -4931,6 +4931,33 @@ def foo3(key, pdf):
 expected4 = udf3.func((), pdf)
 self.assertPandasEqual(expected4, result4)
 
+def test_column_order(self):
+import pandas as pd
+from pyspark.sql.functions import pandas_udf, col, PandasUDFType
--- End diff --

seems `col` is not used btw.


---




[GitHub] spark pull request #21427: [SPARK-24324][PYTHON] Pandas Grouped Map UDF shou...

2018-05-24 Thread BryanCutler
GitHub user BryanCutler opened a pull request:

https://github.com/apache/spark/pull/21427

[SPARK-24324][PYTHON] Pandas Grouped Map UDF should assign result columns by name

## What changes were proposed in this pull request?

Currently, a `pandas_udf` of type `PandasUDFType.GROUPED_MAP` will assign the resulting columns based on the position of the columns in the returned pandas.DataFrame.  If a new DataFrame is returned and constructed using a dict, then the order of the columns could be arbitrary and differ from the schema defined for the UDF.  If the schema types still match, then no error will be raised and the user will see column names and column data mixed up.
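
As a hypothetical repro (the column names and the Spark DataFrame `df` are assumptions for illustration):

```
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf('id long, v long', PandasUDFType.GROUPED_MAP)
def swap_prone(pdf):
    # Built from a plain dict: the resulting column order is not guaranteed,
    # so with positional assignment the 'id' data can end up labeled 'v'.
    return pd.DataFrame({'v': pdf.v, 'id': pdf.id})

# df.groupby('id').apply(swap_prone) can silently mix up columns when
# assignment is positional and the dtypes happen to match.
```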

## How was this patch tested?

Added a test that returns a new DataFrame with a column order different from the schema.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/BryanCutler/spark arrow-grouped-map-mixesup-cols-SPARK-24324

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21427.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21427


commit 0641c5a0cd690fec905829b70006de3f8a4902fc
Author: Bryan Cutler 
Date:   2018-05-24T19:03:02Z

added test for diff column order

commit 8484647113144958c8ebcf3611c222119047cc96
Author: Bryan Cutler 
Date:   2018-05-24T19:18:47Z

needed to adjust expected values to compare results

commit d67a8a5987d6ba4bdd65f5d5decafca2d22291ad
Author: Bryan Cutler 
Date:   2018-05-24T19:21:36Z

for grouped map results, get columns based on name instead of position




---
