[GitHub] spark issue #15821: [SPARK-13534][WIP][PySpark] Using Apache Arrow to increa...

2017-01-23 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/15821
  
Bryan,

I am working on:
(1) Add more numbers to benchmark.py
(2) Add support for date/timestamp/binary types
(3) Fix memory leaks in the code.

All these should be done soon (tomorrow, if not today), but I think we can
start getting feedback from Spark committers.

What do you think? Is there anything else you want done before we update the
PR for Spark committers to review?

Li

On Wed, Jan 18, 2017 at 7:52 PM, Bryan Cutler <notificati...@github.com>
wrote:

> Shall we update this PR to the latest and solicit involvement from
> Spark committers?
>
> Yeah, I think it's about ready for that. After we integrate the latest
> changes, I'll go over once more for some minor cleanup and update this.
> Probably in the next day or so.
>






[GitHub] spark issue #15821: [SPARK-13534][WIP][PySpark] Using Apache Arrow to increa...

2016-12-01 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/15821
  
@BryanCutler , I have been working based on your branch here:

https://github.com/BryanCutler/spark/tree/wip-toPandas_with_arrow-SPARK-13534

Is this the right one?





[GitHub] spark issue #15821: [SPARK-13534][PySpark] Using Apache Arrow to increase pe...

2017-03-24 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/15821
  
@BryanCutler , there appears to be a stability issue in the current code. I
tried to repeatedly collect a DataFrame as Arrow record batches in spark-shell
and found that executors start to fail after a while.


![image](https://cloud.githubusercontent.com/assets/793516/24308213/7cd46daa-109d-11e7-8824-3255b9690cf2.png)

I looked at one failed executor; it appears to have been killed by Mesos for
exceeding its memory limit. I suspect some kind of memory leak.
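
For anyone trying to reproduce, here is a rough PySpark analogue of the loop I
ran (a sketch, assuming the Arrow-backed `toPandas()` path from this PR and the
`spark.sql.execution.arrow.enable` flag used in its tests):
```
# Hedged repro sketch: repeatedly materialize a DataFrame through the Arrow
# path and watch executor memory; it should stay flat across iterations.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enable", "true")  # as used in this PR's tests

df = spark.range(0, 1000000).selectExpr("id", "rand() as v")
for i in range(500):
    pdf = df.toPandas()  # each iteration collects the data as Arrow record batches
    del pdf              # drop the result before the next round trip
```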






[GitHub] spark issue #15821: [SPARK-13534][PySpark] Using Apache Arrow to increase pe...

2017-03-27 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/15821
  
@BryanCutler 

Thanks! The issue is fixed with your new update.





[GitHub] spark issue #15821: [SPARK-13534][PySpark] Using Apache Arrow to increase pe...

2017-03-27 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/15821
  
@BryanCutler

I think this patch is in good enough shape that I want to release this code
internally at Two Sigma for beta users.

My understanding is that support for timestamp and date types is not available
until Arrow 0.3, is that right?

Also, are there any unresolved issues that you know of?





[GitHub] spark issue #15821: [SPARK-13534][PySpark] Using Apache Arrow to increase pe...

2017-03-27 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/15821
  
Got it. Can you put up a patch against the arrow-integration branch to throw
an exception for timestamp and date types? I would do it myself, but I don't
have my laptop with me right now...





[GitHub] spark issue #15821: [SPARK-13534][PySpark] Using Apache Arrow to increase pe...

2017-03-24 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/15821
  
@BryanCutler, from the arrow-integration branch. Where is the memory leak
patch?





[GitHub] spark issue #18664: [SPARK-21375][PYSPARK][SQL][WIP] Add Date and Timestamp ...

2017-07-29 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/18664
  
It looks like "SESSION_LOCAL_TIMEZONE" is not respected in most of the 
pyspark functionality. 

I think `df.collect()` and `df.toPandas` can be fixed to respect 
SESSION_LOCAL_TIMEZONE.

`TimestampType().toInternal(dt)` is tricky because it doesn't have a 
reference to the SQLConf object, we should maybe deprecate this method?

I think we can also create separate Jira to address these, this PR can fix 
just the Arrow path. @BryanCutler and @ueshin do you think?
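
As a minimal illustration of the current inconsistency (a sketch, assuming
`SESSION_LOCAL_TIMEZONE` maps to the `spark.sql.session.timeZone` conf key):
```
# df.show() honors the session-local timezone, while df.collect() and
# df.toPandas() currently convert through the JVM/system timezone instead.
import datetime
from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")

df = spark.createDataFrame([Row(ts=datetime.datetime(2017, 1, 1, 12, 0, 0))])
df.show()            # rendered in the session-local timezone
print(df.collect())  # datetimes come back in the Python/system timezone
print(df.toPandas()) # same issue on the pandas path
```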





[GitHub] spark issue #18664: [SPARK-21375][PYSPARK][SQL][WIP] Add Date and Timestamp ...

2017-07-31 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/18664
  
@ueshin I am +1 for fixing `df.collect()` and `df.toPandas()`. I don't think
it is much of a backward-compatibility issue, because the current behavior of
`df.collect()` and `df.toPandas()` is broken when `SESSION_LOCAL_TIMEZONE` is
set. The fact that no PySpark users have complained that it doesn't work as
expected is probably because the setting isn't widely used in PySpark. IMHO we
should fix this as soon as possible.





[GitHub] spark issue #18664: [SPARK-21375][PYSPARK][SQL][WIP] Add Date and Timestamp ...

2017-08-01 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/18664
  
Regarding Wes's concern, I think we are only dealing with values in UTC here;
both Spark and Arrow internally represent timestamps as microseconds since the
epoch.

Regarding the two issues Bryan and Ueshin brought up:

Issue 1:
I agree with Ueshin that we should stick to `SESSION_LOCAL_TIMEZONE`.
Bryan brought up a good point that in PySpark, `df.toPandas()`, `df.collect()`,
and Python UDFs (through `Timestamp.fromInternal`) don't respect
`SESSION_LOCAL_TIMEZONE`, which is confusing and inconsistent with Spark SQL
behavior such as `df.show()`. Since the result is going to be inconsistent
either with Spark SQL (`df.show()`) or with PySpark (i.e., the default
`df.toPandas()`), I'd rather we do the right thing (use
`SESSION_LOCAL_TIMEZONE`) and fix the other PySpark behavior separately.

Issue 2:
I agree with Bryan that we should leave the timezone as is.
I don't think there is a performance issue because, as Wes mentioned, it's just
a metadata operation. Converting back to the system timezone would defeat the
purpose of using the session timezone, and throwing away the tzinfo seems
unnecessary.






[GitHub] spark pull request #18664: [SPARK-21375][PYSPARK][SQL][WIP] Add Date and Tim...

2017-08-03 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18664#discussion_r131227296
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3036,6 +3052,9 @@ def test_toPandas_arrow_toggle(self):
 pdf = df.toPandas()
 self.spark.conf.set("spark.sql.execution.arrow.enable", "true")
 pdf_arrow = df.toPandas()
+# need to remove timezone for comparison
+pdf_arrow["7_timestamp_t"] = \
+pdf_arrow["7_timestamp_t"].apply(lambda ts: 
ts.tz_localize(None))
--- End diff --

@gatorsmile, can you be a bit more explicit about the behavior we want? Given
that we discovered the current behavior of `df.toPandas()` is wrong and
inconsistent with Spark SQL (because `df.toPandas()` doesn't respect
`SESSION_LOCAL_TIMEZONE`), do you think we should:
1. be consistent with the default `df.toPandas()` even though it is wrong, or
2. be consistent with Spark SQL?

Also, this is not a huge issue IMHO because we should fix the behavior of
`df.toPandas()` soon anyway.





[GitHub] spark pull request #18933: [WIP][SPARK-21722][SQL][PYTHON] Enable timezone-a...

2017-08-15 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18933#discussion_r133229705
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -912,6 +912,14 @@ object SQLConf {
   .intConf
   .createWithDefault(1)
 
+  val PANDAS_TIMEZONE_AWARE =
--- End diff --

There are other parts of PySpark that don't use the session local timezone,
for instance `df.collect()` and (maybe) Python UDF execution.

I am worried about those being inconsistent (some use the local timezone, some
don't) and complex (one configuration for each piece of functionality?).

It will be harder to fix, but how about we use a single configuration to
control the behavior of `df.toPandas()`, `df.collect()`, and Python UDFs with
respect to the session local timezone?
 





[GitHub] spark pull request #18732: groupby().apply() with pandas udf

2017-07-25 Thread icexelloss
GitHub user icexelloss opened a pull request:

https://github.com/apache/spark/pull/18732

groupby().apply() with pandas udf

## What changes were proposed in this pull request?

This PR adds an apply() function on df.groupby(). apply() takes a pandas udf
that is a transformation from `pandas.DataFrame` to `pandas.DataFrame`.

A quick example (assuming `df` is an existing DataFrame with a grouping column
`id` and a numeric column `v1`):
```
from pyspark.sql.functions import pandas_udf

schema = df.schema

@pandas_udf(schema)
def normalize(pdf):
    pdf.v1 = (pdf.v1 - pdf.v1.mean()) / pdf.v1.std()
    return pdf

df.groupBy('id').apply(normalize)
```

This patch consists of multiple parts, which could be broken into smaller PRs:
* Arrow RecordBatch -> UnsafeRow conversions in ArrowConverters
* pandas_udf in pyspark.sql.functions
* FlatMapInPandas plan node to support groupby().apply()

The first two parts can be used to implement other pandas udf functions.

Design doc: 
https://github.com/icexelloss/spark/blob/pandas-udf-doc/docs/pyspark-pandas-udf.md

## How was this patch tested?
* The Arrow RecordBatch -> UnsafeRow conversion is tested with the existing
ArrowConvertersSuite
* groupby().apply() is tested with a new PySpark test



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/icexelloss/spark groupby-apply-SPARK-20396

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/18732.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #18732


commit 8f38c15ea372c1ec9c2fefc36c0bfc3a22c3be14
Author: Li Jin <ice.xell...@gmail.com>
Date:   2017-07-25T15:37:39Z

Initial commit of groupby apply with pandas udf







[GitHub] spark pull request #18659: [SPARK-21404][PYSPARK][WIP] Simple Python Vectori...

2017-07-25 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r129412101
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
 ---
@@ -132,6 +135,61 @@ private[sql] object ArrowConverters {
 }
   }
 
+  private[sql] def fromPayloadIterator(iter: Iterator[ArrowPayload]): Iterator[InternalRow] = {
+new Iterator[InternalRow] {
+  private val _allocator = new RootAllocator(Long.MaxValue)
+  private var _reader: ArrowFileReader = _
+  private var _root: VectorSchemaRoot = _
+  private var _index = 0
+
+  loadNextBatch()
+
+  override def hasNext: Boolean = _root != null && _index < _root.getRowCount
+
+  override def next(): InternalRow = {
+val fields = _root.getFieldVectors.asScala
+
+val genericRowData = fields.map { field =>
+  field.getAccessor.getObject(_index)
+}.toArray[Any]
--- End diff --

@BryanCutler,

I have implemented the Arrow -> UnsafeRow conversion in:

https://github.com/icexelloss/spark/commit/8f38c15ea372c1ec9c2fefc36c0bfc3a22c3be14#diff-52cca47e7a940849b28d476ddf99d65eR575

This reuses the row object and doesn't do boxing. Hopefully it's useful to you
as well.





[GitHub] spark issue #18664: [SPARK-21375][PYSPARK][SQL][WIP] Add Date and Timestamp ...

2017-07-24 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/18664
  
Excited to see this being worked on.

> SQLConf.SESSION_LOCAL_TIMEZONE

I like this the best. This presents timestamps in local time, which is
compatible with the existing `toPandas()` and `collect()`. If we really want
the result to be exactly the same as the non-Arrow version of `toPandas()`, we
can do something like `df[col] = df[col].dt.tz_localize(None)` after getting
the `pandas.DataFrame` from Arrow.
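
A small sketch of that workaround (`pdf` and the column name are placeholders
for whatever comes out of the Arrow conversion):
```
# Strip the timezone info after the Arrow conversion so the result matches the
# non-Arrow toPandas() output; keeps the wall-clock values, drops the tzinfo.
import pandas as pd

def drop_timezone(pdf: pd.DataFrame, col: str) -> pd.DataFrame:
    pdf[col] = pdf[col].dt.tz_localize(None)
    return pdf
```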









[GitHub] spark pull request #15821: [SPARK-13534][PySpark] Using Apache Arrow to incr...

2017-04-27 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/15821#discussion_r113728111
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
 ---
@@ -0,0 +1,396 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.arrow
+
+import java.io.ByteArrayOutputStream
+import java.nio.channels.Channels
+
+import scala.collection.JavaConverters._
+
+import io.netty.buffer.ArrowBuf
+import org.apache.arrow.memory.{BufferAllocator, RootAllocator}
+import org.apache.arrow.vector._
+import org.apache.arrow.vector.BaseValueVector.BaseMutator
+import org.apache.arrow.vector.file._
+import org.apache.arrow.vector.schema.{ArrowFieldNode, ArrowRecordBatch}
+import org.apache.arrow.vector.types.{DateUnit, FloatingPointPrecision, 
TimeUnit}
+import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, 
Schema}
+import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
+
+
+/**
+ * Store Arrow data in a form that can be serialized by Spark
+ */
+private[sql] class ArrowPayload(val batchBytes: Array[Byte]) extends 
Serializable {
+
+  def this(batch: ArrowRecordBatch, schema: StructType, allocator: 
BufferAllocator) = {
+this(ArrowConverters.batchToByteArray(batch, schema, allocator))
+  }
+
+  def loadBatch(allocator: BufferAllocator): ArrowRecordBatch = {
+ArrowConverters.byteArrayToBatch(batchBytes, allocator)
+  }
+}
+
+private[sql] object ArrowConverters {
+
+  /**
+   * Map a Spark DataType to ArrowType.
+   */
+  private[arrow] def sparkTypeToArrowType(dataType: DataType): ArrowType = 
{
+dataType match {
+  case BooleanType => ArrowType.Bool.INSTANCE
+  case ShortType => new ArrowType.Int(8 * ShortType.defaultSize, true)
+  case IntegerType => new ArrowType.Int(8 * IntegerType.defaultSize, 
true)
+  case LongType => new ArrowType.Int(8 * LongType.defaultSize, true)
+  case FloatType => new 
ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)
+  case DoubleType => new 
ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)
+  case ByteType => new ArrowType.Int(8, true)
+  case StringType => ArrowType.Utf8.INSTANCE
+  case BinaryType => ArrowType.Binary.INSTANCE
+  case _ => throw new UnsupportedOperationException(s"Unsupported data 
type: $dataType")
+}
+  }
+
+  /**
+   * Convert a Spark Dataset schema to Arrow schema.
+   */
+  private[arrow] def schemaToArrowSchema(schema: StructType): Schema = {
+val arrowFields = schema.fields.map { f =>
+  new Field(f.name, f.nullable, sparkTypeToArrowType(f.dataType), 
List.empty[Field].asJava)
+}
+new Schema(arrowFields.toList.asJava)
+  }
+
+  /**
+   * Maps Iterator from InternalRow to ArrowPayload
+   */
+  private[sql] def toPayloadIterator(
+  rowIter: Iterator[InternalRow],
+  schema: StructType): Iterator[ArrowPayload] = {
+new Iterator[ArrowPayload] {
+  private val _allocator = new RootAllocator(Long.MaxValue)
+  private var _nextPayload = if (rowIter.nonEmpty) convert() else null
+
+  override def hasNext: Boolean = _nextPayload != null
+
+  override def next(): ArrowPayload = {
+val obj = _nextPayload
+if (hasNext) {
+  if (rowIter.hasNext) {
+_nextPayload = convert()
+  } else {
+_allocator.close()
--- End diff --

I think we also need to handle exceptions / task cancellation here to free up
the memory.



[GitHub] spark pull request #15821: [SPARK-13534][PySpark] Using Apache Arrow to incr...

2017-04-27 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/15821#discussion_r113728646
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
 ---
@@ -0,0 +1,396 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.arrow
+
+import java.io.ByteArrayOutputStream
+import java.nio.channels.Channels
+
+import scala.collection.JavaConverters._
+
+import io.netty.buffer.ArrowBuf
+import org.apache.arrow.memory.{BufferAllocator, RootAllocator}
+import org.apache.arrow.vector._
+import org.apache.arrow.vector.BaseValueVector.BaseMutator
+import org.apache.arrow.vector.file._
+import org.apache.arrow.vector.schema.{ArrowFieldNode, ArrowRecordBatch}
+import org.apache.arrow.vector.types.{DateUnit, FloatingPointPrecision, 
TimeUnit}
+import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, 
Schema}
+import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
+
+
+/**
+ * Store Arrow data in a form that can be serialized by Spark
+ */
+private[sql] class ArrowPayload(val batchBytes: Array[Byte]) extends 
Serializable {
+
+  def this(batch: ArrowRecordBatch, schema: StructType, allocator: 
BufferAllocator) = {
+this(ArrowConverters.batchToByteArray(batch, schema, allocator))
+  }
+
+  def loadBatch(allocator: BufferAllocator): ArrowRecordBatch = {
+ArrowConverters.byteArrayToBatch(batchBytes, allocator)
+  }
+}
+
+private[sql] object ArrowConverters {
+
+  /**
+   * Map a Spark DataType to ArrowType.
+   */
+  private[arrow] def sparkTypeToArrowType(dataType: DataType): ArrowType = 
{
+dataType match {
+  case BooleanType => ArrowType.Bool.INSTANCE
+  case ShortType => new ArrowType.Int(8 * ShortType.defaultSize, true)
+  case IntegerType => new ArrowType.Int(8 * IntegerType.defaultSize, 
true)
+  case LongType => new ArrowType.Int(8 * LongType.defaultSize, true)
+  case FloatType => new 
ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)
+  case DoubleType => new 
ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)
+  case ByteType => new ArrowType.Int(8, true)
+  case StringType => ArrowType.Utf8.INSTANCE
+  case BinaryType => ArrowType.Binary.INSTANCE
+  case _ => throw new UnsupportedOperationException(s"Unsupported data 
type: $dataType")
+}
+  }
+
+  /**
+   * Convert a Spark Dataset schema to Arrow schema.
+   */
+  private[arrow] def schemaToArrowSchema(schema: StructType): Schema = {
+val arrowFields = schema.fields.map { f =>
+  new Field(f.name, f.nullable, sparkTypeToArrowType(f.dataType), 
List.empty[Field].asJava)
+}
+new Schema(arrowFields.toList.asJava)
+  }
+
+  /**
+   * Maps Iterator from InternalRow to ArrowPayload
+   */
+  private[sql] def toPayloadIterator(
+  rowIter: Iterator[InternalRow],
+  schema: StructType): Iterator[ArrowPayload] = {
+new Iterator[ArrowPayload] {
+  private val _allocator = new RootAllocator(Long.MaxValue)
--- End diff --

I think it's better to have a root allocator and use child allocators created
from it whenever an allocator is needed. This way it will be easier to find
memory leaks. Maybe @julienledem can chime in and suggest the best practice?



[GitHub] spark pull request #15821: [SPARK-13534][PySpark] Using Apache Arrow to incr...

2017-04-27 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/15821#discussion_r113729309
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
 ---
@@ -0,0 +1,396 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.arrow
+
+import java.io.ByteArrayOutputStream
+import java.nio.channels.Channels
+
+import scala.collection.JavaConverters._
+
+import io.netty.buffer.ArrowBuf
+import org.apache.arrow.memory.{BufferAllocator, RootAllocator}
+import org.apache.arrow.vector._
+import org.apache.arrow.vector.BaseValueVector.BaseMutator
+import org.apache.arrow.vector.file._
+import org.apache.arrow.vector.schema.{ArrowFieldNode, ArrowRecordBatch}
+import org.apache.arrow.vector.types.{DateUnit, FloatingPointPrecision, 
TimeUnit}
+import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, 
Schema}
+import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
+
+
+/**
+ * Store Arrow data in a form that can be serialized by Spark
+ */
+private[sql] class ArrowPayload(val batchBytes: Array[Byte]) extends 
Serializable {
--- End diff --

I think it would be helpful to document that `batchBytes` here is in the
"Arrow File Format" and contains exactly one Arrow record batch.





[GitHub] spark pull request #15821: [SPARK-13534][PySpark] Using Apache Arrow to incr...

2017-04-27 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/15821#discussion_r113730387
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
 ---
@@ -0,0 +1,396 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.arrow
+
+import java.io.ByteArrayOutputStream
+import java.nio.channels.Channels
+
+import scala.collection.JavaConverters._
+
+import io.netty.buffer.ArrowBuf
+import org.apache.arrow.memory.{BufferAllocator, RootAllocator}
+import org.apache.arrow.vector._
+import org.apache.arrow.vector.BaseValueVector.BaseMutator
+import org.apache.arrow.vector.file._
+import org.apache.arrow.vector.schema.{ArrowFieldNode, ArrowRecordBatch}
+import org.apache.arrow.vector.types.{DateUnit, FloatingPointPrecision, 
TimeUnit}
+import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, 
Schema}
+import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
+
+
+/**
+ * Store Arrow data in a form that can be serialized by Spark
+ */
+private[sql] class ArrowPayload(val batchBytes: Array[Byte]) extends 
Serializable {
+
+  def this(batch: ArrowRecordBatch, schema: StructType, allocator: 
BufferAllocator) = {
+this(ArrowConverters.batchToByteArray(batch, schema, allocator))
+  }
+
+  def loadBatch(allocator: BufferAllocator): ArrowRecordBatch = {
+ArrowConverters.byteArrayToBatch(batchBytes, allocator)
+  }
+}
+
+private[sql] object ArrowConverters {
+
+  /**
+   * Map a Spark DataType to ArrowType.
+   */
+  private[arrow] def sparkTypeToArrowType(dataType: DataType): ArrowType = 
{
+dataType match {
+  case BooleanType => ArrowType.Bool.INSTANCE
+  case ShortType => new ArrowType.Int(8 * ShortType.defaultSize, true)
+  case IntegerType => new ArrowType.Int(8 * IntegerType.defaultSize, 
true)
+  case LongType => new ArrowType.Int(8 * LongType.defaultSize, true)
+  case FloatType => new 
ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)
+  case DoubleType => new 
ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)
+  case ByteType => new ArrowType.Int(8, true)
+  case StringType => ArrowType.Utf8.INSTANCE
+  case BinaryType => ArrowType.Binary.INSTANCE
+  case _ => throw new UnsupportedOperationException(s"Unsupported data 
type: $dataType")
+}
+  }
+
+  /**
+   * Convert a Spark Dataset schema to Arrow schema.
+   */
+  private[arrow] def schemaToArrowSchema(schema: StructType): Schema = {
+val arrowFields = schema.fields.map { f =>
+  new Field(f.name, f.nullable, sparkTypeToArrowType(f.dataType), 
List.empty[Field].asJava)
+}
+new Schema(arrowFields.toList.asJava)
+  }
+
+  /**
+   * Maps Iterator from InternalRow to ArrowPayload
+   */
+  private[sql] def toPayloadIterator(
+  rowIter: Iterator[InternalRow],
+  schema: StructType): Iterator[ArrowPayload] = {
+new Iterator[ArrowPayload] {
+  private val _allocator = new RootAllocator(Long.MaxValue)
+  private var _nextPayload = if (rowIter.nonEmpty) convert() else null
+
+  override def hasNext: Boolean = _nextPayload != null
+
+  override def next(): ArrowPayload = {
+val obj = _nextPayload
+if (hasNext) {
+  if (rowIter.hasNext) {
+_nextPayload = convert()
+  } else {
+_allocator.close()
+_nextPayload = null
+  }
+}
+obj
+  }
+
+  private def convert(): ArrowPayload = {
+val batch = internalRowIterToArrowBatch(rowIter, schema, 
_all

[GitHub] spark issue #15821: [SPARK-13534][PySpark] Using Apache Arrow to increase pe...

2017-04-27 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/15821
  
> An instance of this must be used each time an ArrowRecordBatch is created,
> and then the batch and allocator must be released/closed after they have
> been processed.

I think it would be useful to add tests that check for memory leaks in error
cases, for instance:
* Have a DataFrame that throws an exception after n rows, invoke the Arrow
conversion function, and check the allocator memory usage.
* Have a DataFrame that is slow, invoke the Arrow conversion function, cancel
the task, and check the allocator memory usage.





[GitHub] spark pull request #18664: [SPARK-21375][PYSPARK][SQL][WIP] Add Date and Tim...

2017-07-28 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18664#discussion_r130201966
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala 
---
@@ -42,6 +43,9 @@ object ArrowUtils {
 case StringType => ArrowType.Utf8.INSTANCE
 case BinaryType => ArrowType.Binary.INSTANCE
case DecimalType.Fixed(precision, scale) => new ArrowType.Decimal(precision, scale)
+case DateType => new ArrowType.Date(DateUnit.DAY)
+case TimestampType =>
+  new ArrowType.Timestamp(TimeUnit.MICROSECOND, DateTimeUtils.defaultTimeZone().getID)
--- End diff --

I do think we should use `SQLConf.SESSION_LOCAL_TIMEZONE` in this PR. I am
concerned about potential inconsistent behavior due to the system timezone, as
@wesm mentioned.





[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...

2017-08-09 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18787#discussion_r132188490
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
 ---
@@ -65,15 +65,35 @@
   final Row row;
 
   public static ColumnarBatch allocate(StructType schema, MemoryMode 
memMode) {
-return new ColumnarBatch(schema, DEFAULT_BATCH_SIZE, memMode);
+return allocate(schema, memMode, DEFAULT_BATCH_SIZE);
   }
 
   public static ColumnarBatch allocate(StructType type) {
-return new ColumnarBatch(type, DEFAULT_BATCH_SIZE, 
DEFAULT_MEMORY_MODE);
+return allocate(type, DEFAULT_MEMORY_MODE, DEFAULT_BATCH_SIZE);
   }
 
   public static ColumnarBatch allocate(StructType schema, MemoryMode 
memMode, int maxRows) {
-return new ColumnarBatch(schema, maxRows, memMode);
+ColumnVector[] columns = allocateCols(schema, maxRows, memMode);
+return new ColumnarBatch(schema, columns, maxRows);
+  }
+
+  private static ColumnVector[] allocateCols(StructType schema, int 
maxRows, MemoryMode memMode) {
+ColumnVector[] columns = new ColumnVector[schema.size()];
+for (int i = 0; i < schema.fields().length; ++i) {
+  StructField field = schema.fields()[i];
+  columns[i] = ColumnVector.allocate(maxRows, field.dataType(), 
memMode);
+}
+return columns;
+  }
+
+  public static ColumnarBatch createReadOnly(
+  StructType schema,
+  ReadOnlyColumnVector[] columns,
+  int numRows) {
+assert(schema.length() == columns.length);
+ColumnarBatch batch = new ColumnarBatch(schema, columns, numRows);
--- End diff --

Why is the capacity set to `numRows` inside the constructor, but
`batch.setNumRows()` still needs to be called manually?





[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...

2017-08-09 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18787#discussion_r132187027
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
 ---
@@ -65,15 +65,35 @@
   final Row row;
 
   public static ColumnarBatch allocate(StructType schema, MemoryMode 
memMode) {
-return new ColumnarBatch(schema, DEFAULT_BATCH_SIZE, memMode);
+return allocate(schema, memMode, DEFAULT_BATCH_SIZE);
   }
 
   public static ColumnarBatch allocate(StructType type) {
-return new ColumnarBatch(type, DEFAULT_BATCH_SIZE, 
DEFAULT_MEMORY_MODE);
+return allocate(type, DEFAULT_MEMORY_MODE, DEFAULT_BATCH_SIZE);
   }
 
   public static ColumnarBatch allocate(StructType schema, MemoryMode 
memMode, int maxRows) {
-return new ColumnarBatch(schema, maxRows, memMode);
+ColumnVector[] columns = allocateCols(schema, maxRows, memMode);
+return new ColumnarBatch(schema, columns, maxRows);
+  }
+
+  private static ColumnVector[] allocateCols(StructType schema, int 
maxRows, MemoryMode memMode) {
+ColumnVector[] columns = new ColumnVector[schema.size()];
+for (int i = 0; i < schema.fields().length; ++i) {
+  StructField field = schema.fields()[i];
+  columns[i] = ColumnVector.allocate(maxRows, field.dataType(), 
memMode);
+}
+return columns;
+  }
+
+  public static ColumnarBatch createReadOnly(
+  StructType schema,
+  ReadOnlyColumnVector[] columns,
+  int numRows) {
+assert(schema.length() == columns.length);
+ColumnarBatch batch = new ColumnarBatch(schema, columns, numRows);
+batch.setNumRows(numRows);
--- End diff --

Do we need to check that each ReadOnlyColumnVector has `numRows` rows?





[GitHub] spark issue #15821: [SPARK-13534][PySpark] Using Apache Arrow to increase pe...

2017-05-15 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/15821
  
@BryanCutler , are Timestamp and Date types supported now with Arrow 0.3?





[GitHub] spark issue #15821: [SPARK-13534][PySpark] Using Apache Arrow to increase pe...

2017-05-16 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/15821
  
>@icexelloss , yes Arrow supports it but Spark stores timestamps in a
different way, which caused some complications. After talking with Holden, we
agreed it was better to keep this PR to simple data types only and extend type
support in a follow-up PR.

Got it. Can you share some details?





[GitHub] spark issue #19284: [SPARK-22067][SQL] ArrowWriter should use position when ...

2017-09-19 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/19284
  
LGTM. What's the Arrow bug you mentioned? 





[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142845456
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,67 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self._df)
+
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-defined function should take a `pandas.DataFrame` and 
return another
+`pandas.DataFrame`. Each group is passed as a `pandas.DataFrame` 
to the user-function and
+the returned`pandas.DataFrame` are combined as a 
:class:`DataFrame`. The returned
+`pandas.DataFrame` can be arbitrary length and its schema should 
match the returnType of
+the pandas udf.
+
+:param udf: A wrapped function returned by `pandas_udf`
+
+>>> df = spark.createDataFrame(
+... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+... ("id", "v"))
+>>> @pandas_udf(returnType=df.schema)
+... def normalize(pdf):
+... v = pdf.v
+... return pdf.assign(v=(v - v.mean()) / v.std())
+>>> df.groupby('id').apply(normalize).show() #  doctest: +SKIP
--- End diff --

Fixed. Thanks!





[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142840490
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
 ---
@@ -519,3 +519,18 @@ case class CoGroup(
 outputObjAttr: Attribute,
 left: LogicalPlan,
 right: LogicalPlan) extends BinaryNode with ObjectProducer
+
+case class FlatMapGroupsInPandas(
--- End diff --

Doc added.





[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...

2017-10-04 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/18732
  
I pushed a new commit addressing the comments. Let me scan through the
comments again; I think there are some comments around worker.py that haven't
been addressed yet.





[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142839010
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
 ---
@@ -26,6 +26,25 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.types.StructType
 
+private class BatchIterator[T](iter: Iterator[T], batchSize: Int)
+  extends Iterator[Iterator[T]] {
+
+  override def hasNext: Boolean = iter.hasNext
+
+  override def next(): Iterator[T] = {
--- End diff --

Sorry I pushed a bit late. The comment is added now.





[GitHub] spark issue #18664: [SPARK-21375][PYSPARK][SQL][WIP] Add Date and Timestamp ...

2017-10-06 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/18664
  
If we all agree that a design doc is needed first, I can create a JIRA and we
can make progress there.

What do you all think? @BryanCutler  @gatorsmile @HyukjinKwon 





[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-06 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143263694
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,69 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self._df)
+
+@since(2.3)
+def apply(self, udf):
--- End diff --

@rxin just to recap our discussion regarding naming:

You asked:
> What's the difference between this one and the transform function you also
> proposed? I'm trying to see if all the naming makes sense when considered
> together.

The answer is:
`transform` takes a function pd.Series -> pd.Series and applies it to each
column (or a subset of columns). The input and output Series have the same
length.

`apply` takes a function pd.DataFrame -> pd.DataFrame and applies it to each
group, similar to `flatMapGroups`.

Does this make sense to you?
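
A plain-pandas illustration of the distinction (toy data, not the proposed
Spark API itself):
```
import pandas as pd

pdf = pd.DataFrame({"id": [1, 1, 2, 2], "v": [1.0, 2.0, 3.0, 5.0]})

# transform: pd.Series -> pd.Series, same length, applied column-wise
same_length = pdf.groupby("id").v.transform(lambda s: (s - s.mean()) / s.std())

# apply: pd.DataFrame -> pd.DataFrame, arbitrary output length, applied per group
per_group = pdf.groupby("id").apply(
    lambda g: g.assign(v=(g.v - g.v.mean()) / g.v.std()))
```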





[GitHub] spark issue #18664: [SPARK-21375][PYSPARK][SQL][WIP] Add Date and Timestamp ...

2017-10-06 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/18664
  
> The baseline should be (as said above): Internal optimisation should not 
introduce any behaviour change, and we are discouraged to change the previous 
behaviour unless it has bugs in general.

I am not sure if I totally agree with this. 

Take structs, for instance: in the non-Arrow version, a struct is turned into
a `pyspark.sql.Row` object in `toPandas()`. I wouldn't call this a bug because
it's a design choice. However, it is maybe not the best design choice, because
if the user pickles the `pandas.DataFrame` to a file and sends it to someone,
the receiver won't be able to deserialize it without having the pyspark
library as a dependency.

Now I am **not** trying to argue that we should or should not turn struct
columns into `pyspark.sql.Row`; my point is that there might be some design
choices in the non-Arrow version that are not ideal, and making the behavior
100% the same between the non-Arrow and Arrow versions is maybe not a hard
requirement.
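
To make the struct example concrete (a sketch of the non-Arrow `toPandas()`
behavior described above):
```
# A struct column surfaces as pyspark.sql.Row objects inside the resulting
# pandas.DataFrame, so unpickling that DataFrame later requires pyspark.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.sql("SELECT named_struct('a', 1, 'b', 'x') AS s")
pdf = df.toPandas()
print(type(pdf.s[0]))  # pyspark Row type, not a plain Python dict/tuple
```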






[GitHub] spark issue #18664: [SPARK-21375][PYSPARK][SQL][WIP] Add Date and Timestamp ...

2017-10-06 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/18664
  
I agree. I think we need a high-level document describing these differences so
we can discuss them. We should be more careful about the Arrow-version
behavior before releasing support for timestamp and nested types.





[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...

2017-10-06 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/18732
  
Hi All, I think all comments should be addressed at this point, except for 
the naming comment from @rxin.

If I missed something or if there is anything else you want me to address, 
please let me know.

Otherwise, I will just wait for Reynold's feedback regarding naming.





[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-06 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143198047
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,69 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self._df)
+
+@since(2.3)
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-defined function should take a `pandas.DataFrame` and 
return another
+`pandas.DataFrame`. Each group is passed as a `pandas.DataFrame` 
to the user-function and
+the returned`pandas.DataFrame` are combined as a 
:class:`DataFrame`. The returned
+`pandas.DataFrame` can be arbitrary length and its schema should 
match the returnType of
+the pandas udf.
+
+:param udf: A wrapped udf function returned by 
:meth:`pyspark.sql.functions.pandas_udf`
+
+>>> from pyspark.sql.functions import pandas_udf
+>>> df = spark.createDataFrame(
+... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+... ("id", "v"))
+>>> @pandas_udf(returnType=df.schema)
+... def normalize(pdf):
+... v = pdf.v
+... return pdf.assign(v=(v - v.mean()) / v.std())
+>>> df.groupby('id').apply(normalize).show()  # doctest: +SKIP
++---+---+
+| id|  v|
++---+---+
+|  1|-0.7071067811865475|
+|  1| 0.7071067811865475|
+|  2|-0.8320502943378437|
+|  2|-0.2773500981126146|
+|  2| 1.1094003924504583|
++---+---+
+
+.. seealso:: :meth:`pyspark.sql.functions.pandas_udf`
+
+"""
+from pyspark.sql.functions import pandas_udf
+
+# Columns are special because hasattr always return True
+if isinstance(udf, Column) or not hasattr(udf, 'func') or not 
udf.vectorized:
+raise ValueError("The argument to apply must be a pandas_udf")
+if not isinstance(udf.returnType, StructType):
+raise ValueError("The returnType of the pandas_udf must be a 
StructType")
+
+df = self._df
+func = udf.func
+returnType = udf.returnType
+
+# The python executors expects the function to take a list of 
pd.Series as input
+# So we to create a wrapper function that turns that to a 
pd.DataFrame before passing
+# down to the user function
+columns = df.columns
+
+def wrapped(*cols):
+import pandas as pd
+return func(pd.concat(cols, axis=1, keys=columns))
+
+wrapped_udf_obj = pandas_udf(wrapped, returnType)
+udf_column = wrapped_udf_obj(*[df[col] for col in df.columns])
--- End diff --

I see. Yeah I can make it more clear in the doc.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-06 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143213000
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,69 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self._df)
+
+@since(2.3)
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-defined function should take a `pandas.DataFrame` and 
return another
+`pandas.DataFrame`. Each group is passed as a `pandas.DataFrame` 
to the user-function and
+the returned`pandas.DataFrame` are combined as a 
:class:`DataFrame`. The returned
+`pandas.DataFrame` can be arbitrary length and its schema should 
match the returnType of
+the pandas udf.
+
+:param udf: A wrapped udf function returned by 
:meth:`pyspark.sql.functions.pandas_udf`
+
+>>> from pyspark.sql.functions import pandas_udf
+>>> df = spark.createDataFrame(
+... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+... ("id", "v"))
+>>> @pandas_udf(returnType=df.schema)
+... def normalize(pdf):
+... v = pdf.v
+... return pdf.assign(v=(v - v.mean()) / v.std())
+>>> df.groupby('id').apply(normalize).show()  # doctest: +SKIP
++---+---+
+| id|  v|
++---+---+
+|  1|-0.7071067811865475|
+|  1| 0.7071067811865475|
+|  2|-0.8320502943378437|
+|  2|-0.2773500981126146|
+|  2| 1.1094003924504583|
++---+---+
+
+.. seealso:: :meth:`pyspark.sql.functions.pandas_udf`
+
+"""
+from pyspark.sql.functions import pandas_udf
+
+# Columns are special because hasattr always return True
+if isinstance(udf, Column) or not hasattr(udf, 'func') or not 
udf.vectorized:
+raise ValueError("The argument to apply must be a pandas_udf")
+if not isinstance(udf.returnType, StructType):
+raise ValueError("The returnType of the pandas_udf must be a 
StructType")
+
+df = self._df
+func = udf.func
+returnType = udf.returnType
+
+# The Python executor expects the function to take a list of pd.Series as
+# input, so we create a wrapper function that turns that into a pd.DataFrame
+# before passing it down to the user function
+columns = df.columns
+
+def wrapped(*cols):
+import pandas as pd
+return func(pd.concat(cols, axis=1, keys=columns))
+
+wrapped_udf_obj = pandas_udf(wrapped, returnType)
+udf_column = wrapped_udf_obj(*[df[col] for col in df.columns])
--- End diff --

Added


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18664: [SPARK-21375][PYSPARK][SQL][WIP] Add Date and Timestamp ...

2017-10-06 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/18664
  
Bryan, I haven't created it. Go ahead!
On Fri, Oct 6, 2017 at 5:45 PM Bryan Cutler <notificati...@github.com>
wrote:

> Thanks all for the discussion. I think there are a lot of subtleties at
> play here, and what may or may not be considered a bug can depend on the
> user's intent. Regardless, I agree that there needs to be user-facing
> documentation that will address these details, such as the questions posed
> by @gatorsmile <https://github.com/gatorsmile> above. I can create a JIRA
    > for this if @icexelloss <https://github.com/icexelloss> hasn't already.
>
> I am okay with proceeding separately for dealing with timezone, and
> matching the behaviour with Arrow to the existing behaviour without Arrow
> here with respect to timezone.
>
> @HyukjinKwon <https://github.com/hyukjinkwon> so you are suggesting to
> not use a timezone for Arrow timestamps? We discussed that earlier in this
> PR, but maybe that is the best solution for now
>
> —
> You are receiving this because you were mentioned.
> Reply to this email directly, view it on GitHub
> <https://github.com/apache/spark/pull/18664#issuecomment-334877795>, or 
mute
> the thread
> 
<https://github.com/notifications/unsubscribe-auth/AAwbrN5xDaLC77p3DZGeMgxuKeP1vtcDks5spp-GgaJpZM4Oateu>
> .
>



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142583590
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,66 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self)
--- End diff --

A good catch. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142583338
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
 ---
@@ -44,14 +63,17 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], 
output: Seq[Attribute], chi
 val schemaOut = 
StructType.fromAttributes(output.drop(child.output.length).zipWithIndex
   .map { case (attr, i) => attr.withName(s"_$i") })
 
+val batchSize = conf.arrowMaxRecordsPerBatch
+val batchIter = if (batchSize > 0) new BatchIterator(iter, batchSize) 
else Iterator(iter)
--- End diff --

This was actually my first implementation. However, it turns out I cannot hold 
references to the input rows without copying them, so `grouped` doesn't work 
(`grouped` uses an ArrayBuffer to keep references).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142678914
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
 ---
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.TaskContext
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import 
org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, 
Distribution, Partitioning}
+import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, 
UnaryExecNode}
+
+case class FlatMapGroupsInPandasExec(
+groupingAttributes: Seq[Attribute],
+func: Expression,
+output: Seq[Attribute],
+child: SparkPlan)
+  extends UnaryExecNode {
+
+  private val pandasFunction = func.asInstanceOf[PythonUDF].func
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def producedAttributes: AttributeSet = AttributeSet(output)
+
+  override def requiredChildDistribution: Seq[Distribution] =
+ClusteredDistribution(groupingAttributes) :: Nil
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(groupingAttributes.map(SortOrder(_, Ascending)))
+
+  override protected def doExecute(): RDD[InternalRow] = {
+val inputRDD = child.execute()
+
+val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536)
+val reuseWorker = 
inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true)
+val chainedFunc = Seq(ChainedPythonFunctions(Seq(pandasFunction)))
+val argOffsets = Array((0 until child.schema.length).toArray)
+
+inputRDD.mapPartitionsInternal { iter =>
+  val grouped = GroupedIterator(iter, groupingAttributes, child.output)
+  val context = TaskContext.get()
+
+  val columnarBatchIter = new ArrowPythonRunner(
+chainedFunc, bufferSize, reuseWorker,
+PythonEvalType.SQL_PANDAS_UDF, argOffsets, child.schema)
+.compute(grouped.map(_._2), context.partitionId(), context)
+
+  val rowIter = new Iterator[InternalRow] {
+private var currentIter = if (columnarBatchIter.hasNext) {
+  val batch = columnarBatchIter.next()
+  batch.rowIterator.asScala
--- End diff --

The returned schema is checked on the Python side. It will throw an exception 
when the serializer tries to coerce the series types.

Here is the test that covers wrong return types:

https://github.com/icexelloss/spark/blob/groupby-apply-SPARK-20396/python/pyspark/sql/tests.py#L3458
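
For illustration, the coercion failure surfaces in the underlying Arrow conversion rather than in Spark itself. A small standalone sketch (not the PR's serializer code; the exact exception class may vary across pyarrow versions):

    import pandas as pd
    import pyarrow as pa

    s = pd.Series(["a", "b"])  # suppose the user function returned strings
    try:
        pa.Array.from_pandas(s, type=pa.int64())  # declared schema says long
    except (pa.ArrowInvalid, pa.ArrowTypeError) as e:
        print(type(e).__name__, e)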
 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142689702
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3106,8 +3106,9 @@ def assertFramesEqual(self, df_with_arrow, 
df_without):
 self.assertTrue(df_without.equals(df_with_arrow), msg=msg)
 
 def test_unsupported_datatype(self):
-schema = StructType([StructField("dt", DateType(), True)])
-df = self.spark.createDataFrame([(datetime.date(1970, 1, 1),)], 
schema=schema)
--- End diff --

I think it's a white space thing. Let me revert this.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142690650
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,66 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self)
+
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-function should take a `pandas.DataFrame` and return another
+`pandas.DataFrame`. Each group is passed as a `pandas.DataFrame` to the
+user-function and the returned `pandas.DataFrame` objects are combined as a
+:class:`DataFrame`. The returned `pandas.DataFrame` can be of arbitrary length
+and its schema should match the returnType of the pandas udf.
+
+:param udf: A wrapped function returned by `pandas_udf`
+
+>>> df = spark.createDataFrame(
+... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+... ("id", "v"))
+>>> @pandas_udf(returnType=df.schema)
+... def normalize(pdf):
+... v = pdf.v
+... return pdf.assign(v=(v - v.mean()) / v.std())
+>>> df.groupby('id').apply(normalize).show() # doctest: + SKIP
--- End diff --

Thanks! Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142690602
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,66 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self)
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142694484
  
--- Diff: python/pyspark/worker.py ---
@@ -74,17 +74,35 @@ def wrap_udf(f, return_type):
 
 
 def wrap_pandas_udf(f, return_type):
-arrow_return_type = toArrowType(return_type)
-
-def verify_result_length(*a):
-result = f(*a)
-if not hasattr(result, "__len__"):
-raise TypeError("Return type of pandas_udf should be a 
Pandas.Series")
-if len(result) != len(a[0]):
-raise RuntimeError("Result vector from pandas_udf was not the 
required length: "
-   "expected %d, got %d" % (len(a[0]), 
len(result)))
-return result
-return lambda *a: (verify_result_length(*a), arrow_return_type)
+if isinstance(return_type, StructType):
+arrow_return_types = [to_arrow_type(field.dataType) for field in 
return_type]
+
+def fn(*a):
+import pandas as pd
+out = f(*a)
+assert isinstance(out, pd.DataFrame), \
+'Return value from the user function is not a 
pandas.DataFrame.'
--- End diff --

Good catch. Yeah let's keep such terms consistent. Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142694381
  
--- Diff: python/pyspark/worker.py ---
@@ -74,17 +74,35 @@ def wrap_udf(f, return_type):
 
 
 def wrap_pandas_udf(f, return_type):
-arrow_return_type = toArrowType(return_type)
-
-def verify_result_length(*a):
-result = f(*a)
-if not hasattr(result, "__len__"):
-raise TypeError("Return type of pandas_udf should be a 
Pandas.Series")
-if len(result) != len(a[0]):
-raise RuntimeError("Result vector from pandas_udf was not the 
required length: "
-   "expected %d, got %d" % (len(a[0]), 
len(result)))
-return result
-return lambda *a: (verify_result_length(*a), arrow_return_type)
+if isinstance(return_type, StructType):
+arrow_return_types = [to_arrow_type(field.dataType) for field in 
return_type]
+
+def fn(*a):
+import pandas as pd
+out = f(*a)
--- End diff --

Done.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142697418
  
--- Diff: python/pyspark/worker.py ---
@@ -74,17 +74,35 @@ def wrap_udf(f, return_type):
 
 
 def wrap_pandas_udf(f, return_type):
-arrow_return_type = toArrowType(return_type)
-
-def verify_result_length(*a):
-result = f(*a)
-if not hasattr(result, "__len__"):
-raise TypeError("Return type of pandas_udf should be a 
Pandas.Series")
-if len(result) != len(a[0]):
-raise RuntimeError("Result vector from pandas_udf was not the 
required length: "
-   "expected %d, got %d" % (len(a[0]), 
len(result)))
-return result
-return lambda *a: (verify_result_length(*a), arrow_return_type)
+if isinstance(return_type, StructType):
--- End diff --

A normal pandas_udf doesn't support `StructType` as the returnType; that's why 
this works.

However, I agree the way we distinguish a grouping udf from a normal udf is not 
clean. Ideally we should have a cleaner way of defining such wrapper functions 
for different pandas_udf use cases.
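
For what it's worth, the dispatch described above amounts to something like the sketch below (`wrap_grouped` and `wrap_scalar` are hypothetical names, not the actual worker.py helpers):

    from pyspark.sql.types import StructType

    def wrap_pandas_udf_sketch(f, return_type, wrap_grouped, wrap_scalar):
        # A StructType return type marks a grouped-map udf
        # (pd.DataFrame -> pd.DataFrame); anything else is treated as a
        # scalar udf (pd.Series -> pd.Series).
        if isinstance(return_type, StructType):
            return wrap_grouped(f, return_type)
        return wrap_scalar(f, return_type)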


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142692448
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3106,8 +3106,9 @@ def assertFramesEqual(self, df_with_arrow, 
df_without):
 self.assertTrue(df_without.equals(df_with_arrow), msg=msg)
 
 def test_unsupported_datatype(self):
-schema = StructType([StructField("dt", DateType(), True)])
-df = self.spark.createDataFrame([(datetime.date(1970, 1, 1),)], 
schema=schema)
--- End diff --

Reverted.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142695501
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
 ---
@@ -26,6 +26,25 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.types.StructType
 
+private class BatchIterator[T](iter: Iterator[T], batchSize: Int)
+  extends Iterator[Iterator[T]] {
+
+  override def hasNext: Boolean = iter.hasNext
+
+  override def next(): Iterator[T] = {
--- End diff --

+1. Let me add that.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142695129
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
 ---
@@ -519,3 +519,18 @@ case class CoGroup(
 outputObjAttr: Attribute,
 left: LogicalPlan,
 right: LogicalPlan) extends BinaryNode with ObjectProducer
+
+case class FlatMapGroupsInPandas(
--- End diff --

Yes agreed. I will fix that.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142694835
  
--- Diff: python/pyspark/worker.py ---
@@ -74,17 +74,35 @@ def wrap_udf(f, return_type):
 
 
 def wrap_pandas_udf(f, return_type):
-arrow_return_type = toArrowType(return_type)
-
-def verify_result_length(*a):
-result = f(*a)
-if not hasattr(result, "__len__"):
-raise TypeError("Return type of pandas_udf should be a 
Pandas.Series")
-if len(result) != len(a[0]):
-raise RuntimeError("Result vector from pandas_udf was not the 
required length: "
-   "expected %d, got %d" % (len(a[0]), 
len(result)))
-return result
-return lambda *a: (verify_result_length(*a), arrow_return_type)
+if isinstance(return_type, StructType):
+arrow_return_types = [to_arrow_type(field.dataType) for field in 
return_type]
+
+def fn(*a):
+import pandas as pd
+out = f(*a)
+assert isinstance(out, pd.DataFrame), \
+'Return value from the user function is not a 
pandas.DataFrame.'
+assert len(out.columns) == len(arrow_return_types), \
+'Number of columns of the returned pd.DataFrame doesn\'t 
match output schema. ' \
--- End diff --

Good catch. Fixed. (Btw thanks for catching these small things)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142691179
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3106,8 +3106,9 @@ def assertFramesEqual(self, df_with_arrow, 
df_without):
 self.assertTrue(df_without.equals(df_with_arrow), msg=msg)
 
 def test_unsupported_datatype(self):
-schema = StructType([StructField("dt", DateType(), True)])
-df = self.spark.createDataFrame([(datetime.date(1970, 1, 1),)], 
schema=schema)
--- End diff --

Oh, actually DateType() -> TimestampType(); let me double check.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142704126
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -435,6 +435,33 @@ class RelationalGroupedDataset protected[sql](
   df.logicalPlan.output,
   df.logicalPlan))
   }
+
+  private[sql] def flatMapGroupsInPandas(expr: PythonUDF): DataFrame = {
+require(expr.vectorized, "Must pass a vectorized python udf")
+
+val output = expr.dataType match {
+  case s: StructType => s.map {
+case StructField(name, dataType, nullable, metadata) =>
+  AttributeReference(name, dataType, nullable, metadata)()
+  }
+}
+
+val groupingAttributes: Seq[Attribute] = groupingExprs.map {
+  case ne: NamedExpression => ne.toAttribute
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142703487
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -435,6 +435,33 @@ class RelationalGroupedDataset protected[sql](
   df.logicalPlan.output,
   df.logicalPlan))
   }
+
+  private[sql] def flatMapGroupsInPandas(expr: PythonUDF): DataFrame = {
+require(expr.vectorized, "Must pass a vectorized python udf")
+
+val output = expr.dataType match {
+  case s: StructType => s.map {
+case StructField(name, dataType, nullable, metadata) =>
+  AttributeReference(name, dataType, nullable, metadata)()
+  }
+}
+
+val groupingAttributes: Seq[Attribute] = groupingExprs.map {
+  case ne: NamedExpression => ne.toAttribute
+}
+
+val plan = FlatMapGroupsInPandas(
+  groupingAttributes,
+  expr,
+  output,
+  df.logicalPlan
+)
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142693843
  
--- Diff: python/pyspark/worker.py ---
@@ -74,17 +74,35 @@ def wrap_udf(f, return_type):
 
 
 def wrap_pandas_udf(f, return_type):
-arrow_return_type = toArrowType(return_type)
-
-def verify_result_length(*a):
-result = f(*a)
-if not hasattr(result, "__len__"):
-raise TypeError("Return type of pandas_udf should be a 
Pandas.Series")
-if len(result) != len(a[0]):
-raise RuntimeError("Result vector from pandas_udf was not the 
required length: "
-   "expected %d, got %d" % (len(a[0]), 
len(result)))
-return result
-return lambda *a: (verify_result_length(*a), arrow_return_type)
+if isinstance(return_type, StructType):
+arrow_return_types = [to_arrow_type(field.dataType) for field in 
return_type]
+
+def fn(*a):
+import pandas as pd
+out = f(*a)
--- End diff --

Yes that is more consistent. Let me change that.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142703829
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -435,6 +435,33 @@ class RelationalGroupedDataset protected[sql](
   df.logicalPlan.output,
   df.logicalPlan))
   }
+
+  private[sql] def flatMapGroupsInPandas(expr: PythonUDF): DataFrame = {
+require(expr.vectorized, "Must pass a vectorized python udf")
+
+val output = expr.dataType match {
+  case s: StructType => s.map {
--- End diff --

Fixed


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142498880
  
--- Diff: python/pyspark/sql/group.py ---
@@ -194,6 +194,37 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col, values)
 return GroupedData(jgd, self.sql_ctx)
 
+def apply(self, udf_obj):
+"""
+Maps each group of the current [[DataFrame]] using a pandas udf 
and returns the result
+as a :class:`DataFrame`.
+
+"""
+from pyspark.sql.functions import pandas_udf
+
+if not udf_obj._vectorized:
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142498841
  
--- Diff: python/pyspark/worker.py ---
@@ -74,17 +75,33 @@ def wrap_udf(f, return_type):
 
 
 def wrap_pandas_udf(f, return_type):
-arrow_return_type = toArrowType(return_type)
-
-def verify_result_length(*a):
-result = f(*a)
-if not hasattr(result, "__len__"):
-raise TypeError("Return type of pandas_udf should be a 
Pandas.Series")
-if len(result) != len(a[0]):
-raise RuntimeError("Result vector from pandas_udf was not the 
required length: "
-   "expected %d, got %d" % (len(a[0]), 
len(result)))
-return result
-return lambda *a: (verify_result_length(*a), arrow_return_type)
+if isinstance(return_type, StructType):
+arrow_return_types = list(to_arrow_type(field.dataType) for field 
in return_type)
+
+def fn(*a):
+import pandas as pd
+out = f(*a)
+assert isinstance(out, pd.DataFrame), 'Must return a 
pd.DataFrame'
+assert len(out.columns) == len(arrow_return_types), \
+'Columns of pd.DataFrame don\'t match return schema'
--- End diff --

The result df actually doesn't have a required length; it can be of a different 
length.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142498939
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.TaskContext
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, 
AttributeSet, Expression, NamedExpression, SortOrder, UnsafeProjection}
+import 
org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, 
Distribution, Partitioning}
+import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, 
UnaryExecNode}
+
+case class FlatMapGroupsInPandasExec(
+groupingAttributes: Seq[Attribute],
+func: Expression,
+output: Seq[Attribute],
+child: SparkPlan)
+  extends UnaryExecNode {
+
+  private val pandasFunction = func.asInstanceOf[PythonUDF].func
+
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+
+  override def producedAttributes: AttributeSet = AttributeSet(output)
+
+  override def requiredChildDistribution: Seq[Distribution] =
+ClusteredDistribution(groupingAttributes) :: Nil
+
+  override def requiredChildOrdering: Seq[Seq[SortOrder]] =
+Seq(groupingAttributes.map(SortOrder(_, Ascending)))
+
+  override protected def doExecute(): RDD[InternalRow] = {
+val inputRDD = child.execute()
+
+val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536)
+val reuseWorker = 
inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true)
+val chainedFunc = Seq(ChainedPythonFunctions(Seq(pandasFunction)))
+val argOffsets = Array((0 until child.schema.length).toArray)
+
+inputRDD.mapPartitionsInternal { iter =>
+  val grouped = GroupedIterator(iter, groupingAttributes, child.output)
+  val context = TaskContext.get()
+
+  val columnarBatchIter = new ArrowPythonRunner(
+chainedFunc, bufferSize, reuseWorker,
+PythonEvalType.SQL_PANDAS_UDF, argOffsets, child.schema)
+.compute(grouped.map(_._2), context.partitionId(), context)
+
+  val vectorRowIter = new Iterator[InternalRow] {
+private var currentIter = if (columnarBatchIter.hasNext) {
+  val batch = columnarBatchIter.next()
+  // assert(schemaOut.equals(batch.schema),
+  //  s"Invalid schema from pandas_udf: expected $schemaOut, got 
${batch.schema}")
--- End diff --

Removed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142482842
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2181,31 +2186,69 @@ def udf(f=None, returnType=StringType()):
 @since(2.3)
 def pandas_udf(f=None, returnType=StringType()):
 """
-Creates a :class:`Column` expression representing a user defined 
function (UDF) that accepts
-`Pandas.Series` as input arguments and outputs a `Pandas.Series` of 
the same length.
+Creates a :class:`Column` expression representing a vectorized user 
defined function (UDF).
+
+The user-defined function can define one of the following 
transformations:
+1. One or more `pandas.Series` -> A `pandas.Series`
+
+   This udf is used with `DataFrame.withColumn` and `DataFrame.select`.
+   The returnType should be a primitive data type, e.g., DoubleType()
+
+   Example:
+
+   >>> from pyspark.sql.types import IntegerType, StringType
+   >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
+   >>> @pandas_udf(returnType=StringType())
+   ... def to_upper(s):
+   ... return s.str.upper()
+   ...
+   >>> @pandas_udf(returnType="integer")
+   ... def add_one(x):
+   ... return x + 1
+   ...
+   >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", 
"name", "age"))
+   >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), 
add_one("age")) \\
+   ... .show()  # doctest: +SKIP
+   +----------+--------------+------------+
+   |slen(name)|to_upper(name)|add_one(age)|
+   +----------+--------------+------------+
+   |         8|      JOHN DOE|          22|
+   +----------+--------------+------------+
+
+2. A `pandas.DataFrame` -> A `pandas.DataFrame`
+
+   This udf is used with `GroupedData.apply`
+   The returnType should be a StructType describing the schema of the 
returned
+   `pandas.DataFrame`.
+
+   Example:
+
+   >>> df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 
4.0)], ("id", "v"))
+   >>> @pandas_udf(returnType=df.schema)
+   ... def normalize(df):
+   ... v = df.v
+   ... ret = df.assign(v=(v - v.mean()) / v.std())
+   >>> df.groupby('id').apply(normalize).show() # doctest: + SKIP
+  +---+-------------------+
+  | id|                  v|
+  +---+-------------------+
+  |  1|-0.7071067811865475|
+  |  1| 0.7071067811865475|
+  |  2|-0.7071067811865475|
+  |  2| 0.7071067811865475|
+  +---+-------------------+
+
+
+.. note:: The user-defined functions must be deterministic.
 
 :param f: python function if used as a standalone function
 :param returnType: a :class:`pyspark.sql.types.DataType` object
 
->>> from pyspark.sql.types import IntegerType, StringType
->>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
->>> @pandas_udf(returnType=StringType())
-... def to_upper(s):
-... return s.str.upper()
-...
->>> @pandas_udf(returnType="integer")
-... def add_one(x):
-... return x + 1
-...
->>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", 
"age"))
->>> df.select(slen("name").alias("slen(name)"), to_upper("name"), 
add_one("age")) \\
-... .show()  # doctest: +SKIP
-+----------+--------------+------------+
-|slen(name)|to_upper(name)|add_one(age)|
-+----------+--------------+------------+
-|         8|      JOHN DOE|          22|
-+----------+--------------+------------+
 """
+import pandas as pd
+if isinstance(returnType, pd.Series):
--- End diff --

Oh that's neat. Thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142486439
  
--- Diff: python/pyspark/sql/group.py ---
@@ -194,6 +194,37 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col, values)
 return GroupedData(jgd, self.sql_ctx)
 
+def apply(self, udf_obj):
+"""
+Maps each group of the current [[DataFrame]] using a pandas udf 
and returns the result
+as a :class:`DataFrame`.
+
+"""
+from pyspark.sql.functions import pandas_udf
+
+if not udf_obj._vectorized:
--- End diff --

I ended up checking `hasattr(input, 'func')` to verify it's a valid input. It's 
not great, but I don't know what's better though.
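
The reason a plain `hasattr` check alone isn't enough is that `Column` implements `__getattr__` as nested-field access, so `hasattr` on a Column is True for any attribute name. A small standalone check (not code from the PR; assumes a local SparkSession can be started):

    from pyspark.sql import Column, SparkSession

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    c = spark.range(1).id         # a Column
    print(isinstance(c, Column))  # True
    print(hasattr(c, "func"))     # True -- c.func builds a nested-field Column
                                  # instead of raising AttributeError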


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142497616
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2120,6 +2120,7 @@ def wrapper(*args):
   else self.func.__class__.__module__)
 wrapper.func = self.func
 wrapper.returnType = self.returnType
+wrapper._vectorized = self._vectorized
--- End diff --

Good call. I think it should not have the underscore either.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142500980
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2129,8 +2130,12 @@ def _create_udf(f, returnType, vectorized):
 def _udf(f, returnType=StringType(), vectorized=vectorized):
 if vectorized:
 import inspect
-if len(inspect.getargspec(f).args) == 0:
-raise NotImplementedError("0-parameter pandas_udfs are not 
currently supported")
+argspec = inspect.getargspec(f)
+if len(argspec.args) == 0 and argspec.varargs is None:
--- End diff --

Added.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142474570
  
--- Diff: python/pyspark/sql/group.py ---
@@ -194,6 +194,37 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col, values)
 return GroupedData(jgd, self.sql_ctx)
 
+def apply(self, udf_obj):
+"""
+Maps each group of the current [[DataFrame]] using a pandas udf 
and returns the result
+as a :class:`DataFrame`.
+
+"""
+from pyspark.sql.functions import pandas_udf
+
+if not udf_obj._vectorized:
+raise ValueError("Must pass a pandas_udf")
--- End diff --

Changed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142499286
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
 ---
@@ -519,3 +519,18 @@ case class CoGroup(
 outputObjAttr: Attribute,
 left: LogicalPlan,
 right: LogicalPlan) extends BinaryNode with ObjectProducer
+
+case class FlatMapGroupsInPandas(
--- End diff --

I like `FlatMapGroupsInPandas` a little better because 
`FlatMapGroupsInPython` doesn't imply it's using a vectorized udf.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142499092
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2120,6 +2120,7 @@ def wrapper(*args):
   else self.func.__class__.__module__)
 wrapper.func = self.func
 wrapper.returnType = self.returnType
+wrapper._vectorized = self._vectorized
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142499712
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2129,8 +2130,12 @@ def _create_udf(f, returnType, vectorized):
 def _udf(f, returnType=StringType(), vectorized=vectorized):
 if vectorized:
 import inspect
-if len(inspect.getargspec(f).args) == 0:
-raise NotImplementedError("0-parameter pandas_udfs are not 
currently supported")
+argspec = inspect.getargspec(f)
+if len(argspec.args) == 0 and argspec.varargs is None:
--- End diff --

I think `varargs` are fine. I will add the test.
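
As a quick illustration of what that test exercises (plain `inspect`, no Spark needed; `getargspec` is what the code here uses, although it is deprecated in newer Python versions):

    import inspect

    argspec = inspect.getargspec(lambda *v: v[0])
    print(argspec.args)     # [] -- no positional parameters
    print(argspec.varargs)  # 'v' -- so this should not count as a 0-parameter udf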


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142478440
  
--- Diff: python/pyspark/sql/group.py ---
@@ -194,6 +194,37 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col, values)
 return GroupedData(jgd, self.sql_ctx)
 
+def apply(self, udf_obj):
+"""
+Maps each group of the current [[DataFrame]] using a pandas udf 
and returns the result
+as a :class:`DataFrame`.
+
+"""
+from pyspark.sql.functions import pandas_udf
+
+if not udf_obj._vectorized:
--- End diff --

It seems `foo?` and `foo??` only show the property docstring if `foo` is a 
`function`.

If `foo` is a `UserDefinedFunction`, they will show the docstring for the class.

For that reason, I think we should keep the return value of `udf` and 
`pandas_udf` as a wrapped function. And I will do checks on `udf_obj.func` 
to see if this is a valid wrapped udf function. @BryanCutler what do you think?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142740947
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -435,6 +435,33 @@ class RelationalGroupedDataset protected[sql](
   df.logicalPlan.output,
   df.logicalPlan))
   }
+
+  private[sql] def flatMapGroupsInPandas(expr: PythonUDF): DataFrame = {
+require(expr.vectorized, "Must pass a vectorized python udf")
+
+val output = expr.dataType match {
+  case s: StructType => s.map {
+case StructField(name, dataType, nullable, metadata) =>
+  AttributeReference(name, dataType, nullable, metadata)()
+  }
+}
+
+val groupingAttributes: Seq[Attribute] = groupingExprs.map {
+  case ne: NamedExpression => ne.toAttribute
+}
+
+val plan = FlatMapGroupsInPandas(
+  groupingAttributes,
+  expr,
+  output,
+  df.logicalPlan
+)
+
+Dataset.ofRows(
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142518730
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
 ---
@@ -26,6 +26,28 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.types.StructType
 
+private object BatchIterator {
+  class InnerIterator[T](iter: Iterator[T], batchSize: Int) extends 
Iterator[T] {
--- End diff --

Done.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142533141
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2058,7 +2058,7 @@ def __init__(self, func, returnType, name=None, 
vectorized=False):
 self._name = name or (
 func.__name__ if hasattr(func, '__name__')
 else func.__class__.__name__)
-self._vectorized = vectorized
+self.vectorized = vectorized
--- End diff --

I kind of dislike the inconsistency between `UserDefinedFunction` and its 
wrapped function. I think they are just the same thing, except that the wrapped 
function has a docstring. For peace of mind, I think we should make them either 
both private or both public.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142523354
  
--- Diff: python/pyspark/sql/group.py ---
@@ -194,6 +194,65 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col, values)
 return GroupedData(jgd, self.sql_ctx)
 
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-function should take a `pandas.DataFrame` and return another
+`pandas.DataFrame`. Each group is passed as a `pandas.DataFrame` to the
+user-function and the returned `pandas.DataFrame` objects are combined as a
+:class:`DataFrame`. The returned `pandas.DataFrame` can be of arbitrary length
+and its schema should match the returnType of the pandas udf.
+
+:param udf: A wrapped function returned by `pandas_udf`
+
+>>> df = spark.createDataFrame(
+... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+... ("id", "v"))
+>>> @pandas_udf(returnType=df.schema)
+... def normalize(pdf):
+... v = pdf.v
+... return pdf.assign(v=(v - v.mean()) / v.std())
+>>> df.groupby('id').apply(normalize).show() # doctest: + SKIP
++---+-------------------+
+| id|                  v|
++---+-------------------+
+|  1|-0.7071067811865475|
+|  1| 0.7071067811865475|
+|  2|-0.8320502943378437|
+|  2|-0.2773500981126146|
+|  2| 1.1094003924504583|
++---+-------------------+
+
+.. seealso:: :meth:`pyspark.sql.functions.pandas_udf`
+
+"""
+from pyspark.sql.functions import pandas_udf
+
+# Columns are special because hasattr always returns True
+if isinstance(udf, Column) or not hasattr(udf, 'func') or not 
udf.vectorized:
+raise ValueError("The argument to apply must be a pandas_udf")
+if not isinstance(udf.returnType, StructType):
+raise ValueError("The returnType of the pandas_udf must be a 
StructType")
+
+df = DataFrame(self._jgd.df(), self.sql_ctx)
+func = udf.func
+returnType = udf.returnType
+
+# The Python executor expects the function to take a list of pd.Series as
+# input, so we create a wrapper function that turns that into a pd.DataFrame
+# before passing it down to the user function
+columns = df.columns
+
+def wrapped(*cols):
+import pandas as pd
+return func(pd.concat(cols, axis=1, keys=columns))
--- End diff --

I think we need to think a little more about how we handle different formats of 
Arrow data.

Currently, the input of the Arrow serializer is a list of (pd.Series, DataType). 
I feel it's cleaner for this class to deal only with serialization, not type 
coercion. It could take a `pyarrow.Table`, for instance, and let the caller 
construct the `pyarrow.Table`.

Another thing to think about is what to do when the data we are passing is not 
purely `pd.Series` or `pd.DataFrame`. What if, for instance, we want to 
serialize a (pd.Series, pd.DataFrame) tuple, or a (scalar value, pd.DataFrame) 
tuple? Maybe making the serializer composable is more flexible, i.e. one class 
knows how to serialize `pd.Series`, another knows how to serialize 
`pd.DataFrame`, and if we want to serialize a (pd.Series, pd.DataFrame) tuple 
we can compose them.
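
To make the composition idea concrete, a very loose sketch (all class names here are hypothetical, not pyspark APIs; the leaf `dumps` bodies are left as placeholders):

    import pandas as pd

    class SeriesSerializer:
        def dumps(self, series: pd.Series):
            ...  # turn one pd.Series into an Arrow-ready payload

    class FrameSerializer:
        def dumps(self, frame: pd.DataFrame):
            ...  # turn one pd.DataFrame into an Arrow-ready payload

    class TupleSerializer:
        """Composes per-element serializers, e.g.
        TupleSerializer(SeriesSerializer(), FrameSerializer()) for a
        (pd.Series, pd.DataFrame) pair."""

        def __init__(self, *parts):
            self.parts = parts

        def dumps(self, values):
            return tuple(p.dumps(v) for p, v in zip(self.parts, values))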




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142482577
  
--- Diff: python/pyspark/sql/group.py ---
@@ -194,6 +194,37 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col, values)
 return GroupedData(jgd, self.sql_ctx)
 
+def apply(self, udf_obj):
+"""
+Maps each group of the current [[DataFrame]] using a pandas udf 
and returns the result
+as a :class:`DataFrame`.
+
+"""
+from pyspark.sql.functions import pandas_udf
+
+if not udf_obj._vectorized:
+raise ValueError("Must pass a pandas_udf")
+if not isinstance(udf_obj.returnType, StructType):
+raise ValueError("Must pass a StructType as return type in 
pandas_udf")
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142571075
  
--- Diff: python/pyspark/worker.py ---
@@ -32,8 +32,9 @@
 from pyspark.serializers import write_with_length, write_int, read_long, \
 write_long, read_int, SpecialLengths, PythonEvalType, 
UTF8Deserializer, PickleSerializer, \
 BatchedSerializer, ArrowStreamPandasSerializer
-from pyspark.sql.types import toArrowType
+from pyspark.sql.types import to_arrow_type
 from pyspark import shuffle
+from pyspark.sql.types import StructType
--- End diff --

Good catch. Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142571047
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3376,6 +3377,132 @@ def test_vectorized_udf_empty_partition(self):
 res = df.select(f(col('id')))
 self.assertEquals(df.collect(), res.collect())
 
+def test_vectorized_udf_varargs(self):
+from pyspark.sql.functions import pandas_udf, col
+df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 
2))
+f = pandas_udf(lambda *v: v[0], LongType())
+res = df.select(f(col('id')))
+self.assertEquals(df.collect(), res.collect())
+
+
+@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not 
installed")
+class GroupbyApplyTests(ReusedPySparkTestCase):
+@classmethod
+def setUpClass(cls):
+ReusedPySparkTestCase.setUpClass()
+cls.spark = SparkSession(cls.sc)
+
+@classmethod
+def tearDownClass(cls):
+ReusedPySparkTestCase.tearDownClass()
+cls.spark.stop()
+
+def assertFramesEqual(self, expected, result):
+msg = ("DataFrames are not equal: " +
+   ("\n\nExpected:\n%s\n%s" % (expected, expected.dtypes)) +
+   ("\n\nResult:\n%s\n%s" % (result, result.dtypes)))
+self.assertTrue(expected.equals(result), msg=msg)
+
+@property
+def data(self):
+from pyspark.sql.functions import pandas_udf, array, explode, col, 
lit
+return self.spark.range(10).toDF('id') \
+.withColumn("vs", array([lit(i) for i in range(20, 30)])) \
+.withColumn("v", explode(col('vs'))).drop('vs')
+
+def test_simple(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+def foo(df):
+ret = df
+ret = ret.assign(v1=df.v * df.id * 1.0)
+ret = ret.assign(v2=df.v + df.id)
+return ret
+
+foo_udf = pandas_udf(
+foo,
+StructType(
+[StructField('id', LongType()),
+ StructField('v', IntegerType()),
+ StructField('v1', DoubleType()),
+ StructField('v2', LongType())]))
+
+result = df.groupby('id').apply(foo_udf).sort('id').toPandas()
+expected = 
df.toPandas().groupby('id').apply(foo).reset_index(drop=True)
+self.assertFramesEqual(expected, result)
+
+def test_decorator(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+@pandas_udf(StructType(
+[StructField('id', LongType()),
+ StructField('v', IntegerType()),
+ StructField('v1', DoubleType()),
+ StructField('v2', LongType())]))
+def foo(df):
+ret = df
+ret = ret.assign(v1=df.v * df.id * 1.0)
+ret = ret.assign(v2=df.v + df.id)
+return ret
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142571038
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3376,6 +3377,132 @@ def test_vectorized_udf_empty_partition(self):
 res = df.select(f(col('id')))
 self.assertEquals(df.collect(), res.collect())
 
+def test_vectorized_udf_varargs(self):
+from pyspark.sql.functions import pandas_udf, col
+df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 
2))
+f = pandas_udf(lambda *v: v[0], LongType())
+res = df.select(f(col('id')))
+self.assertEquals(df.collect(), res.collect())
+
+
+@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not 
installed")
+class GroupbyApplyTests(ReusedPySparkTestCase):
+@classmethod
+def setUpClass(cls):
+ReusedPySparkTestCase.setUpClass()
+cls.spark = SparkSession(cls.sc)
+
+@classmethod
+def tearDownClass(cls):
+ReusedPySparkTestCase.tearDownClass()
+cls.spark.stop()
+
+def assertFramesEqual(self, expected, result):
+msg = ("DataFrames are not equal: " +
+   ("\n\nExpected:\n%s\n%s" % (expected, expected.dtypes)) +
+   ("\n\nResult:\n%s\n%s" % (result, result.dtypes)))
+self.assertTrue(expected.equals(result), msg=msg)
+
+@property
+def data(self):
+from pyspark.sql.functions import pandas_udf, array, explode, col, 
lit
+return self.spark.range(10).toDF('id') \
+.withColumn("vs", array([lit(i) for i in range(20, 30)])) \
+.withColumn("v", explode(col('vs'))).drop('vs')
+
+def test_simple(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+def foo(df):
+ret = df
+ret = ret.assign(v1=df.v * df.id * 1.0)
+ret = ret.assign(v2=df.v + df.id)
+return ret
+
+foo_udf = pandas_udf(
+foo,
+StructType(
+[StructField('id', LongType()),
+ StructField('v', IntegerType()),
+ StructField('v1', DoubleType()),
+ StructField('v2', LongType())]))
+
+result = df.groupby('id').apply(foo_udf).sort('id').toPandas()
+expected = 
df.toPandas().groupby('id').apply(foo).reset_index(drop=True)
+self.assertFramesEqual(expected, result)
+
+def test_decorator(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+@pandas_udf(StructType(
+[StructField('id', LongType()),
+ StructField('v', IntegerType()),
+ StructField('v1', DoubleType()),
+ StructField('v2', LongType())]))
+def foo(df):
+ret = df
+ret = ret.assign(v1=df.v * df.id * 1.0)
+ret = ret.assign(v2=df.v + df.id)
+return ret
+
+result = df.groupby('id').apply(foo).sort('id').toPandas()
+expected = 
df.toPandas().groupby('id').apply(foo.func).reset_index(drop=True)
+self.assertFramesEqual(expected, result)
+
+def test_coerce(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+def foo(df):
+ret = df
+ret = ret.assign(v=df.v + 1)
+return ret
+
+@pandas_udf(StructType([StructField('id', LongType()), 
StructField('v', DoubleType())]))
+def foo(df):
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142571056
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3376,6 +3377,133 @@ def test_vectorized_udf_empty_partition(self):
 res = df.select(f(col('id')))
 self.assertEquals(df.collect(), res.collect())
 
+def test_vectorized_udf_varargs(self):
+from pyspark.sql.functions import pandas_udf, col
+df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 
2))
+f = pandas_udf(lambda *v: v[0], LongType())
+res = df.select(f(col('id')))
+self.assertEquals(df.collect(), res.collect())
+
+
+@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not 
installed")
+class GroupbyApplyTests(ReusedPySparkTestCase):
+@classmethod
+def setUpClass(cls):
+ReusedPySparkTestCase.setUpClass()
+cls.spark = SparkSession(cls.sc)
+
+@classmethod
+def tearDownClass(cls):
+ReusedPySparkTestCase.tearDownClass()
+cls.spark.stop()
+
+def assertFramesEqual(self, expected, result):
+msg = ("DataFrames are not equal: " +
+   ("\n\nExpected:\n%s\n%s" % (expected, expected.dtypes)) +
+   ("\n\nResult:\n%s\n%s" % (result, result.dtypes)))
+self.assertTrue(expected.equals(result), msg=msg)
+
+@property
+def data(self):
+from pyspark.sql.functions import pandas_udf, array, explode, col, 
lit
+return self.spark.range(10).toDF('id') \
+.withColumn("vs", array([lit(i) for i in range(20, 30)])) \
+.withColumn("v", explode(col('vs'))).drop('vs')
+
+def test_simple(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+def foo(df):
+ret = df
+ret = ret.assign(v1=df.v * df.id * 1.0)
+ret = ret.assign(v2=df.v + df.id)
+return ret
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142571653
  
--- Diff: python/pyspark/worker.py ---
@@ -74,17 +75,37 @@ def wrap_udf(f, return_type):
 
 
 def wrap_pandas_udf(f, return_type):
-arrow_return_type = toArrowType(return_type)
-
-def verify_result_length(*a):
-result = f(*a)
-if not hasattr(result, "__len__"):
-raise TypeError("Return type of pandas_udf should be a 
Pandas.Series")
-if len(result) != len(a[0]):
-raise RuntimeError("Result vector from pandas_udf was not the 
required length: "
-   "expected %d, got %d" % (len(a[0]), 
len(result)))
-return result
-return lambda *a: (verify_result_length(*a), arrow_return_type)
+if isinstance(return_type, StructType):
+import pyarrow as pa
--- End diff --

Removed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142571660
  
--- Diff: python/pyspark/worker.py ---
@@ -74,17 +75,37 @@ def wrap_udf(f, return_type):
 
 
 def wrap_pandas_udf(f, return_type):
-arrow_return_type = toArrowType(return_type)
-
-def verify_result_length(*a):
-result = f(*a)
-if not hasattr(result, "__len__"):
-raise TypeError("Return type of pandas_udf should be a 
Pandas.Series")
-if len(result) != len(a[0]):
-raise RuntimeError("Result vector from pandas_udf was not the 
required length: "
-   "expected %d, got %d" % (len(a[0]), 
len(result)))
-return result
-return lambda *a: (verify_result_length(*a), arrow_return_type)
+if isinstance(return_type, StructType):
+import pyarrow as pa
+
+arrow_return_types = list(to_arrow_type(field.dataType) for field 
in return_type)
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142572356
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -47,7 +47,7 @@ import org.apache.spark.sql.types.StructType
  */
 @InterfaceStability.Stable
 class RelationalGroupedDataset protected[sql](
-df: DataFrame,
+val df: DataFrame,
--- End diff --

Reverted.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-03 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r142572643
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
 ---
@@ -44,14 +63,22 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], 
output: Seq[Attribute], chi
 val schemaOut = 
StructType.fromAttributes(output.drop(child.output.length).zipWithIndex
   .map { case (attr, i) => attr.withName(s"_$i") })
 
+val batchSize = conf.arrowMaxRecordsPerBatch
+
+val batchIter = if (batchSize > 0) {
+  new BatchIterator(iter, batchSize)
+} else {
+  Iterator(iter)
+}
--- End diff --

I changed it to one line. I think it's more concise.
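
For reference, the condensed line (as it appears in the updated diff quoted later in this thread) is:
```
val batchIter = if (batchSize > 0) new BatchIterator(iter, batchSize) else Iterator(iter)
```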


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-09 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143507748
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
 ---
@@ -519,3 +519,18 @@ case class CoGroup(
 outputObjAttr: Attribute,
 left: LogicalPlan,
 right: LogicalPlan) extends BinaryNode with ObjectProducer
+
+case class FlatMapGroupsInPandas(
--- End diff --

@HyukjinKwon Thanks for catching this. I added docs for `FlatMapGroupsInPandas` (the function), `FlatMapGroupsInPandas` (the logical node), and `FlatMapGroupsInPandasExec`, and cross-referenced them.

@rxin I created the file `pythonLogicalOperators` under `org.apache.spark.sql.catalyst.plans.logical` and moved `FlatMapGroupsInPandas` into that file.
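
For readers skimming the archive, the logical node being discussed ends up roughly of the following shape (a sketch pieced together from the fragments quoted in this thread; the exact fields in the merged patch may differ slightly):
```
package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression}

/**
 * FlatMap groups using a udf: pandas.DataFrame -> pandas.DataFrame.
 * This is used by DataFrame.groupby().apply() in PySpark.
 */
case class FlatMapGroupsInPandas(
    groupingAttributes: Seq[Attribute],
    functionExpr: Expression,
    output: Seq[Attribute],
    child: LogicalPlan) extends UnaryNode {
  // Everything this node produces comes from the declared output of the udf.
  override val producedAttributes: AttributeSet = AttributeSet(output)
}
```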



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-09 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143506845
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,69 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self._df)
+
+@since(2.3)
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a :class:`DataFrame`.
+
+The user-defined function should take a `pandas.DataFrame` and 
return another
+`pandas.DataFrame`. For each group, all columns are passed 
together as a `pandas.DataFrame`
+to the user-function and the returned `pandas.DataFrame` are 
combined as a
+:class:`DataFrame`. The returned `pandas.DataFrame` can be 
arbitrary length and its schema
+must match the returnType of the pandas udf.
+
+:param udf: A wrapped udf function returned by 
:meth:`pyspark.sql.functions.pandas_udf`
+
+>>> from pyspark.sql.functions import pandas_udf
+>>> df = spark.createDataFrame(
+... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
+... ("id", "v"))
+>>> @pandas_udf(returnType=df.schema)
+... def normalize(pdf):
+... v = pdf.v
+... return pdf.assign(v=(v - v.mean()) / v.std())
+>>> df.groupby('id').apply(normalize).show()  # doctest: +SKIP
++---+---+
+| id|  v|
++---+---+
+|  1|-0.7071067811865475|
+|  1| 0.7071067811865475|
+|  2|-0.8320502943378437|
+|  2|-0.2773500981126146|
+|  2| 1.1094003924504583|
++---+---+
+
+.. seealso:: :meth:`pyspark.sql.functions.pandas_udf`
+
+"""
+from pyspark.sql.functions import pandas_udf
+
+# Columns are special because hasattr always return True
+if isinstance(udf, Column) or not hasattr(udf, 'func') or not 
udf.vectorized:
+raise ValueError("The argument to apply must be a pandas_udf")
+if not isinstance(udf.returnType, StructType):
+raise ValueError("The returnType of the pandas_udf must be a 
StructType")
+
+df = self._df
+func = udf.func
+returnType = udf.returnType
--- End diff --

I actually like it because I think it's more readable this way.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143740636
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
 ---
@@ -519,3 +519,4 @@ case class CoGroup(
 outputObjAttr: Attribute,
 left: LogicalPlan,
 right: LogicalPlan) extends BinaryNode with ObjectProducer
+
--- End diff --

Reverted.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143741944
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
 ---
@@ -44,14 +73,18 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], 
output: Seq[Attribute], chi
 val schemaOut = 
StructType.fromAttributes(output.drop(child.output.length).zipWithIndex
   .map { case (attr, i) => attr.withName(s"_$i") })
 
+val batchSize = conf.arrowMaxRecordsPerBatch
+// DO NOT use iter.grouped(). See BatchIterator.
+val batchIter = if (batchSize > 0) new BatchIterator(iter, batchSize) 
else Iterator(iter)
+
 val columnarBatchIter = new ArrowPythonRunner(
-funcs, conf.arrowMaxRecordsPerBatch, bufferSize, reuseWorker,
+funcs, bufferSize, reuseWorker,
 PythonEvalType.SQL_PANDAS_UDF, argOffsets, schema)
-  .compute(iter, context.partitionId(), context)
+  .compute(batchIter, context.partitionId(), context)
 
 new Iterator[InternalRow] {
 
-  var currentIter = if (columnarBatchIter.hasNext) {
+  private var currentIter = if (columnarBatchIter.hasNext) {
--- End diff --

I think so. The variable is reassigned for each columnar batch.
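
A minimal sketch of the pattern in question (illustrative names, not the Spark source): the field starts at the first batch and is swapped out whenever the current batch is exhausted, which is why it has to be a `var` and why narrowing it to `private` is harmless:
```
// Flattens an iterator of batches; `current` is reassigned per batch.
class FlattenBatches[T](batches: Iterator[Iterator[T]]) extends Iterator[T] {
  private var current: Iterator[T] =
    if (batches.hasNext) batches.next() else Iterator.empty

  override def hasNext: Boolean =
    current.hasNext || (batches.hasNext && { current = batches.next(); hasNext })

  override def next(): T = {
    if (!hasNext) throw new NoSuchElementException("end of stream")
    current.next()
  }
}
```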


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143740157
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3376,6 +3376,151 @@ def test_vectorized_udf_empty_partition(self):
 res = df.select(f(col('id')))
 self.assertEquals(df.collect(), res.collect())
 
+def test_vectorized_udf_varargs(self):
+from pyspark.sql.functions import pandas_udf, col
+df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 
2))
+f = pandas_udf(lambda *v: v[0], LongType())
+res = df.select(f(col('id')))
+self.assertEquals(df.collect(), res.collect())
+
+
+@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not 
installed")
+class GroupbyApplyTests(ReusedPySparkTestCase):
+@classmethod
+def setUpClass(cls):
+ReusedPySparkTestCase.setUpClass()
+cls.spark = SparkSession(cls.sc)
+
+@classmethod
+def tearDownClass(cls):
+ReusedPySparkTestCase.tearDownClass()
+cls.spark.stop()
+
+def assertFramesEqual(self, expected, result):
+msg = ("DataFrames are not equal: " +
+   ("\n\nExpected:\n%s\n%s" % (expected, expected.dtypes)) +
+   ("\n\nResult:\n%s\n%s" % (result, result.dtypes)))
+self.assertTrue(expected.equals(result), msg=msg)
+
+@property
+def data(self):
+from pyspark.sql.functions import array, explode, col, lit
+return self.spark.range(10).toDF('id') \
+.withColumn("vs", array([lit(i) for i in range(20, 30)])) \
+.withColumn("v", explode(col('vs'))).drop('vs')
+
+def test_simple(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+foo_udf = pandas_udf(
+lambda df: df.assign(v1=df.v * df.id * 1.0, v2=df.v + df.id),
+StructType(
+[StructField('id', LongType()),
+ StructField('v', IntegerType()),
+ StructField('v1', DoubleType()),
+ StructField('v2', LongType())]))
+
+result = df.groupby('id').apply(foo_udf).sort('id').toPandas()
+expected = 
df.toPandas().groupby('id').apply(foo_udf.func).reset_index(drop=True)
+self.assertFramesEqual(expected, result)
+
+def test_decorator(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+@pandas_udf(StructType(
+[StructField('id', LongType()),
+ StructField('v', IntegerType()),
+ StructField('v1', DoubleType()),
+ StructField('v2', LongType())]))
+def foo(df):
+return df.assign(v1=df.v * df.id * 1.0, v2=df.v + df.id)
+
+result = df.groupby('id').apply(foo).sort('id').toPandas()
+expected = 
df.toPandas().groupby('id').apply(foo.func).reset_index(drop=True)
+self.assertFramesEqual(expected, result)
+
+def test_coerce(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+foo = pandas_udf(
+lambda df: df,
+StructType([StructField('id', LongType()), StructField('v', 
DoubleType())]))
+
+result = df.groupby('id').apply(foo).sort('id').toPandas()
+expected = 
df.toPandas().groupby('id').apply(foo.func).reset_index(drop=True)
+expected = expected.assign(v=expected.v.astype('float64'))
+self.assertFramesEqual(expected, result)
+
+def test_complex_groupby(self):
+from pyspark.sql.functions import pandas_udf, col
+df = self.data
+
+@pandas_udf(StructType(
+[StructField('id', LongType()),
+ StructField('v', IntegerType()),
+ StructField('norm', DoubleType())]))
+def normalize(pdf):
+v = pdf.v
+return pdf.assign(norm=(v - v.mean()) / v.std())
+
+result = df.groupby(col('id') % 2 == 
0).apply(normalize).sort('id', 'v').toPandas()
+pdf = df.toPandas()
+expected = pdf.groupby(pdf['id'] % 2 == 0).apply(normalize.func)
+expected = expected.sort_values(['id', 'v']).reset_index(drop=True)
+expected = expected.assign(norm=expected.norm.astype('float64'))
+self.assertFramesEqual(expected, result)
+
+def test_empty_groupby(self):
+from pyspark.sql.functions import pandas_udf, col
+df = self.data
+
+@pandas_udf(StructType(
+[StructField('id',

[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143740078
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3376,6 +3376,151 @@ def test_vectorized_udf_empty_partition(self):
 res = df.select(f(col('id')))
 self.assertEquals(df.collect(), res.collect())
 
+def test_vectorized_udf_varargs(self):
+from pyspark.sql.functions import pandas_udf, col
+df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 
2))
+f = pandas_udf(lambda *v: v[0], LongType())
+res = df.select(f(col('id')))
+self.assertEquals(df.collect(), res.collect())
+
+
+@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not 
installed")
+class GroupbyApplyTests(ReusedPySparkTestCase):
+@classmethod
+def setUpClass(cls):
+ReusedPySparkTestCase.setUpClass()
+cls.spark = SparkSession(cls.sc)
+
+@classmethod
+def tearDownClass(cls):
+ReusedPySparkTestCase.tearDownClass()
+cls.spark.stop()
+
+def assertFramesEqual(self, expected, result):
+msg = ("DataFrames are not equal: " +
+   ("\n\nExpected:\n%s\n%s" % (expected, expected.dtypes)) +
+   ("\n\nResult:\n%s\n%s" % (result, result.dtypes)))
+self.assertTrue(expected.equals(result), msg=msg)
+
+@property
+def data(self):
+from pyspark.sql.functions import array, explode, col, lit
+return self.spark.range(10).toDF('id') \
+.withColumn("vs", array([lit(i) for i in range(20, 30)])) \
+.withColumn("v", explode(col('vs'))).drop('vs')
+
+def test_simple(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+foo_udf = pandas_udf(
+lambda df: df.assign(v1=df.v * df.id * 1.0, v2=df.v + df.id),
+StructType(
+[StructField('id', LongType()),
+ StructField('v', IntegerType()),
+ StructField('v1', DoubleType()),
+ StructField('v2', LongType())]))
+
+result = df.groupby('id').apply(foo_udf).sort('id').toPandas()
+expected = 
df.toPandas().groupby('id').apply(foo_udf.func).reset_index(drop=True)
+self.assertFramesEqual(expected, result)
+
+def test_decorator(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+@pandas_udf(StructType(
+[StructField('id', LongType()),
+ StructField('v', IntegerType()),
+ StructField('v1', DoubleType()),
+ StructField('v2', LongType())]))
+def foo(df):
+return df.assign(v1=df.v * df.id * 1.0, v2=df.v + df.id)
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143740129
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3376,6 +3376,151 @@ def test_vectorized_udf_empty_partition(self):
 res = df.select(f(col('id')))
 self.assertEquals(df.collect(), res.collect())
 
+def test_vectorized_udf_varargs(self):
+from pyspark.sql.functions import pandas_udf, col
+df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 
2))
+f = pandas_udf(lambda *v: v[0], LongType())
+res = df.select(f(col('id')))
+self.assertEquals(df.collect(), res.collect())
+
+
+@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not 
installed")
+class GroupbyApplyTests(ReusedPySparkTestCase):
+@classmethod
+def setUpClass(cls):
+ReusedPySparkTestCase.setUpClass()
+cls.spark = SparkSession(cls.sc)
+
+@classmethod
+def tearDownClass(cls):
+ReusedPySparkTestCase.tearDownClass()
+cls.spark.stop()
+
+def assertFramesEqual(self, expected, result):
+msg = ("DataFrames are not equal: " +
+   ("\n\nExpected:\n%s\n%s" % (expected, expected.dtypes)) +
+   ("\n\nResult:\n%s\n%s" % (result, result.dtypes)))
+self.assertTrue(expected.equals(result), msg=msg)
+
+@property
+def data(self):
+from pyspark.sql.functions import array, explode, col, lit
+return self.spark.range(10).toDF('id') \
+.withColumn("vs", array([lit(i) for i in range(20, 30)])) \
+.withColumn("v", explode(col('vs'))).drop('vs')
+
+def test_simple(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+foo_udf = pandas_udf(
+lambda df: df.assign(v1=df.v * df.id * 1.0, v2=df.v + df.id),
+StructType(
+[StructField('id', LongType()),
+ StructField('v', IntegerType()),
+ StructField('v1', DoubleType()),
+ StructField('v2', LongType())]))
+
+result = df.groupby('id').apply(foo_udf).sort('id').toPandas()
+expected = 
df.toPandas().groupby('id').apply(foo_udf.func).reset_index(drop=True)
+self.assertFramesEqual(expected, result)
+
+def test_decorator(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+@pandas_udf(StructType(
+[StructField('id', LongType()),
+ StructField('v', IntegerType()),
+ StructField('v1', DoubleType()),
+ StructField('v2', LongType())]))
+def foo(df):
+return df.assign(v1=df.v * df.id * 1.0, v2=df.v + df.id)
+
+result = df.groupby('id').apply(foo).sort('id').toPandas()
+expected = 
df.toPandas().groupby('id').apply(foo.func).reset_index(drop=True)
+self.assertFramesEqual(expected, result)
+
+def test_coerce(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+foo = pandas_udf(
+lambda df: df,
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143744197
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2181,30 +2187,66 @@ def udf(f=None, returnType=StringType()):
 @since(2.3)
 def pandas_udf(f=None, returnType=StringType()):
 """
-Creates a :class:`Column` expression representing a user defined 
function (UDF) that accepts
-`Pandas.Series` as input arguments and outputs a `Pandas.Series` of 
the same length.
+Creates a vectorized user defined function (UDF).
 
-:param f: python function if used as a standalone function
+:param f: user-defined function. A python function if used as a 
standalone function
 :param returnType: a :class:`pyspark.sql.types.DataType` object
 
->>> from pyspark.sql.types import IntegerType, StringType
->>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
->>> @pandas_udf(returnType=StringType())
-... def to_upper(s):
-... return s.str.upper()
-...
->>> @pandas_udf(returnType="integer")
-... def add_one(x):
-... return x + 1
-...
->>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", 
"age"))
->>> df.select(slen("name").alias("slen(name)"), to_upper("name"), 
add_one("age")) \\
-... .show()  # doctest: +SKIP
-+--+--++
-|slen(name)|to_upper(name)|add_one(age)|
-+--+--++
-| 8|  JOHN DOE|  22|
-+--+--++
+The user-defined function can define one of the following 
transformations:
+
+1. One or more `pandas.Series` -> A `pandas.Series`
+
+   This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
+   :meth:`pyspark.sql.DataFrame.select`.
+   The returnType should be a primitive data type, e.g., 
`DoubleType()`.
+   The length of the returned `pandas.Series` must be of the same as 
the input `pandas.Series`.
+
+   >>> from pyspark.sql.types import IntegerType, StringType
+   >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
+   >>> @pandas_udf(returnType=StringType())
+   ... def to_upper(s):
+   ... return s.str.upper()
+   ...
+   >>> @pandas_udf(returnType="integer")
+   ... def add_one(x):
+   ... return x + 1
+   ...
+   >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", 
"name", "age"))
+   >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), 
add_one("age")) \\
+   ... .show()  # doctest: +SKIP
+   +--+--++
+   |slen(name)|to_upper(name)|add_one(age)|
+   +--+--++
+   | 8|  JOHN DOE|  22|
+   +--+--++
+
+2. A `pandas.DataFrame` -> A `pandas.DataFrame`
+
+   This udf is used with :meth:`pyspark.sql.GroupedData.apply`.
--- End diff --

Changed it to `This udf is only used with` and added a `note`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143740882
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
 ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, 
Expression}
+
+/**
+ * Logical nodes specific to PySpark.
+ */
+
+/**
+ * FlatMap groups using a udf: pandas.Dataframe -> pandas.DataFrame.
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143740773
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
 ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, 
Expression}
+
+/**
+ * Logical nodes specific to PySpark.
+ */
--- End diff --

Removed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143810355
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,84 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self._df)
+
+@since(2.3)
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a `DataFrame`.
--- End diff --

I think "pandas udf" as a term is fine; `pandas_udf` is the function name.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143810539
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,84 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self._df)
+
+@since(2.3)
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a `DataFrame`.
+
+The user-defined function should take a `pandas.DataFrame` and 
return another
+`pandas.DataFrame`. For each group, all columns are passed 
together as a `pandas.DataFrame`
+to the user-function and the returned `pandas.DataFrame` are 
combined as a `DataFrame`.
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143810736
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,84 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self._df)
+
+@since(2.3)
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a `DataFrame`.
+
+The user-defined function should take a `pandas.DataFrame` and 
return another
+`pandas.DataFrame`. For each group, all columns are passed 
together as a `pandas.DataFrame`
+to the user-function and the returned `pandas.DataFrame` are 
combined as a `DataFrame`.
+The returned `pandas.DataFrame` can be arbitrary length and its 
schema must match the
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143812619
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
 ---
@@ -44,14 +73,18 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], 
output: Seq[Attribute], chi
 val schemaOut = 
StructType.fromAttributes(output.drop(child.output.length).zipWithIndex
   .map { case (attr, i) => attr.withName(s"_$i") })
 
+val batchSize = conf.arrowMaxRecordsPerBatch
+// DO NOT use iter.grouped(). See BatchIterator.
--- End diff --

Yes. The reason is explained in the docstring of BatchIterator.
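
To spell out the gist for readers of the archive (a hedged sketch, not the exact `BatchIterator` from the PR): `Iterator.grouped` buffers each group into a `Seq` before handing it over, whereas a wrapper like the one below hands out sub-iterators that pull rows lazily, so a batch never has to be materialized up front. The trade-off is that each batch must be fully consumed before the next one is requested.
```
// Illustrative only: fixed-size batches over an iterator without buffering.
class LazyBatches[T](underlying: Iterator[T], batchSize: Int)
    extends Iterator[Iterator[T]] {
  require(batchSize > 0, "batchSize must be positive")

  override def hasNext: Boolean = underlying.hasNext

  override def next(): Iterator[T] = new Iterator[T] {
    private var consumed = 0
    override def hasNext: Boolean = consumed < batchSize && underlying.hasNext
    override def next(): T = { consumed += 1; underlying.next() }
  }
}
```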


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143812311
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
@@ -435,6 +435,35 @@ class RelationalGroupedDataset protected[sql](
   df.logicalPlan.output,
   df.logicalPlan))
   }
+
+  /**
+   * Applies a vectorized python user-defined function to each group of 
data.
+   * The user-defined function defines a transformation: 
`Pandas.DataFrame` -> `Pandas.DataFrame`.
+   * For each group, all elements in the group are passed as a 
`Pandas.DataFrame` and the results
+   * for all groups are combined into a new `DataFrame`.
+   *
+   * This function does not support partial aggregation, and requires 
shuffling all the data in
+   * the `DataFrame`.
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143813642
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.TaskContext
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, 
ClusteredDistribution, Distribution, Partitioning}
+import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, 
UnaryExecNode}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Physical node for 
[[org.apache.spark.sql.catalyst.plans.logical.FlatMapGroupsInPandas]]
+ *
+ * Rows in each group are passed to the python worker as a Arrow record 
batch.
--- End diff --

Fixed "a Arrow" -> "an Arrow".

Fixed the "Python" and "Java" capitalization.

I am actually leaning toward keeping `pandas.DataFrame`. The preferred spelling of `pandas` is usually lower case:
https://pandas.pydata.org/pandas-docs/stable/



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143809711
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,84 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self._df)
+
+@since(2.3)
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a `DataFrame`.
--- End diff --

Fixed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143810948
  
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,84 @@ def pivot(self, pivot_col, values=None):
 jgd = self._jgd.pivot(pivot_col)
 else:
 jgd = self._jgd.pivot(pivot_col, values)
-return GroupedData(jgd, self.sql_ctx)
+return GroupedData(jgd, self._df)
+
+@since(2.3)
+def apply(self, udf):
+"""
+Maps each group of the current :class:`DataFrame` using a pandas 
udf and returns the result
+as a `DataFrame`.
+
+The user-defined function should take a `pandas.DataFrame` and 
return another
+`pandas.DataFrame`. For each group, all columns are passed 
together as a `pandas.DataFrame`
+to the user-function and the returned `pandas.DataFrame` are 
combined as a `DataFrame`.
+The returned `pandas.DataFrame` can be arbitrary length and its 
schema must match the
+returnType of the pandas udf.
+
+This function does not support partial aggregation, and requires 
shuffling all the data in
+the `DataFrame`.
+
+:param udf: A wrapped udf function returned by 
:meth:`pyspark.sql.functions.pandas_udf`
--- End diff --

I think
```
 A pandas_udf returned by :meth:`pyspark.sql.functions.pandas_udf`
```
is redundant. How about
```
 A function object returned by :meth:`pyspark.sql.functions.pandas_udf`
```
instead?



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...

2017-10-10 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/18732
  
@HyukjinKwon Thanks!

Thanks to everyone for reviewing this tirelessly.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


