[GitHub] spark issue #15821: [SPARK-13534][WIP][PySpark] Using Apache Arrow to increa...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/15821

Bryan,

I am working on:
(1) Adding more numbers to benchmark.py
(2) Adding support for date/timestamp/binary types
(3) Fixing the memory leak in the code

All of these should be done soon (tomorrow, if not today), but I think we can start getting feedback from Spark committers. What do you think? Is there anything else you want done before updating the PR for Spark committers?

Li

On Wed, Jan 18, 2017 at 7:52 PM, Bryan Cutler <notificati...@github.com> wrote:

> Shall we update this PR to the latest and solicit involvement from
> Spark committers?
>
> Yeah, I think it's about ready for that. After we integrate the latest
> changes, I'll go over once more for some minor cleanup and update this.
> Probably in the next day or so.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #15821: [SPARK-13534][WIP][PySpark] Using Apache Arrow to increa...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/15821

@BryanCutler, I have been working based on your branch here: https://github.com/BryanCutler/spark/tree/wip-toPandas_with_arrow-SPARK-13534

Is this the right one?
[GitHub] spark issue #15821: [SPARK-13534][PySpark] Using Apache Arrow to increase pe...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/15821

@BryanCutler, there appears to be a stability issue in the current code. I tried to repeatedly collect a DataFrame as Arrow RecordBatch in spark-shell and found that executors start to fail after a while.

![image](https://cloud.githubusercontent.com/assets/793516/24308213/7cd46daa-109d-11e7-8824-3255b9690cf2.png)

I looked at one failed executor; it appears to have been killed by Mesos for exceeding its memory limit. I suspect there is some kind of memory leak.
[GitHub] spark issue #15821: [SPARK-13534][PySpark] Using Apache Arrow to increase pe...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/15821

@BryanCutler Thanks! The issue is fixed with your new update.
[GitHub] spark issue #15821: [SPARK-13534][PySpark] Using Apache Arrow to increase pe...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/15821

@BryanCutler I think this patch is in good enough shape that I want to release this code internally at Two Sigma for beta users. My understanding is that support for timestamp and date is not available until Arrow 0.3, is that right? Also, are there any unresolved issues that you know of?
[GitHub] spark issue #15821: [SPARK-13534][PySpark] Using Apache Arrow to increase pe...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/15821

Got it. Can you put up a patch on the arrow-integration branch that throws an exception for timestamp and date types? I would do it but I don't have my laptop with me now...
[GitHub] spark issue #15821: [SPARK-13534][PySpark] Using Apache Arrow to increase pe...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/15821

@BryanCutler from the arrow-integration branch. Where is the memory leak patch?
[GitHub] spark issue #18664: [SPARK-21375][PYSPARK][SQL][WIP] Add Date and Timestamp ...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/18664

It looks like "SESSION_LOCAL_TIMEZONE" is not respected in most of the PySpark functionality. I think `df.collect()` and `df.toPandas()` can be fixed to respect SESSION_LOCAL_TIMEZONE. `TimestampType().toInternal(dt)` is tricky because it doesn't have a reference to the SQLConf object; maybe we should deprecate this method?

I think we can also create a separate JIRA to address these, so that this PR fixes just the Arrow path. @BryanCutler and @ueshin, what do you think?
[GitHub] spark issue #18664: [SPARK-21375][PYSPARK][SQL][WIP] Add Date and Timestamp ...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/18664

@ueshin I am +1 for fixing `df.collect()` and `df.toPandas()`. I don't think it is much of a backward-compatibility issue, because the current behavior of `df.collect()` and `df.toPandas()` is broken when `SESSION_LOCAL_TIMEZONE` is set. No PySpark users have complained that it doesn't work as expected, probably because the setting is not widely used in PySpark. IMHO we should fix this as soon as possible.
[GitHub] spark issue #18664: [SPARK-21375][PYSPARK][SQL][WIP] Add Date and Timestamp ...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/18664

To Wes's concern, I think we are only dealing with values in UTC here: both Spark and Arrow internally represent timestamps as microseconds since epoch.

To the two issues Bryan and Ueshin brought up:

Issue 1: I agree with Ueshin that we should stick to `SESSION_LOCAL_TIMEZONE`. Bryan brought up a good point that in PySpark `df.toPandas()`, `df.collect()` and the Python udf (through `Timestamp.fromInternal`) don't respect `SESSION_LOCAL_TIMEZONE`, which is confusing and inconsistent with Spark SQL behavior such as `df.show()`. Since the Arrow path is going to be either inconsistent with Spark SQL (`df.show()`) or inconsistent with PySpark (i.e., the default `df.toPandas()`), I'd rather we do the right thing (by using `SESSION_LOCAL_TIMEZONE`) and fix the other PySpark behavior separately.

Issue 2: I agree with Bryan that we should leave the timezone as is. I don't think there is a performance issue because, as Wes mentioned, it's just a metadata operation. I think converting it back to the system timezone defeats the purpose of using the session timezone, and throwing away the tzinfo seems unnecessary.
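The point about UTC values can be made concrete with a minimal standard-library sketch (this is not PySpark code). It assumes a timestamp stored as microseconds since the Unix epoch; the fixed UTC-8 offset is a hypothetical stand-in for the `SESSION_LOCAL_TIMEZONE` setting, and the helper name `from_internal` is illustrative only.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

# Hypothetical stand-in for the session time zone (fixed offset, no DST).
SESSION_TZ = timezone(timedelta(hours=-8))


def from_internal(micros, tz):
    """Render microseconds since the epoch (UTC) as an aware datetime in tz."""
    return (EPOCH + timedelta(microseconds=micros)).astimezone(tz)


micros = 1_500_000_000_000_000  # 2017-07-14 02:40:00 UTC
in_utc = from_internal(micros, timezone.utc)
in_session = from_internal(micros, SESSION_TZ)

# Same instant, different wall-clock rendering: only the tz metadata differs.
assert in_utc == in_session
print(in_utc.isoformat())      # 2017-07-14T02:40:00+00:00
print(in_session.isoformat())  # 2017-07-13T18:40:00-08:00
```

The stored integer never changes; choosing the session timezone only changes how the instant is displayed, which is why the conversion is cheap.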
[GitHub] spark pull request #18664: [SPARK-21375][PYSPARK][SQL][WIP] Add Date and Tim...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18664#discussion_r131227296

    --- Diff: python/pyspark/sql/tests.py ---
    @@ -3036,6 +3052,9 @@ def test_toPandas_arrow_toggle(self):
             pdf = df.toPandas()
             self.spark.conf.set("spark.sql.execution.arrow.enable", "true")
             pdf_arrow = df.toPandas()
    +        # need to remove timezone for comparison
    +        pdf_arrow["7_timestamp_t"] = \
    +            pdf_arrow["7_timestamp_t"].apply(lambda ts: ts.tz_localize(None))
    --- End diff --

@gatorsmile, can you be a bit more explicit about the behavior we want? We discovered that the current behavior of `df.toPandas()` is wrong and inconsistent with Spark SQL (because `df.toPandas()` doesn't respect `SESSION_LOCAL_TIMEZONE`). Do you think we should:

1. be consistent with the default `df.toPandas()` even though it is wrong, or
2. be consistent with Spark SQL?

Also, this is not a huge issue IMHO because we should fix the behavior of `df.toPandas()` soon anyway.
[GitHub] spark pull request #18933: [WIP][SPARK-21722][SQL][PYTHON] Enable timezone-a...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18933#discussion_r133229705

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
    @@ -912,6 +912,14 @@ object SQLConf {
           .intConf
           .createWithDefault(1)
     
    +  val PANDAS_TIMEZONE_AWARE =
    --- End diff --

There are other parts of PySpark that don't use the session local timezone, for instance `df.collect()` and (maybe) Python udf execution. I am worried that this will end up inconsistent (some paths use the session local timezone, some don't) and complex (one configuration for each of these features?).

While it will be harder to fix, how about we use one configuration to control the behavior of `df.toPandas()`, `df.collect()` and Python udfs regarding the session local timezone?
[GitHub] spark pull request #18732: groupby().apply() with pandas udf
GitHub user icexelloss opened a pull request: https://github.com/apache/spark/pull/18732

groupby().apply() with pandas udf

## What changes were proposed in this pull request?

This PR adds an apply() function on df.groupby(). apply() takes a pandas udf that is a transformation on `pandas.DataFrame` -> `pandas.DataFrame`.

A quick example:

```
schema = df.schema

@pandas_udf(schema)
def normalize(pdf):
    pdf.v1 = (pdf.v1 - pdf.v1.mean()) / pdf.v1.std()
    return pdf

df.groupBy('id').apply(normalize(df))
```

This patch consists of multiple parts which can be broken into smaller PRs:

* Arrow RecordBatch -> UnsafeRow conversions in ArrowConverters
* pandas_udf in pyspark.sql.functions
* FlatMapInPandas plan node to support groupby().apply()

The first two parts can be used to implement other pandas udf functions.

Design doc: https://github.com/icexelloss/spark/blob/pandas-udf-doc/docs/pyspark-pandas-udf.md

## How was this patch tested?

* Arrow RecordBatch -> UnsafeRow conversions are tested with the existing ArrowConverters suite
* groupby().apply() is tested with a new pyspark test

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/icexelloss/spark groupby-apply-SPARK-20396

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/18732.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #18732

commit 8f38c15ea372c1ec9c2fefc36c0bfc3a22c3be14
Author: Li Jin <ice.xell...@gmail.com>
Date: 2017-07-25T15:37:39Z

    Initial commit of groupby apply with pandas udf
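The split-apply-combine semantics of the proposed `groupby().apply()` can be sketched in plain Python. This is only an illustration of the idea, not the PR's implementation: rows are modelled as dicts instead of `pandas.DataFrame` objects, and `groupby_apply` is a hypothetical helper. `statistics.stdev` is the sample standard deviation, which matches the default of pandas' `std()`.

```python
from collections import defaultdict
from statistics import mean, stdev


def normalize(rows):
    """Per-group transformation: rescale v1 to zero mean and unit sample std."""
    values = [r["v1"] for r in rows]
    mu, sigma = mean(values), stdev(values)
    return [{**r, "v1": (r["v1"] - mu) / sigma} for r in rows]


def groupby_apply(rows, key, func):
    """Split rows by key, apply func to each group, concatenate the results."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[key]].append(row)
    out = []
    for group_rows in groups.values():
        out.extend(func(group_rows))
    return out


rows = [
    {"id": 1, "v1": 1.0},
    {"id": 1, "v1": 3.0},
    {"id": 2, "v1": 10.0},
    {"id": 2, "v1": 20.0},
]
result = groupby_apply(rows, "id", normalize)
for row in result:
    print(row)
```

Each group is normalized independently, so both groups end up with the same pair of values even though their original scales differ.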
[GitHub] spark pull request #18659: [SPARK-21404][PYSPARK][WIP] Simple Python Vectori...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18659#discussion_r129412101

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala ---
    @@ -132,6 +135,61 @@ private[sql] object ArrowConverters {
         }
       }
     
    +  private[sql] def fromPayloadIterator(iter: Iterator[ArrowPayload]): Iterator[InternalRow] = {
    +    new Iterator[InternalRow] {
    +      private val _allocator = new RootAllocator(Long.MaxValue)
    +      private var _reader: ArrowFileReader = _
    +      private var _root: VectorSchemaRoot = _
    +      private var _index = 0
    +
    +      loadNextBatch()
    +
    +      override def hasNext: Boolean = _root != null && _index < _root.getRowCount
    +
    +      override def next(): InternalRow = {
    +        val fields = _root.getFieldVectors.asScala
    +
    +        val genericRowData = fields.map { field =>
    +          field.getAccessor.getObject(_index)
    +        }.toArray[Any]
    --- End diff --

@BryanCutler, I have implemented Arrow -> UnsafeRow conversions in: https://github.com/icexelloss/spark/commit/8f38c15ea372c1ec9c2fefc36c0bfc3a22c3be14#diff-52cca47e7a940849b28d476ddf99d65eR575

This reuses the row object and doesn't do boxing. Hopefully it's useful to you as well?
[GitHub] spark issue #18664: [SPARK-21375][PYSPARK][SQL][WIP] Add Date and Timestamp ...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/18664

Excited to see this being worked on.

> SQLConf.SESSION_LOCAL_TIMEZONE

I like this the best. This presents timestamps in local time, which is compatible with the existing `toPandas()` and `collect()`. If we really want the result to be exactly the same as the non-Arrow version of `toPandas()`, we can do something like `df[col] = df[col].dt.tz_localize(None)` after getting the `pandas.DataFrame` from Arrow.
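The stripping step can be illustrated without pandas. A minimal sketch, assuming the Arrow path yields timezone-aware values: `datetime.replace(tzinfo=None)` plays the role that `tz_localize(None)` plays on a pandas series (keep the wall-clock time, drop the zone). The sample column and the helper name `drop_tz` are made up for illustration.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical session time zone and a tz-aware "column" as the Arrow path might return it.
session_tz = timezone(timedelta(hours=-8))
aware_col = [
    datetime(2017, 7, 13, 18, 40, tzinfo=session_tz),
    datetime(2017, 7, 14, 9, 0, tzinfo=session_tz),
]


def drop_tz(values):
    """Keep the wall-clock time and discard tzinfo, like tz_localize(None) in pandas."""
    return [v.replace(tzinfo=None) for v in values]


naive_col = drop_tz(aware_col)
print(naive_col[0].isoformat())  # 2017-07-13T18:40:00
```

After this step the values compare equal to naive timestamps with the same wall-clock time, which is what an exact comparison against a naive result would need.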
[GitHub] spark pull request #15821: [SPARK-13534][PySpark] Using Apache Arrow to incr...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/15821#discussion_r113728111

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala ---
    @@ -0,0 +1,396 @@
    +/*
    +* Licensed to the Apache Software Foundation (ASF) under one or more
    +* contributor license agreements. See the NOTICE file distributed with
    +* this work for additional information regarding copyright ownership.
    +* The ASF licenses this file to You under the Apache License, Version 2.0
    +* (the "License"); you may not use this file except in compliance with
    +* the License. You may obtain a copy of the License at
    +*
    +*    http://www.apache.org/licenses/LICENSE-2.0
    +*
    +* Unless required by applicable law or agreed to in writing, software
    +* distributed under the License is distributed on an "AS IS" BASIS,
    +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +* See the License for the specific language governing permissions and
    +* limitations under the License.
    +*/
    +
    +package org.apache.spark.sql.execution.arrow
    +
    +import java.io.ByteArrayOutputStream
    +import java.nio.channels.Channels
    +
    +import scala.collection.JavaConverters._
    +
    +import io.netty.buffer.ArrowBuf
    +import org.apache.arrow.memory.{BufferAllocator, RootAllocator}
    +import org.apache.arrow.vector._
    +import org.apache.arrow.vector.BaseValueVector.BaseMutator
    +import org.apache.arrow.vector.file._
    +import org.apache.arrow.vector.schema.{ArrowFieldNode, ArrowRecordBatch}
    +import org.apache.arrow.vector.types.{DateUnit, FloatingPointPrecision, TimeUnit}
    +import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema}
    +import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.types._
    +import org.apache.spark.util.Utils
    +
    +
    +/**
    + * Store Arrow data in a form that can be serialized by Spark
    + */
    +private[sql] class ArrowPayload(val batchBytes: Array[Byte]) extends Serializable {
    +
    +  def this(batch: ArrowRecordBatch, schema: StructType, allocator: BufferAllocator) = {
    +    this(ArrowConverters.batchToByteArray(batch, schema, allocator))
    +  }
    +
    +  def loadBatch(allocator: BufferAllocator): ArrowRecordBatch = {
    +    ArrowConverters.byteArrayToBatch(batchBytes, allocator)
    +  }
    +}
    +
    +private[sql] object ArrowConverters {
    +
    +  /**
    +   * Map a Spark DataType to ArrowType.
    +   */
    +  private[arrow] def sparkTypeToArrowType(dataType: DataType): ArrowType = {
    +    dataType match {
    +      case BooleanType => ArrowType.Bool.INSTANCE
    +      case ShortType => new ArrowType.Int(8 * ShortType.defaultSize, true)
    +      case IntegerType => new ArrowType.Int(8 * IntegerType.defaultSize, true)
    +      case LongType => new ArrowType.Int(8 * LongType.defaultSize, true)
    +      case FloatType => new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)
    +      case DoubleType => new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)
    +      case ByteType => new ArrowType.Int(8, true)
    +      case StringType => ArrowType.Utf8.INSTANCE
    +      case BinaryType => ArrowType.Binary.INSTANCE
    +      case _ => throw new UnsupportedOperationException(s"Unsupported data type: $dataType")
    +    }
    +  }
    +
    +  /**
    +   * Convert a Spark Dataset schema to Arrow schema.
    +   */
    +  private[arrow] def schemaToArrowSchema(schema: StructType): Schema = {
    +    val arrowFields = schema.fields.map { f =>
    +      new Field(f.name, f.nullable, sparkTypeToArrowType(f.dataType), List.empty[Field].asJava)
    +    }
    +    new Schema(arrowFields.toList.asJava)
    +  }
    +
    +  /**
    +   * Maps Iterator from InternalRow to ArrowPayload
    +   */
    +  private[sql] def toPayloadIterator(
    +      rowIter: Iterator[InternalRow],
    +      schema: StructType): Iterator[ArrowPayload] = {
    +    new Iterator[ArrowPayload] {
    +      private val _allocator = new RootAllocator(Long.MaxValue)
    +      private var _nextPayload = if (rowIter.nonEmpty) convert() else null
    +
    +      override def hasNext: Boolean = _nextPayload != null
    +
    +      override def next(): ArrowPayload = {
    +        val obj = _nextPayload
    +        if (hasNext) {
    +          if (rowIter.hasNext) {
    +            _nextPayload = convert()
    +          } else {
    +            _allocator.close()
    --- End diff --

I think we also need to handle exceptions / task cancellation here to free up the memory.
[GitHub] spark pull request #15821: [SPARK-13534][PySpark] Using Apache Arrow to incr...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/15821#discussion_r113728646

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala ---
    @@ -0,0 +1,396 @@
    +/*
    +* Licensed to the Apache Software Foundation (ASF) under one or more
    +* contributor license agreements. See the NOTICE file distributed with
    +* this work for additional information regarding copyright ownership.
    +* The ASF licenses this file to You under the Apache License, Version 2.0
    +* (the "License"); you may not use this file except in compliance with
    +* the License. You may obtain a copy of the License at
    +*
    +*    http://www.apache.org/licenses/LICENSE-2.0
    +*
    +* Unless required by applicable law or agreed to in writing, software
    +* distributed under the License is distributed on an "AS IS" BASIS,
    +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +* See the License for the specific language governing permissions and
    +* limitations under the License.
    +*/
    +
    +package org.apache.spark.sql.execution.arrow
    +
    +import java.io.ByteArrayOutputStream
    +import java.nio.channels.Channels
    +
    +import scala.collection.JavaConverters._
    +
    +import io.netty.buffer.ArrowBuf
    +import org.apache.arrow.memory.{BufferAllocator, RootAllocator}
    +import org.apache.arrow.vector._
    +import org.apache.arrow.vector.BaseValueVector.BaseMutator
    +import org.apache.arrow.vector.file._
    +import org.apache.arrow.vector.schema.{ArrowFieldNode, ArrowRecordBatch}
    +import org.apache.arrow.vector.types.{DateUnit, FloatingPointPrecision, TimeUnit}
    +import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema}
    +import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.types._
    +import org.apache.spark.util.Utils
    +
    +
    +/**
    + * Store Arrow data in a form that can be serialized by Spark
    + */
    +private[sql] class ArrowPayload(val batchBytes: Array[Byte]) extends Serializable {
    +
    +  def this(batch: ArrowRecordBatch, schema: StructType, allocator: BufferAllocator) = {
    +    this(ArrowConverters.batchToByteArray(batch, schema, allocator))
    +  }
    +
    +  def loadBatch(allocator: BufferAllocator): ArrowRecordBatch = {
    +    ArrowConverters.byteArrayToBatch(batchBytes, allocator)
    +  }
    +}
    +
    +private[sql] object ArrowConverters {
    +
    +  /**
    +   * Map a Spark DataType to ArrowType.
    +   */
    +  private[arrow] def sparkTypeToArrowType(dataType: DataType): ArrowType = {
    +    dataType match {
    +      case BooleanType => ArrowType.Bool.INSTANCE
    +      case ShortType => new ArrowType.Int(8 * ShortType.defaultSize, true)
    +      case IntegerType => new ArrowType.Int(8 * IntegerType.defaultSize, true)
    +      case LongType => new ArrowType.Int(8 * LongType.defaultSize, true)
    +      case FloatType => new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)
    +      case DoubleType => new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)
    +      case ByteType => new ArrowType.Int(8, true)
    +      case StringType => ArrowType.Utf8.INSTANCE
    +      case BinaryType => ArrowType.Binary.INSTANCE
    +      case _ => throw new UnsupportedOperationException(s"Unsupported data type: $dataType")
    +    }
    +  }
    +
    +  /**
    +   * Convert a Spark Dataset schema to Arrow schema.
    +   */
    +  private[arrow] def schemaToArrowSchema(schema: StructType): Schema = {
    +    val arrowFields = schema.fields.map { f =>
    +      new Field(f.name, f.nullable, sparkTypeToArrowType(f.dataType), List.empty[Field].asJava)
    +    }
    +    new Schema(arrowFields.toList.asJava)
    +  }
    +
    +  /**
    +   * Maps Iterator from InternalRow to ArrowPayload
    +   */
    +  private[sql] def toPayloadIterator(
    +      rowIter: Iterator[InternalRow],
    +      schema: StructType): Iterator[ArrowPayload] = {
    +    new Iterator[ArrowPayload] {
    +      private val _allocator = new RootAllocator(Long.MaxValue)
    --- End diff --

I think it's better to have a single root allocator and create child allocators from it whenever an allocator is needed. This way it will be easy to find memory leaks. Maybe @julienledem can chime in and suggest the best practice?
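Why a root allocator with child allocators makes leaks easier to find can be shown with a toy accounting model. This is a sketch only: the `Allocator` class below is hypothetical and is not the Arrow `BufferAllocator` API. Each child tracks its own outstanding bytes, so a leak can be attributed to the component that owns the child.

```python
class Allocator:
    """Toy accounting allocator; a stand-in, not the Arrow BufferAllocator API."""

    def __init__(self, name, parent=None):
        self.name = name
        self.parent = parent
        self.children = []
        self.allocated = 0  # bytes currently held by this allocator itself
        if parent is not None:
            parent.children.append(self)

    def new_child(self, name):
        return Allocator(name, parent=self)

    def allocate(self, nbytes):
        self.allocated += nbytes

    def release(self, nbytes):
        self.allocated -= nbytes

    def total(self):
        """Bytes held by this allocator and all of its descendants."""
        return self.allocated + sum(c.total() for c in self.children)

    def leaks(self):
        """Names of allocators in this subtree that still hold memory."""
        found = [self.name] if self.allocated else []
        for child in self.children:
            found.extend(child.leaks())
        return found


root = Allocator("root")
to_payload = root.new_child("toPayloadIterator")
from_payload = root.new_child("fromPayloadIterator")

to_payload.allocate(1024)
to_payload.release(1024)      # released correctly
from_payload.allocate(2048)   # never released: a leak

print(root.total())  # 2048
print(root.leaks())  # ['fromPayloadIterator']
```

With one independent root allocator per iterator, only the aggregate process memory would show the problem; with children under a shared root, the report names the owner.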
[GitHub] spark pull request #15821: [SPARK-13534][PySpark] Using Apache Arrow to incr...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/15821#discussion_r113729309

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala ---
    @@ -0,0 +1,396 @@
    +/*
    +* Licensed to the Apache Software Foundation (ASF) under one or more
    +* contributor license agreements. See the NOTICE file distributed with
    +* this work for additional information regarding copyright ownership.
    +* The ASF licenses this file to You under the Apache License, Version 2.0
    +* (the "License"); you may not use this file except in compliance with
    +* the License. You may obtain a copy of the License at
    +*
    +*    http://www.apache.org/licenses/LICENSE-2.0
    +*
    +* Unless required by applicable law or agreed to in writing, software
    +* distributed under the License is distributed on an "AS IS" BASIS,
    +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +* See the License for the specific language governing permissions and
    +* limitations under the License.
    +*/
    +
    +package org.apache.spark.sql.execution.arrow
    +
    +import java.io.ByteArrayOutputStream
    +import java.nio.channels.Channels
    +
    +import scala.collection.JavaConverters._
    +
    +import io.netty.buffer.ArrowBuf
    +import org.apache.arrow.memory.{BufferAllocator, RootAllocator}
    +import org.apache.arrow.vector._
    +import org.apache.arrow.vector.BaseValueVector.BaseMutator
    +import org.apache.arrow.vector.file._
    +import org.apache.arrow.vector.schema.{ArrowFieldNode, ArrowRecordBatch}
    +import org.apache.arrow.vector.types.{DateUnit, FloatingPointPrecision, TimeUnit}
    +import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema}
    +import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.types._
    +import org.apache.spark.util.Utils
    +
    +
    +/**
    + * Store Arrow data in a form that can be serialized by Spark
    + */
    +private[sql] class ArrowPayload(val batchBytes: Array[Byte]) extends Serializable {
    --- End diff --

I think it would be helpful to document that batchBytes here is in "Arrow File Format" and contains exactly one Arrow record batch.
[GitHub] spark pull request #15821: [SPARK-13534][PySpark] Using Apache Arrow to incr...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/15821#discussion_r113730387

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala ---
    @@ -0,0 +1,396 @@
    +/*
    +* Licensed to the Apache Software Foundation (ASF) under one or more
    +* contributor license agreements. See the NOTICE file distributed with
    +* this work for additional information regarding copyright ownership.
    +* The ASF licenses this file to You under the Apache License, Version 2.0
    +* (the "License"); you may not use this file except in compliance with
    +* the License. You may obtain a copy of the License at
    +*
    +*    http://www.apache.org/licenses/LICENSE-2.0
    +*
    +* Unless required by applicable law or agreed to in writing, software
    +* distributed under the License is distributed on an "AS IS" BASIS,
    +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +* See the License for the specific language governing permissions and
    +* limitations under the License.
    +*/
    +
    +package org.apache.spark.sql.execution.arrow
    +
    +import java.io.ByteArrayOutputStream
    +import java.nio.channels.Channels
    +
    +import scala.collection.JavaConverters._
    +
    +import io.netty.buffer.ArrowBuf
    +import org.apache.arrow.memory.{BufferAllocator, RootAllocator}
    +import org.apache.arrow.vector._
    +import org.apache.arrow.vector.BaseValueVector.BaseMutator
    +import org.apache.arrow.vector.file._
    +import org.apache.arrow.vector.schema.{ArrowFieldNode, ArrowRecordBatch}
    +import org.apache.arrow.vector.types.{DateUnit, FloatingPointPrecision, TimeUnit}
    +import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema}
    +import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.types._
    +import org.apache.spark.util.Utils
    +
    +
    +/**
    + * Store Arrow data in a form that can be serialized by Spark
    + */
    +private[sql] class ArrowPayload(val batchBytes: Array[Byte]) extends Serializable {
    +
    +  def this(batch: ArrowRecordBatch, schema: StructType, allocator: BufferAllocator) = {
    +    this(ArrowConverters.batchToByteArray(batch, schema, allocator))
    +  }
    +
    +  def loadBatch(allocator: BufferAllocator): ArrowRecordBatch = {
    +    ArrowConverters.byteArrayToBatch(batchBytes, allocator)
    +  }
    +}
    +
    +private[sql] object ArrowConverters {
    +
    +  /**
    +   * Map a Spark DataType to ArrowType.
    +   */
    +  private[arrow] def sparkTypeToArrowType(dataType: DataType): ArrowType = {
    +    dataType match {
    +      case BooleanType => ArrowType.Bool.INSTANCE
    +      case ShortType => new ArrowType.Int(8 * ShortType.defaultSize, true)
    +      case IntegerType => new ArrowType.Int(8 * IntegerType.defaultSize, true)
    +      case LongType => new ArrowType.Int(8 * LongType.defaultSize, true)
    +      case FloatType => new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)
    +      case DoubleType => new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)
    +      case ByteType => new ArrowType.Int(8, true)
    +      case StringType => ArrowType.Utf8.INSTANCE
    +      case BinaryType => ArrowType.Binary.INSTANCE
    +      case _ => throw new UnsupportedOperationException(s"Unsupported data type: $dataType")
    +    }
    +  }
    +
    +  /**
    +   * Convert a Spark Dataset schema to Arrow schema.
    +   */
    +  private[arrow] def schemaToArrowSchema(schema: StructType): Schema = {
    +    val arrowFields = schema.fields.map { f =>
    +      new Field(f.name, f.nullable, sparkTypeToArrowType(f.dataType), List.empty[Field].asJava)
    +    }
    +    new Schema(arrowFields.toList.asJava)
    +  }
    +
    +  /**
    +   * Maps Iterator from InternalRow to ArrowPayload
    +   */
    +  private[sql] def toPayloadIterator(
    +      rowIter: Iterator[InternalRow],
    +      schema: StructType): Iterator[ArrowPayload] = {
    +    new Iterator[ArrowPayload] {
    +      private val _allocator = new RootAllocator(Long.MaxValue)
    +      private var _nextPayload = if (rowIter.nonEmpty) convert() else null
    +
    +      override def hasNext: Boolean = _nextPayload != null
    +
    +      override def next(): ArrowPayload = {
    +        val obj = _nextPayload
    +        if (hasNext) {
    +          if (rowIter.hasNext) {
    +            _nextPayload = convert()
    +          } else {
    +            _allocator.close()
    +            _nextPayload = null
    +          }
    +        }
    +        obj
    +      }
    +
    +      private def convert(): ArrowPayload = {
    +        val batch = internalRowIterToArrowBatch(rowIter, schema, _all
[GitHub] spark issue #15821: [SPARK-13534][PySpark] Using Apache Arrow to increase pe...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/15821

> An instance of this must be used each time a ArrowRecordBatch is created and then the batch and allocator must be released/closed after they have been processed

I think it would be useful to add tests that check for memory leaks in error cases, for instance:
* Have a DataFrame that throws an exception after n rows. Invoke the Arrow conversion function, and check allocator memory usage.
* Have a DataFrame that is slow. Invoke the Arrow conversion function, cancel the task, and check allocator memory usage.
[GitHub] spark pull request #18664: [SPARK-21375][PYSPARK][SQL][WIP] Add Date and Tim...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18664#discussion_r130201966
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowUtils.scala ---
@@ -42,6 +43,9 @@ object ArrowUtils {
     case StringType => ArrowType.Utf8.INSTANCE
     case BinaryType => ArrowType.Binary.INSTANCE
     case DecimalType.Fixed(precision, scale) => new ArrowType.Decimal(precision, scale)
+    case DateType => new ArrowType.Date(DateUnit.DAY)
+    case TimestampType =>
+      new ArrowType.Timestamp(TimeUnit.MICROSECOND, DateTimeUtils.defaultTimeZone().getID)
--- End diff --

I do think we should use SQLConf.SESSION_LOCAL_TIMEZONE in this PR. I am concerned about potentially inconsistent behavior due to the system timezone, as @wesm mentioned.
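To make the timezone concern concrete, here is a minimal sketch in plain Python (standard library only, not Spark or Arrow code). It assumes the timestamp is held as microseconds since the Unix epoch, as the `TimeUnit.MICROSECOND` in the diff suggests. The helper `render` and the two fixed-offset zones are hypothetical stand-ins for "whatever the default timezone happens to be" on two different machines.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def render(micros: int, tz: timezone) -> str:
    """Render a microseconds-since-epoch value as wall-clock time in tz."""
    instant = EPOCH + timedelta(microseconds=micros)
    return instant.astimezone(tz).strftime("%Y-%m-%d %H:%M:%S")


# Hypothetical stand-ins for the default timezone of two different machines.
UTC = timezone.utc
UTC_MINUS_8 = timezone(timedelta(hours=-8))

micros = 0  # the same stored value on both machines

as_utc = render(micros, UTC)
as_utc_minus_8 = render(micros, UTC_MINUS_8)

print(as_utc)
print(as_utc_minus_8)
```

The stored value is identical, but the rendered wall-clock time (and even the calendar date) depends on which zone is attached. That is why tying the zone to a session-level setting rather than the system default may give more predictable results.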
[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18787#discussion_r132188490
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java ---
@@ -65,15 +65,35 @@
   final Row row;
   public static ColumnarBatch allocate(StructType schema, MemoryMode memMode) {
-    return new ColumnarBatch(schema, DEFAULT_BATCH_SIZE, memMode);
+    return allocate(schema, memMode, DEFAULT_BATCH_SIZE);
   }
   public static ColumnarBatch allocate(StructType type) {
-    return new ColumnarBatch(type, DEFAULT_BATCH_SIZE, DEFAULT_MEMORY_MODE);
+    return allocate(type, DEFAULT_MEMORY_MODE, DEFAULT_BATCH_SIZE);
   }
   public static ColumnarBatch allocate(StructType schema, MemoryMode memMode, int maxRows) {
-    return new ColumnarBatch(schema, maxRows, memMode);
+    ColumnVector[] columns = allocateCols(schema, maxRows, memMode);
+    return new ColumnarBatch(schema, columns, maxRows);
+  }
+
+  private static ColumnVector[] allocateCols(StructType schema, int maxRows, MemoryMode memMode) {
+    ColumnVector[] columns = new ColumnVector[schema.size()];
+    for (int i = 0; i < schema.fields().length; ++i) {
+      StructField field = schema.fields()[i];
+      columns[i] = ColumnVector.allocate(maxRows, field.dataType(), memMode);
+    }
+    return columns;
+  }
+
+  public static ColumnarBatch createReadOnly(
+      StructType schema,
+      ReadOnlyColumnVector[] columns,
+      int numRows) {
+    assert(schema.length() == columns.length);
+    ColumnarBatch batch = new ColumnarBatch(schema, columns, numRows);
--- End diff --

Why is the capacity set to `numRows` inside the ctor, while `batch.setNumRows()` still has to be called manually?
[GitHub] spark pull request #18787: [SPARK-21583][SQL] Create a ColumnarBatch from Ar...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18787#discussion_r132187027
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java ---
@@ -65,15 +65,35 @@
   final Row row;
   public static ColumnarBatch allocate(StructType schema, MemoryMode memMode) {
-    return new ColumnarBatch(schema, DEFAULT_BATCH_SIZE, memMode);
+    return allocate(schema, memMode, DEFAULT_BATCH_SIZE);
   }
   public static ColumnarBatch allocate(StructType type) {
-    return new ColumnarBatch(type, DEFAULT_BATCH_SIZE, DEFAULT_MEMORY_MODE);
+    return allocate(type, DEFAULT_MEMORY_MODE, DEFAULT_BATCH_SIZE);
   }
   public static ColumnarBatch allocate(StructType schema, MemoryMode memMode, int maxRows) {
-    return new ColumnarBatch(schema, maxRows, memMode);
+    ColumnVector[] columns = allocateCols(schema, maxRows, memMode);
+    return new ColumnarBatch(schema, columns, maxRows);
+  }
+
+  private static ColumnVector[] allocateCols(StructType schema, int maxRows, MemoryMode memMode) {
+    ColumnVector[] columns = new ColumnVector[schema.size()];
+    for (int i = 0; i < schema.fields().length; ++i) {
+      StructField field = schema.fields()[i];
+      columns[i] = ColumnVector.allocate(maxRows, field.dataType(), memMode);
+    }
+    return columns;
+  }
+
+  public static ColumnarBatch createReadOnly(
+      StructType schema,
+      ReadOnlyColumnVector[] columns,
+      int numRows) {
+    assert(schema.length() == columns.length);
+    ColumnarBatch batch = new ColumnarBatch(schema, columns, numRows);
+    batch.setNumRows(numRows);
--- End diff --

Do we need to check that each ReadOnlyColumnVector actually has numRows rows?
[GitHub] spark issue #15821: [SPARK-13534][PySpark] Using Apache Arrow to increase pe...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/15821

@BryanCutler , are the Timestamp and Date types supported now with Arrow 0.3?
[GitHub] spark issue #15821: [SPARK-13534][PySpark] Using Apache Arrow to increase pe...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/15821

> @icexelloss , yes Arrow supports it but Spark stores timestamps in a different way which caused some complication. After talking with Holden, we agreed it was better to keep this PR to simple data types only and extend type support in a follow up PR.

Got it. Can you share some details?
[GitHub] spark issue #19284: [SPARK-22067][SQL] ArrowWriter should use position when ...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/19284

LGTM. What's the Arrow bug you mentioned?
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142845456 --- Diff: python/pyspark/sql/group.py --- @@ -192,7 +193,67 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col) else: jgd = self._jgd.pivot(pivot_col, values) -return GroupedData(jgd, self.sql_ctx) +return GroupedData(jgd, self._df) + +def apply(self, udf): +""" +Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result +as a :class:`DataFrame`. + +The user-defined function should take a `pandas.DataFrame` and return another +`pandas.DataFrame`. Each group is passed as a `pandas.DataFrame` to the user-function and +the returned`pandas.DataFrame` are combined as a :class:`DataFrame`. The returned +`pandas.DataFrame` can be arbitrary length and its schema should match the returnType of +the pandas udf. + +:param udf: A wrapped function returned by `pandas_udf` + +>>> df = spark.createDataFrame( +... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], +... ("id", "v")) +>>> @pandas_udf(returnType=df.schema) +... def normalize(pdf): +... v = pdf.v +... return pdf.assign(v=(v - v.mean()) / v.std()) +>>> df.groupby('id').apply(normalize).show() # doctest: +SKIP --- End diff -- Fixed. Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142840490 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala --- @@ -519,3 +519,18 @@ case class CoGroup( outputObjAttr: Attribute, left: LogicalPlan, right: LogicalPlan) extends BinaryNode with ObjectProducer + +case class FlatMapGroupsInPandas( --- End diff -- Doc added. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/18732

I pushed a new commit addressing the comments. Let me scan through the comments again. I think some comments around worker.py have not been addressed yet.
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142839010 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala --- @@ -26,6 +26,25 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.types.StructType +private class BatchIterator[T](iter: Iterator[T], batchSize: Int) + extends Iterator[Iterator[T]] { + + override def hasNext: Boolean = iter.hasNext + + override def next(): Iterator[T] = { --- End diff -- Sorry I pushed a bit late. The comment is added now. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18664: [SPARK-21375][PYSPARK][SQL][WIP] Add Date and Timestamp ...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/18664

If we all agree on the necessity of a design doc first, I can create a Jira and we can make progress there. What do you all think? @BryanCutler @gatorsmile @HyukjinKwon
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143263694
--- Diff: python/pyspark/sql/group.py ---
@@ -192,7 +193,69 @@ def pivot(self, pivot_col, values=None):
             jgd = self._jgd.pivot(pivot_col)
         else:
             jgd = self._jgd.pivot(pivot_col, values)
-        return GroupedData(jgd, self.sql_ctx)
+        return GroupedData(jgd, self._df)
+
+    @since(2.3)
+    def apply(self, udf):
--- End diff --

@rxin just to recap our discussion regarding naming. You asked:

> What's the difference between this one and the transform function you also proposed? I'm trying to see if all the naming makes sense when considered together.

The answer is:

`transform` takes a function pd.Series -> pd.Series and applies it to each column (or a subset of columns). The input and output Series have the same length.

`apply` takes a function pd.DataFrame -> pd.DataFrame and applies it to each group, similar to `flatMapGroups`.

Does this make sense to you?
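The distinction can be sketched without pandas or Spark. The following is only an illustration of the two function shapes in plain Python, under the assumption that a "column" is a list of values and a "group" is a list of `(id, v)` rows. `transform_like`, `apply_like`, and `normalize` are hypothetical names, not part of the PySpark API. The `normalize` function mirrors the docstring example from this PR and uses the sample standard deviation.

```python
from itertools import groupby
from statistics import mean, stdev

rows = [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)]  # (id, v)


def transform_like(column, fn):
    """Column in, column out; the output must have the same length."""
    out = fn(column)
    assert len(out) == len(column), "transform must preserve length"
    return out


def apply_like(rows, key, fn):
    """Group in, rows out; each group's result may have any length."""
    out = []
    # sorted() is stable, so rows keep their original order within a group.
    for _, group in groupby(sorted(rows, key=key), key=key):
        out.extend(fn(list(group)))
    return out


def normalize(group):
    """Center and scale v within one group (sample standard deviation)."""
    values = [v for _, v in group]
    m, s = mean(values), stdev(values)
    return [(i, (v - m) / s) for i, v in group]


doubled = transform_like([v for _, v in rows], lambda col: [2 * v for v in col])
normalized = apply_like(rows, key=lambda r: r[0], fn=normalize)
first_per_group = apply_like(rows, key=lambda r: r[0], fn=lambda g: g[:1])

print(doubled)
print(normalized)
print(first_per_group)
```

`first_per_group` shows the "arbitrary length" part: the function handed to `apply_like` may return fewer (or more) rows than it received, which a `transform`-style function cannot do.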
[GitHub] spark issue #18664: [SPARK-21375][PYSPARK][SQL][WIP] Add Date and Timestamp ...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/18664

> The baseline should be (as said above): Internal optimisation should not introduce any behaviour change, and we are discouraged to change the previous behaviour unless it has bugs in general.

I am not sure I totally agree with this. Take struct, for instance: in the non-Arrow version, a struct is turned into a `pyspark.sql.Row` object in `toPandas()`. I wouldn't call this a bug, because it's a design choice. However, it is maybe not the best design choice: if the user pickles the `pandas.DataFrame` to a file and sends it to someone, the receiver won't be able to deserialize this `pandas.DataFrame` without having the pyspark library as a dependency.

Now, I am **not** trying to argue that we should or should not turn a struct column into `pyspark.sql.Row`. My point is that there might be some design choices in the non-Arrow version that are not ideal, and maybe it's not a hard requirement to make the behavior 100% the same between the non-Arrow and Arrow versions.
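The pickling concern can be reproduced with the standard library alone. This is a sketch, not pyspark code: `fake_rowlib` is a hypothetical in-memory module standing in for a library the receiver has not installed, and its `Row` class is a hypothetical stand-in for a library-defined row type. Removing the module from `sys.modules` simulates the receiver's environment.

```python
import pickle
import sys
import types


def make_payload() -> bytes:
    """Sender side: pickle data that contains an instance of a library class."""
    lib = types.ModuleType("fake_rowlib")  # hypothetical library module

    class Row:
        def __init__(self, **fields):
            self.__dict__.update(fields)

    # Make the class look as if it were defined at the top level of the library.
    Row.__module__ = "fake_rowlib"
    Row.__qualname__ = "Row"
    lib.Row = Row

    sys.modules["fake_rowlib"] = lib
    try:
        return pickle.dumps({"struct_col": [Row(a=1, b="x")]})
    finally:
        # From here on, this process behaves like one without the library.
        del sys.modules["fake_rowlib"]


def try_load(payload: bytes) -> str:
    """Receiver side: unpickling needs to import the class's defining module."""
    try:
        pickle.loads(payload)
        return "ok"
    except ImportError as exc:
        return type(exc).__name__


payload = make_payload()
result = try_load(payload)
print(result)
```

Pickle stores only a reference to the class (module name plus qualified name), not the class itself, so the receiver must be able to import the defining module. Plain tuples, dicts, or lists in the same position would not carry that dependency.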
[GitHub] spark issue #18664: [SPARK-21375][PYSPARK][SQL][WIP] Add Date and Timestamp ...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/18664

I agree. I think we need a high-level document describing these differences so we can discuss them. I think we should be more careful about Arrow-version behavior before releasing support for timestamp and nested types.
[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/18732

Hi All, I think all comments should be addressed at this point, except for the naming comment from @rxin. If I missed something or if there is anything else you want me to address, please let me know. Otherwise, I will just wait for Reynold's feedback regarding naming.
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143198047 --- Diff: python/pyspark/sql/group.py --- @@ -192,7 +193,69 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col) else: jgd = self._jgd.pivot(pivot_col, values) -return GroupedData(jgd, self.sql_ctx) +return GroupedData(jgd, self._df) + +@since(2.3) +def apply(self, udf): +""" +Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result +as a :class:`DataFrame`. + +The user-defined function should take a `pandas.DataFrame` and return another +`pandas.DataFrame`. Each group is passed as a `pandas.DataFrame` to the user-function and +the returned`pandas.DataFrame` are combined as a :class:`DataFrame`. The returned +`pandas.DataFrame` can be arbitrary length and its schema should match the returnType of +the pandas udf. + +:param udf: A wrapped udf function returned by :meth:`pyspark.sql.functions.pandas_udf` + +>>> from pyspark.sql.functions import pandas_udf +>>> df = spark.createDataFrame( +... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], +... ("id", "v")) +>>> @pandas_udf(returnType=df.schema) +... def normalize(pdf): +... v = pdf.v +... return pdf.assign(v=(v - v.mean()) / v.std()) +>>> df.groupby('id').apply(normalize).show() # doctest: +SKIP ++---+---+ +| id| v| ++---+---+ +| 1|-0.7071067811865475| +| 1| 0.7071067811865475| +| 2|-0.8320502943378437| +| 2|-0.2773500981126146| +| 2| 1.1094003924504583| ++---+---+ + +.. 
seealso:: :meth:`pyspark.sql.functions.pandas_udf` + +""" +from pyspark.sql.functions import pandas_udf + +# Columns are special because hasattr always return True +if isinstance(udf, Column) or not hasattr(udf, 'func') or not udf.vectorized: +raise ValueError("The argument to apply must be a pandas_udf") +if not isinstance(udf.returnType, StructType): +raise ValueError("The returnType of the pandas_udf must be a StructType") + +df = self._df +func = udf.func +returnType = udf.returnType + +# The python executors expects the function to take a list of pd.Series as input +# So we to create a wrapper function that turns that to a pd.DataFrame before passing +# down to the user function +columns = df.columns + +def wrapped(*cols): +import pandas as pd +return func(pd.concat(cols, axis=1, keys=columns)) + +wrapped_udf_obj = pandas_udf(wrapped, returnType) +udf_column = wrapped_udf_obj(*[df[col] for col in df.columns]) --- End diff -- I see. Yeah I can make it more clear in the doc. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143213000 --- Diff: python/pyspark/sql/group.py --- @@ -192,7 +193,69 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col) else: jgd = self._jgd.pivot(pivot_col, values) -return GroupedData(jgd, self.sql_ctx) +return GroupedData(jgd, self._df) + +@since(2.3) +def apply(self, udf): +""" +Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result +as a :class:`DataFrame`. + +The user-defined function should take a `pandas.DataFrame` and return another +`pandas.DataFrame`. Each group is passed as a `pandas.DataFrame` to the user-function and +the returned`pandas.DataFrame` are combined as a :class:`DataFrame`. The returned +`pandas.DataFrame` can be arbitrary length and its schema should match the returnType of +the pandas udf. + +:param udf: A wrapped udf function returned by :meth:`pyspark.sql.functions.pandas_udf` + +>>> from pyspark.sql.functions import pandas_udf +>>> df = spark.createDataFrame( +... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], +... ("id", "v")) +>>> @pandas_udf(returnType=df.schema) +... def normalize(pdf): +... v = pdf.v +... return pdf.assign(v=(v - v.mean()) / v.std()) +>>> df.groupby('id').apply(normalize).show() # doctest: +SKIP ++---+---+ +| id| v| ++---+---+ +| 1|-0.7071067811865475| +| 1| 0.7071067811865475| +| 2|-0.8320502943378437| +| 2|-0.2773500981126146| +| 2| 1.1094003924504583| ++---+---+ + +.. 
seealso:: :meth:`pyspark.sql.functions.pandas_udf` + +""" +from pyspark.sql.functions import pandas_udf + +# Columns are special because hasattr always return True +if isinstance(udf, Column) or not hasattr(udf, 'func') or not udf.vectorized: +raise ValueError("The argument to apply must be a pandas_udf") +if not isinstance(udf.returnType, StructType): +raise ValueError("The returnType of the pandas_udf must be a StructType") + +df = self._df +func = udf.func +returnType = udf.returnType + +# The python executors expects the function to take a list of pd.Series as input +# So we to create a wrapper function that turns that to a pd.DataFrame before passing +# down to the user function +columns = df.columns + +def wrapped(*cols): +import pandas as pd +return func(pd.concat(cols, axis=1, keys=columns)) + +wrapped_udf_obj = pandas_udf(wrapped, returnType) +udf_column = wrapped_udf_obj(*[df[col] for col in df.columns]) --- End diff -- Added --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18664: [SPARK-21375][PYSPARK][SQL][WIP] Add Date and Timestamp ...
Github user icexelloss commented on the issue: https://github.com/apache/spark/pull/18664

Bryan, I haven't created one yet. Go ahead!

On Fri, Oct 6, 2017 at 5:45 PM Bryan Cutler <notificati...@github.com> wrote:
> Thanks all for the discussion. I think there are a lot of subtleties at play here, and what may or may not be considered a bug can depend on the user's intent. Regardless, I agree that there needs to be user-facing documentation that will address these details, such as the questions posed by @gatorsmile <https://github.com/gatorsmile> above. I can create a JIRA for this if @icexelloss <https://github.com/icexelloss> hasn't already.
>
> I am okay with proceeding separately for dealing with timezone, and matching the behaviour with Arrow to the existing behaviour without Arrow here with respect to timezone.
>
> @HyukjinKwon <https://github.com/hyukjinkwon> so you are suggesting to not use a timezone for Arrow timestamps? We discussed that earlier in this PR, but maybe that is the best solution for now
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142583590 --- Diff: python/pyspark/sql/group.py --- @@ -192,7 +193,66 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col) else: jgd = self._jgd.pivot(pivot_col, values) -return GroupedData(jgd, self.sql_ctx) +return GroupedData(jgd, self) --- End diff -- A good catch. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142583338
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala ---
@@ -44,14 +63,17 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi
     val schemaOut = StructType.fromAttributes(output.drop(child.output.length).zipWithIndex
       .map { case (attr, i) => attr.withName(s"_$i") })
+    val batchSize = conf.arrowMaxRecordsPerBatch
+    val batchIter = if (batchSize > 0) new BatchIterator(iter, batchSize) else Iterator(iter)
--- End diff --

That was actually my first implementation. However, it turns out I cannot hold references to input rows without copying them, so `grouped` doesn't work (`grouped` uses an ArrayBuffer to keep references).
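The problem with buffering can be sketched in plain Python. This is an analogy, not the Scala code from the PR: it assumes the upstream iterator reuses one mutable row object for every element (the behavior the comment above alludes to), and `reused_rows`, `buffered_batches`, and `lazy_batches` are hypothetical names. `lazy_batches` only approximates what a `BatchIterator`-style wrapper might do.

```python
from itertools import chain, islice


def reused_rows(n):
    """Yield the same mutable row object n times, overwriting it in place."""
    row = [None]
    for i in range(n):
        row[0] = i
        yield row


def buffered_batches(rows, batch_size):
    """Collect each batch into a list, keeping references to the rows."""
    rows = iter(rows)
    while True:
        batch = list(islice(rows, batch_size))
        if not batch:
            return
        yield batch


def lazy_batches(rows, batch_size):
    """Yield sub-iterators over the input without buffering any rows.

    Each batch must be fully consumed before the next one is requested.
    """
    rows = iter(rows)
    for first in rows:
        yield chain([first], islice(rows, batch_size - 1))


buffered = [[r[0] for r in b] for b in buffered_batches(reused_rows(5), 2)]
lazy = [[r[0] for r in b] for b in lazy_batches(reused_rows(5), 2)]

print(buffered)
print(lazy)
```

In the buffered version every entry of a batch points at the same object, so by the time the batch is read all entries show the last value written. The lazy version reads each row before the upstream iterator advances, so no copy is needed; the cost is that the consumer has to drain each batch in order.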
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142678914 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala --- @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.python + +import scala.collection.JavaConverters._ + +import org.apache.spark.TaskContext +import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, Partitioning} +import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode} + +case class FlatMapGroupsInPandasExec( +groupingAttributes: Seq[Attribute], +func: Expression, +output: Seq[Attribute], +child: SparkPlan) + extends UnaryExecNode { + + private val pandasFunction = func.asInstanceOf[PythonUDF].func + + override def outputPartitioning: Partitioning = child.outputPartitioning + + override def producedAttributes: AttributeSet = AttributeSet(output) + + override def requiredChildDistribution: Seq[Distribution] = +ClusteredDistribution(groupingAttributes) :: Nil + + override def requiredChildOrdering: Seq[Seq[SortOrder]] = +Seq(groupingAttributes.map(SortOrder(_, Ascending))) + + override protected def doExecute(): RDD[InternalRow] = { +val inputRDD = child.execute() + +val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536) +val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true) +val chainedFunc = Seq(ChainedPythonFunctions(Seq(pandasFunction))) +val argOffsets = Array((0 until child.schema.length).toArray) + +inputRDD.mapPartitionsInternal { iter => + val grouped = GroupedIterator(iter, groupingAttributes, child.output) + val context = TaskContext.get() + + val columnarBatchIter = new ArrowPythonRunner( +chainedFunc, bufferSize, reuseWorker, +PythonEvalType.SQL_PANDAS_UDF, argOffsets, child.schema) +.compute(grouped.map(_._2), context.partitionId(), context) + + val rowIter = new Iterator[InternalRow] { +private var currentIter = if 
(columnarBatchIter.hasNext) { + val batch = columnarBatchIter.next() + batch.rowIterator.asScala --- End diff -- The returned schema is checked on the Python side. The serializer throws an exception when it tries to coerce the series types. Here is the test that covers wrong return types: https://github.com/icexelloss/spark/blob/groupby-apply-SPARK-20396/python/pyspark/sql/tests.py#L3458
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142689702
--- Diff: python/pyspark/sql/tests.py ---
@@ -3106,8 +3106,9 @@ def assertFramesEqual(self, df_with_arrow, df_without):
         self.assertTrue(df_without.equals(df_with_arrow), msg=msg)
     def test_unsupported_datatype(self):
-        schema = StructType([StructField("dt", DateType(), True)])
-        df = self.spark.createDataFrame([(datetime.date(1970, 1, 1),)], schema=schema)
--- End diff --

I think it's a whitespace thing. Let me revert this.
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142690650 --- Diff: python/pyspark/sql/group.py --- @@ -192,7 +193,66 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col) else: jgd = self._jgd.pivot(pivot_col, values) -return GroupedData(jgd, self.sql_ctx) +return GroupedData(jgd, self) + +def apply(self, udf): +""" +Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result +as a :class:`DataFrame`. + +The user-function should take a `pandas.DataFrame` and return another `pandas.DataFrame`. +Each group is passed as a `pandas.DataFrame` to the user-function and the returned +`pandas.DataFrame` are combined as a :class:`DataFrame`. The returned `pandas.DataFrame` +can be arbitrary length and its schema should match the returnType of the pandas udf. + +:param udf: A wrapped function returned by `pandas_udf` + +>>> df = spark.createDataFrame( +... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], +... ("id", "v")) +>>> @pandas_udf(returnType=df.schema) +... def normalize(pdf): +... v = pdf.v +... return pdf.assign(v=(v - v.mean()) / v.std()) +>>> df.groupby('id').apply(normalize).show() # doctest: + SKIP --- End diff -- Thanks! Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142690602 --- Diff: python/pyspark/sql/group.py --- @@ -192,7 +193,66 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col) else: jgd = self._jgd.pivot(pivot_col, values) -return GroupedData(jgd, self.sql_ctx) +return GroupedData(jgd, self) --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142694484 --- Diff: python/pyspark/worker.py --- @@ -74,17 +74,35 @@ def wrap_udf(f, return_type): def wrap_pandas_udf(f, return_type): -arrow_return_type = toArrowType(return_type) - -def verify_result_length(*a): -result = f(*a) -if not hasattr(result, "__len__"): -raise TypeError("Return type of pandas_udf should be a Pandas.Series") -if len(result) != len(a[0]): -raise RuntimeError("Result vector from pandas_udf was not the required length: " - "expected %d, got %d" % (len(a[0]), len(result))) -return result -return lambda *a: (verify_result_length(*a), arrow_return_type) +if isinstance(return_type, StructType): +arrow_return_types = [to_arrow_type(field.dataType) for field in return_type] + +def fn(*a): +import pandas as pd +out = f(*a) +assert isinstance(out, pd.DataFrame), \ +'Return value from the user function is not a pandas.DataFrame.' --- End diff -- Good catch. Yeah let's keep such terms consistent. Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142694381 --- Diff: python/pyspark/worker.py --- @@ -74,17 +74,35 @@ def wrap_udf(f, return_type): def wrap_pandas_udf(f, return_type): -arrow_return_type = toArrowType(return_type) - -def verify_result_length(*a): -result = f(*a) -if not hasattr(result, "__len__"): -raise TypeError("Return type of pandas_udf should be a Pandas.Series") -if len(result) != len(a[0]): -raise RuntimeError("Result vector from pandas_udf was not the required length: " - "expected %d, got %d" % (len(a[0]), len(result))) -return result -return lambda *a: (verify_result_length(*a), arrow_return_type) +if isinstance(return_type, StructType): +arrow_return_types = [to_arrow_type(field.dataType) for field in return_type] + +def fn(*a): +import pandas as pd +out = f(*a) --- End diff -- Done. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142697418 --- Diff: python/pyspark/worker.py --- @@ -74,17 +74,35 @@ def wrap_udf(f, return_type): def wrap_pandas_udf(f, return_type): -arrow_return_type = toArrowType(return_type) - -def verify_result_length(*a): -result = f(*a) -if not hasattr(result, "__len__"): -raise TypeError("Return type of pandas_udf should be a Pandas.Series") -if len(result) != len(a[0]): -raise RuntimeError("Result vector from pandas_udf was not the required length: " - "expected %d, got %d" % (len(a[0]), len(result))) -return result -return lambda *a: (verify_result_length(*a), arrow_return_type) +if isinstance(return_type, StructType): --- End diff -- A normal pandas udf doesn't support `StructType` as its returnType, which is why this check works. However, I agree that the way we distinguish a grouping udf from a normal udf is not clean. Ideally we should have a cleaner way of defining such wrapping functions for the different pandas_udf use cases. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
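One possible shape for the "cleaner way" mentioned above is to pick the wrapper from an explicit kind tag rather than from the return type. This is only a sketch under assumptions: `UdfKind`, `wrap_scalar`, `wrap_grouped_map` and `wrap_by_kind` are hypothetical names, not part of the PR, and plain lists and dicts stand in for `pandas.Series` and `pandas.DataFrame`.

```python
from enum import Enum


class UdfKind(Enum):
    """Hypothetical tag that says which pandas_udf use case a function serves."""
    SCALAR = "scalar"
    GROUPED_MAP = "grouped_map"


def wrap_scalar(f):
    """Scalar case: the result must have the same length as the first input."""
    def fn(*cols):
        result = f(*cols)
        if len(result) != len(cols[0]):
            raise RuntimeError(
                "expected %d values, got %d" % (len(cols[0]), len(result)))
        return result
    return fn


def wrap_grouped_map(f, n_output_columns):
    """Grouped case: only the number of output columns is checked."""
    def fn(table):
        result = f(table)
        if len(result) != n_output_columns:
            raise RuntimeError(
                "expected %d columns, got %d" % (n_output_columns, len(result)))
        return result
    return fn


def wrap_by_kind(f, kind, n_output_columns=None):
    """Dispatch on the explicit kind instead of inspecting the return type."""
    if kind is UdfKind.SCALAR:
        return wrap_scalar(f)
    if kind is UdfKind.GROUPED_MAP:
        return wrap_grouped_map(f, n_output_columns)
    raise ValueError("unknown udf kind: %r" % (kind,))


add_one = wrap_by_kind(lambda xs: [x + 1 for x in xs], UdfKind.SCALAR)
total = wrap_by_kind(
    lambda t: {"id": t["id"][:1], "v": [sum(t["v"])]},
    UdfKind.GROUPED_MAP,
    n_output_columns=2,
)
```

With a tag like this, adding another use case means adding a new wrapper and a new branch, without overloading the meaning of `StructType`.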
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142692448 --- Diff: python/pyspark/sql/tests.py --- @@ -3106,8 +3106,9 @@ def assertFramesEqual(self, df_with_arrow, df_without): self.assertTrue(df_without.equals(df_with_arrow), msg=msg) def test_unsupported_datatype(self): -schema = StructType([StructField("dt", DateType(), True)]) -df = self.spark.createDataFrame([(datetime.date(1970, 1, 1),)], schema=schema) --- End diff -- Reverted. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142695501 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala --- @@ -26,6 +26,25 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.types.StructType +private class BatchIterator[T](iter: Iterator[T], batchSize: Int) + extends Iterator[Iterator[T]] { + + override def hasNext: Boolean = iter.hasNext + + override def next(): Iterator[T] = { --- End diff -- +1. Let me add that. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142695129 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala --- @@ -519,3 +519,18 @@ case class CoGroup( outputObjAttr: Attribute, left: LogicalPlan, right: LogicalPlan) extends BinaryNode with ObjectProducer + +case class FlatMapGroupsInPandas( --- End diff -- Yes agreed. I will fix that. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142694835 --- Diff: python/pyspark/worker.py --- @@ -74,17 +74,35 @@ def wrap_udf(f, return_type): def wrap_pandas_udf(f, return_type): -arrow_return_type = toArrowType(return_type) - -def verify_result_length(*a): -result = f(*a) -if not hasattr(result, "__len__"): -raise TypeError("Return type of pandas_udf should be a Pandas.Series") -if len(result) != len(a[0]): -raise RuntimeError("Result vector from pandas_udf was not the required length: " - "expected %d, got %d" % (len(a[0]), len(result))) -return result -return lambda *a: (verify_result_length(*a), arrow_return_type) +if isinstance(return_type, StructType): +arrow_return_types = [to_arrow_type(field.dataType) for field in return_type] + +def fn(*a): +import pandas as pd +out = f(*a) +assert isinstance(out, pd.DataFrame), \ +'Return value from the user function is not a pandas.DataFrame.' +assert len(out.columns) == len(arrow_return_types), \ +'Number of columns of the returned pd.DataFrame doesn\'t match output schema. ' \ --- End diff -- Good catch. Fixed. (Btw thanks for catching these small things) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142691179 --- Diff: python/pyspark/sql/tests.py --- @@ -3106,8 +3106,9 @@ def assertFramesEqual(self, df_with_arrow, df_without): self.assertTrue(df_without.equals(df_with_arrow), msg=msg) def test_unsupported_datatype(self): -schema = StructType([StructField("dt", DateType(), True)]) -df = self.spark.createDataFrame([(datetime.date(1970, 1, 1),)], schema=schema) --- End diff -- Oh, actually the change is DateType() -> TimestampType(); let me double check. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142704126 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala --- @@ -435,6 +435,33 @@ class RelationalGroupedDataset protected[sql]( df.logicalPlan.output, df.logicalPlan)) } + + private[sql] def flatMapGroupsInPandas(expr: PythonUDF): DataFrame = { +require(expr.vectorized, "Must pass a vectorized python udf") + +val output = expr.dataType match { + case s: StructType => s.map { +case StructField(name, dataType, nullable, metadata) => + AttributeReference(name, dataType, nullable, metadata)() + } +} + +val groupingAttributes: Seq[Attribute] = groupingExprs.map { + case ne: NamedExpression => ne.toAttribute --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142703487 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala --- @@ -435,6 +435,33 @@ class RelationalGroupedDataset protected[sql]( df.logicalPlan.output, df.logicalPlan)) } + + private[sql] def flatMapGroupsInPandas(expr: PythonUDF): DataFrame = { +require(expr.vectorized, "Must pass a vectorized python udf") + +val output = expr.dataType match { + case s: StructType => s.map { +case StructField(name, dataType, nullable, metadata) => + AttributeReference(name, dataType, nullable, metadata)() + } +} + +val groupingAttributes: Seq[Attribute] = groupingExprs.map { + case ne: NamedExpression => ne.toAttribute +} + +val plan = FlatMapGroupsInPandas( + groupingAttributes, + expr, + output, + df.logicalPlan +) --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142693843 --- Diff: python/pyspark/worker.py --- @@ -74,17 +74,35 @@ def wrap_udf(f, return_type): def wrap_pandas_udf(f, return_type): -arrow_return_type = toArrowType(return_type) - -def verify_result_length(*a): -result = f(*a) -if not hasattr(result, "__len__"): -raise TypeError("Return type of pandas_udf should be a Pandas.Series") -if len(result) != len(a[0]): -raise RuntimeError("Result vector from pandas_udf was not the required length: " - "expected %d, got %d" % (len(a[0]), len(result))) -return result -return lambda *a: (verify_result_length(*a), arrow_return_type) +if isinstance(return_type, StructType): +arrow_return_types = [to_arrow_type(field.dataType) for field in return_type] + +def fn(*a): +import pandas as pd +out = f(*a) --- End diff -- Yes that is more consistent. Let me change that. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142703829 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala --- @@ -435,6 +435,33 @@ class RelationalGroupedDataset protected[sql]( df.logicalPlan.output, df.logicalPlan)) } + + private[sql] def flatMapGroupsInPandas(expr: PythonUDF): DataFrame = { +require(expr.vectorized, "Must pass a vectorized python udf") + +val output = expr.dataType match { + case s: StructType => s.map { --- End diff -- Fixed --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142498880 --- Diff: python/pyspark/sql/group.py --- @@ -194,6 +194,37 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col, values) return GroupedData(jgd, self.sql_ctx) +def apply(self, udf_obj): +""" +Maps each group of the current [[DataFrame]] using a pandas udf and returns the result +as a :class:`DataFrame`. + +""" +from pyspark.sql.functions import pandas_udf + +if not udf_obj._vectorized: --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142498841 --- Diff: python/pyspark/worker.py --- @@ -74,17 +75,33 @@ def wrap_udf(f, return_type): def wrap_pandas_udf(f, return_type): -arrow_return_type = toArrowType(return_type) - -def verify_result_length(*a): -result = f(*a) -if not hasattr(result, "__len__"): -raise TypeError("Return type of pandas_udf should be a Pandas.Series") -if len(result) != len(a[0]): -raise RuntimeError("Result vector from pandas_udf was not the required length: " - "expected %d, got %d" % (len(a[0]), len(result))) -return result -return lambda *a: (verify_result_length(*a), arrow_return_type) +if isinstance(return_type, StructType): +arrow_return_types = list(to_arrow_type(field.dataType) for field in return_type) + +def fn(*a): +import pandas as pd +out = f(*a) +assert isinstance(out, pd.DataFrame), 'Must return a pd.DataFrame' +assert len(out.columns) == len(arrow_return_types), \ +'Columns of pd.DataFrame don\'t match return schema' --- End diff -- The result df actually doesn't have a required length; it can have a different length from the input. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
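To make the point about lengths concrete, here is a small sketch of a grouped-result check that validates the type and the column count but deliberately not the row count. `check_grouped_result` is a hypothetical helper written for illustration; it mirrors the two assertions in the diff but is not the PR's code.

```python
import pandas as pd


def check_grouped_result(out, n_expected_columns):
    """Validate a grouped udf result: type and column count only, not row count."""
    if not isinstance(out, pd.DataFrame):
        raise TypeError("Return value of the user function is not a pandas.DataFrame")
    if len(out.columns) != n_expected_columns:
        raise RuntimeError(
            "expected %d columns, got %d" % (n_expected_columns, len(out.columns)))
    return out


# A three-row group reduced to a single summary row still passes the check.
group = pd.DataFrame({"id": [1, 1, 1], "v": [1.0, 2.0, 3.0]})
summary = check_grouped_result(
    pd.DataFrame({"id": [1], "v": [group["v"].mean()]}), 2)
print(len(group), len(summary))
```

A scalar udf, by contrast, has to return exactly one value per input row, which is why the removed `verify_result_length` compared `len(result)` with `len(a[0])`.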
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142498939 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala --- @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.python + +import scala.collection.JavaConverters._ + +import org.apache.spark.TaskContext +import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeSet, Expression, NamedExpression, SortOrder, UnsafeProjection} +import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, Partitioning} +import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode} + +case class FlatMapGroupsInPandasExec( +groupingAttributes: Seq[Attribute], +func: Expression, +output: Seq[Attribute], +child: SparkPlan) + extends UnaryExecNode { + + private val pandasFunction = func.asInstanceOf[PythonUDF].func + + override def outputPartitioning: Partitioning = child.outputPartitioning + + override def producedAttributes: AttributeSet = AttributeSet(output) + + override def requiredChildDistribution: Seq[Distribution] = +ClusteredDistribution(groupingAttributes) :: Nil + + override def requiredChildOrdering: Seq[Seq[SortOrder]] = +Seq(groupingAttributes.map(SortOrder(_, Ascending))) + + override protected def doExecute(): RDD[InternalRow] = { +val inputRDD = child.execute() + +val bufferSize = inputRDD.conf.getInt("spark.buffer.size", 65536) +val reuseWorker = inputRDD.conf.getBoolean("spark.python.worker.reuse", defaultValue = true) +val chainedFunc = Seq(ChainedPythonFunctions(Seq(pandasFunction))) +val argOffsets = Array((0 until child.schema.length).toArray) + +inputRDD.mapPartitionsInternal { iter => + val grouped = GroupedIterator(iter, groupingAttributes, child.output) + val context = TaskContext.get() + + val columnarBatchIter = new ArrowPythonRunner( +chainedFunc, bufferSize, reuseWorker, +PythonEvalType.SQL_PANDAS_UDF, argOffsets, child.schema) +.compute(grouped.map(_._2), context.partitionId(), context) + + val 
vectorRowIter = new Iterator[InternalRow] { +private var currentIter = if (columnarBatchIter.hasNext) { + val batch = columnarBatchIter.next() + // assert(schemaOut.equals(batch.schema), + // s"Invalid schema from pandas_udf: expected $schemaOut, got ${batch.schema}") --- End diff -- Removed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142482842 --- Diff: python/pyspark/sql/functions.py --- @@ -2181,31 +2186,69 @@ def udf(f=None, returnType=StringType()): @since(2.3) def pandas_udf(f=None, returnType=StringType()): """ -Creates a :class:`Column` expression representing a user defined function (UDF) that accepts -`Pandas.Series` as input arguments and outputs a `Pandas.Series` of the same length. +Creates a :class:`Column` expression representing a vectorized user defined function (UDF). + +The user-defined function can define one of the following transformations: +1. One or more `pandas.Series` -> A `pandas.Series` + + This udf is used with `DataFrame.withColumn` and `DataFrame.select`. + The returnType should be a primitive data type, e.g., DoubleType() + + Example: + + >>> from pyspark.sql.types import IntegerType, StringType + >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType()) + >>> @pandas_udf(returnType=StringType()) + ... def to_upper(s): + ... return s.str.upper() + ... + >>> @pandas_udf(returnType="integer") + ... def add_one(x): + ... return x + 1 + ... + >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age")) + >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\ + ... .show() # doctest: +SKIP + +--+--++ + |slen(name)|to_upper(name)|add_one(age)| + +--+--++ + | 8| JOHN DOE| 22| + +--+--++ + +2. A `pandas.DataFrame` -> A `pandas.DataFrame` + + This udf is used with `GroupedData.apply` + The returnType should be a StructType describing the schema of the returned + `pandas.DataFrame`. + + Example: + + >>> df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 4.0)], ("id", "v")) + >>> @pandas_udf(returnType=df.schema) + ... def normalize(df): + ... v = df.v + ... 
ret = df.assign(v=(v - v.mean()) / v.std()) + >>> df.groupby('id').apply(normalize).show() # doctest: + SKIP + +---+---+ + | id| v| + +---+---+ + | 1|-0.7071067811865475| + | 1| 0.7071067811865475| + | 2|-0.7071067811865475| + | 2| 0.7071067811865475| + +---+---+ + + +.. note:: The user-defined functions must be deterministic. :param f: python function if used as a standalone function :param returnType: a :class:`pyspark.sql.types.DataType` object ->>> from pyspark.sql.types import IntegerType, StringType ->>> slen = pandas_udf(lambda s: s.str.len(), IntegerType()) ->>> @pandas_udf(returnType=StringType()) -... def to_upper(s): -... return s.str.upper() -... ->>> @pandas_udf(returnType="integer") -... def add_one(x): -... return x + 1 -... ->>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age")) ->>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\ -... .show() # doctest: +SKIP -+--+--++ -|slen(name)|to_upper(name)|add_one(age)| -+--+--++ -| 8| JOHN DOE| 22| -+--+--++ """ +import pandas as pd +if isinstance(returnType, pd.Series): --- End diff -- Oh that's neat. Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142486439 --- Diff: python/pyspark/sql/group.py --- @@ -194,6 +194,37 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col, values) return GroupedData(jgd, self.sql_ctx) +def apply(self, udf_obj): +""" +Maps each group of the current [[DataFrame]] using a pandas udf and returns the result +as a :class:`DataFrame`. + +""" +from pyspark.sql.functions import pandas_udf + +if not udf_obj._vectorized: --- End diff -- I ended up checking `hasattr(input, 'func')` to decide whether it's a valid input. It's not great, but I don't know what would be better. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142497616 --- Diff: python/pyspark/sql/functions.py --- @@ -2120,6 +2120,7 @@ def wrapper(*args): else self.func.__class__.__module__) wrapper.func = self.func wrapper.returnType = self.returnType +wrapper._vectorized = self._vectorized --- End diff -- Good call. I also think it should not have a leading underscore. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142500980 --- Diff: python/pyspark/sql/functions.py --- @@ -2129,8 +2130,12 @@ def _create_udf(f, returnType, vectorized): def _udf(f, returnType=StringType(), vectorized=vectorized): if vectorized: import inspect -if len(inspect.getargspec(f).args) == 0: -raise NotImplementedError("0-parameter pandas_udfs are not currently supported") +argspec = inspect.getargspec(f) +if len(argspec.args) == 0 and argspec.varargs is None: --- End diff -- Added. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142474570 --- Diff: python/pyspark/sql/group.py --- @@ -194,6 +194,37 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col, values) return GroupedData(jgd, self.sql_ctx) +def apply(self, udf_obj): +""" +Maps each group of the current [[DataFrame]] using a pandas udf and returns the result +as a :class:`DataFrame`. + +""" +from pyspark.sql.functions import pandas_udf + +if not udf_obj._vectorized: +raise ValueError("Must pass a pandas_udf") --- End diff -- Changed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142499286 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala --- @@ -519,3 +519,18 @@ case class CoGroup( outputObjAttr: Attribute, left: LogicalPlan, right: LogicalPlan) extends BinaryNode with ObjectProducer + +case class FlatMapGroupsInPandas( --- End diff -- I like `FlatMapGroupsInPandas` a little better because `FlatMapGroupsInPython` doesn't imply it's using a vectorized udf. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142499092 --- Diff: python/pyspark/sql/functions.py --- @@ -2120,6 +2120,7 @@ def wrapper(*args): else self.func.__class__.__module__) wrapper.func = self.func wrapper.returnType = self.returnType +wrapper._vectorized = self._vectorized --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142499712 --- Diff: python/pyspark/sql/functions.py --- @@ -2129,8 +2130,12 @@ def _create_udf(f, returnType, vectorized): def _udf(f, returnType=StringType(), vectorized=vectorized): if vectorized: import inspect -if len(inspect.getargspec(f).args) == 0: -raise NotImplementedError("0-parameter pandas_udfs are not currently supported") +argspec = inspect.getargspec(f) +if len(argspec.args) == 0 and argspec.varargs is None: --- End diff -- I think `varargs` are fine. I will add the test. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
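The zero-parameter check discussed above can be sketched as follows. Note one substitution: the diff uses `inspect.getargspec`, which was removed in Python 3.11, so this sketch uses `inspect.getfullargspec` instead. `accepts_arguments` is a hypothetical helper name.

```python
import inspect


def accepts_arguments(f):
    """True if f takes at least one positional parameter or *varargs."""
    spec = inspect.getfullargspec(f)
    return len(spec.args) > 0 or spec.varargs is not None


def no_params():
    return 1


def one_param(x):
    return x


def only_varargs(*cols):
    return cols


for func in (no_params, one_param, only_varargs):
    print(func.__name__, accepts_arguments(func))
```

A function that takes only `*cols` has an empty `args` list, so checking `len(args) == 0` alone would wrongly reject it; the extra `varargs is None` condition is what lets it through.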
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142478440 --- Diff: python/pyspark/sql/group.py --- @@ -194,6 +194,37 @@ def pivot(self, pivot_col, values=None): jgd = self._jgd.pivot(pivot_col, values) return GroupedData(jgd, self.sql_ctx) +def apply(self, udf_obj): +""" +Maps each group of the current [[DataFrame]] using a pandas udf and returns the result +as a :class:`DataFrame`. + +""" +from pyspark.sql.functions import pandas_udf + +if not udf_obj._vectorized: --- End diff -- It seems `foo?` and `foo??` only show the proper docstring if `foo` is a `function`. If `foo` is a `UserDefinedFunction`, they show the docstring of the class instead. For that reason, I think we should keep the return value of `udf` and `pandas_udf` as a wrapped function, and I will check `udf_obj.func` to see whether this is a valid wrapped udf function. @BryanCutler what do you think? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
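The wrapped-function idea above can be illustrated with a small stdlib sketch. `UdfSketch` and `is_wrapped_udf` are hypothetical names and not the PySpark classes; the sketch only shows how `functools.wraps` keeps the user's docstring on the returned function while extra attributes such as `func` and `returnType` are attached for later checks.

```python
import functools


class UdfSketch:
    """Hypothetical stand-in for a UDF object; not the PySpark class."""

    def __init__(self, func, returnType, vectorized=False):
        self.func = func
        self.returnType = returnType
        self.vectorized = vectorized

    def __call__(self, *args):
        return self.func(*args)

    def as_wrapped(self):
        # functools.wraps copies __doc__ and __name__ from the user function,
        # so help tools show the user's docstring rather than the class's.
        @functools.wraps(self.func)
        def wrapper(*args):
            return self(*args)

        wrapper.func = self.func
        wrapper.returnType = self.returnType
        wrapper.vectorized = self.vectorized
        return wrapper


def is_wrapped_udf(obj):
    """Attribute-based check in the spirit of `hasattr(udf, 'func')`."""
    return callable(obj) and hasattr(obj, "func") and hasattr(obj, "returnType")


def to_upper(s):
    """Upper-case a string."""
    return s.upper()


wrapped = UdfSketch(to_upper, "string", vectorized=True).as_wrapped()
print(wrapped.__doc__, is_wrapped_udf(wrapped), is_wrapped_udf(len))
```

The attribute check is duck typing, so any callable carrying the same attributes would pass; that is the weakness acknowledged in the thread.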
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142740947 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala --- @@ -435,6 +435,33 @@ class RelationalGroupedDataset protected[sql]( df.logicalPlan.output, df.logicalPlan)) } + + private[sql] def flatMapGroupsInPandas(expr: PythonUDF): DataFrame = { +require(expr.vectorized, "Must pass a vectorized python udf") + +val output = expr.dataType match { + case s: StructType => s.map { +case StructField(name, dataType, nullable, metadata) => + AttributeReference(name, dataType, nullable, metadata)() + } +} + +val groupingAttributes: Seq[Attribute] = groupingExprs.map { + case ne: NamedExpression => ne.toAttribute +} + +val plan = FlatMapGroupsInPandas( + groupingAttributes, + expr, + output, + df.logicalPlan +) + +Dataset.ofRows( --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142518730 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala --- @@ -26,6 +26,28 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.types.StructType +private object BatchIterator { + class InnerIterator[T](iter: Iterator[T], batchSize: Int) extends Iterator[T] { --- End diff -- Done. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142533141 --- Diff: python/pyspark/sql/functions.py --- @@ -2058,7 +2058,7 @@ def __init__(self, func, returnType, name=None, vectorized=False): self._name = name or ( func.__name__ if hasattr(func, '__name__') else func.__class__.__name__) -self._vectorized = vectorized +self.vectorized = vectorized --- End diff -- I kind of dislike the inconsistency between `UserDefinedFunction` and its wrapped function. I think they are essentially the same thing, except that the wrapped function has a docstring. For peace of mind, I think we should make the attribute either private on both or public on both. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18732#discussion_r142523354

    --- Diff: python/pyspark/sql/group.py ---
    @@ -194,6 +194,65 @@ def pivot(self, pivot_col, values=None):
                 jgd = self._jgd.pivot(pivot_col, values)
             return GroupedData(jgd, self.sql_ctx)
    +    def apply(self, udf):
    +        """
    +        Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result
    +        as a :class:`DataFrame`.
    +
    +        The user-function should take a `pandas.DataFrame` and return another `pandas.DataFrame`.
    +        Each group is passed as a `pandas.DataFrame` to the user-function and the returned
    +        `pandas.DataFrame` are combined as a :class:`DataFrame`. The returned `pandas.DataFrame`
    +        can be arbitrary length and its schema should match the returnType of the pandas udf.
    +
    +        :param udf: A wrapped function returned by `pandas_udf`
    +
    +        >>> df = spark.createDataFrame(
    +        ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    +        ...     ("id", "v"))
    +        >>> @pandas_udf(returnType=df.schema)
    +        ... def normalize(pdf):
    +        ...     v = pdf.v
    +        ...     return pdf.assign(v=(v - v.mean()) / v.std())
    +        >>> df.groupby('id').apply(normalize).show()  # doctest: + SKIP
    +        +---+-------------------+
    +        | id|                  v|
    +        +---+-------------------+
    +        |  1|-0.7071067811865475|
    +        |  1| 0.7071067811865475|
    +        |  2|-0.8320502943378437|
    +        |  2|-0.2773500981126146|
    +        |  2| 1.1094003924504583|
    +        +---+-------------------+
    +
    +        .. seealso:: :meth:`pyspark.sql.functions.pandas_udf`
    +
    +        """
    +        from pyspark.sql.functions import pandas_udf
    +
    +        # Columns are special because hasattr always return True
    +        if isinstance(udf, Column) or not hasattr(udf, 'func') or not udf.vectorized:
    +            raise ValueError("The argument to apply must be a pandas_udf")
    +        if not isinstance(udf.returnType, StructType):
    +            raise ValueError("The returnType of the pandas_udf must be a StructType")
    +
    +        df = DataFrame(self._jgd.df(), self.sql_ctx)
    +        func = udf.func
    +        returnType = udf.returnType
    +
    +        # The python executors expects the function to take a list of pd.Series as input
    +        # So we to create a wrapper function that turns that to a pd.DataFrame before passing
    +        # down to the user function
    +        columns = df.columns
    +
    +        def wrapped(*cols):
    +            import pandas as pd
    +            return func(pd.concat(cols, axis=1, keys=columns))
    --- End diff --

    I think we need to think a little more about how we handle different
    formats of Arrow data. Currently, the input of the Arrow serializer is a
    list of (pd.Series, DataType). I feel it would be cleaner for this class to
    deal only with serialization and not with type coercion. It could take a
    `pyarrow.Table`, for instance, and let the caller construct the
    `pyarrow.Table`.

    Another thing to think about is that the data we are passing may not be
    purely `pd.Series` or `pd.DataFrame`. What if, for instance, we want to
    serialize a (pd.Series, pd.DataFrame) tuple or a (scalar value,
    pd.DataFrame) tuple? Maybe making the serializer composable is more
    flexible, i.e. one class knows how to serialize `pd.Series`, another class
    knows how to serialize `pd.DataFrame`, and if we want to serialize a
    (pd.Series, pd.DataFrame) tuple we can compose them.
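The composability idea in this comment can be sketched independently of Arrow and pandas. The example below is an assumption-laden illustration, not a proposal for the actual serializer: `IntSerializer`, `TextSerializer` and `TupleSerializer` are hypothetical classes, and integers and strings stand in for `pd.Series` and `pd.DataFrame`. Each element serializer knows one type, and the tuple serializer is built from them.

```python
import struct


class IntSerializer:
    """Serializes one signed 64-bit integer (little-endian)."""

    def dumps(self, value):
        return struct.pack("<q", value)

    def loads(self, data, offset=0):
        (value,) = struct.unpack_from("<q", data, offset)
        return value, offset + 8


class TextSerializer:
    """Serializes a UTF-8 string with a 4-byte length prefix."""

    def dumps(self, value):
        raw = value.encode("utf-8")
        return struct.pack("<I", len(raw)) + raw

    def loads(self, data, offset=0):
        (size,) = struct.unpack_from("<I", data, offset)
        start = offset + 4
        return data[start:start + size].decode("utf-8"), start + size


class TupleSerializer:
    """Composes element serializers into a serializer for a fixed-size tuple."""

    def __init__(self, *parts):
        self.parts = parts

    def dumps(self, values):
        if len(values) != len(self.parts):
            raise ValueError("expected %d elements, got %d"
                             % (len(self.parts), len(values)))
        return b"".join(p.dumps(v) for p, v in zip(self.parts, values))

    def loads(self, data, offset=0):
        out = []
        for part in self.parts:
            # Each part reports where it stopped so the next one can continue.
            value, offset = part.loads(data, offset)
            out.append(value)
        return tuple(out), offset


pair = TupleSerializer(IntSerializer(), TextSerializer())
payload = pair.dumps((7, "group-a"))
decoded, consumed = pair.loads(payload)
```

Because `TupleSerializer` has the same `dumps`/`loads` shape as its parts, it can itself be nested inside another `TupleSerializer`, which is the property the comment is after.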
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18732#discussion_r142482577

    --- Diff: python/pyspark/sql/group.py ---
    @@ -194,6 +194,37 @@ def pivot(self, pivot_col, values=None):
                 jgd = self._jgd.pivot(pivot_col, values)
             return GroupedData(jgd, self.sql_ctx)
    +    def apply(self, udf_obj):
    +        """
    +        Maps each group of the current [[DataFrame]] using a pandas udf and returns the result
    +        as a :class:`DataFrame`.
    +
    +        """
    +        from pyspark.sql.functions import pandas_udf
    +
    +        if not udf_obj._vectorized:
    +            raise ValueError("Must pass a pandas_udf")
    +        if not isinstance(udf_obj.returnType, StructType):
    +            raise ValueError("Must pass a StructType as return type in pandas_udf")
    --- End diff --

    Fixed.
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18732#discussion_r142571075

    --- Diff: python/pyspark/worker.py ---
    @@ -32,8 +32,9 @@
     from pyspark.serializers import write_with_length, write_int, read_long, \
         write_long, read_int, SpecialLengths, PythonEvalType, UTF8Deserializer, PickleSerializer, \
         BatchedSerializer, ArrowStreamPandasSerializer
    -from pyspark.sql.types import toArrowType
    +from pyspark.sql.types import to_arrow_type
     from pyspark import shuffle
    +from pyspark.sql.types import StructType
    --- End diff --

    Good catch. Fixed.
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142571047 --- Diff: python/pyspark/sql/tests.py --- @@ -3376,6 +3377,132 @@ def test_vectorized_udf_empty_partition(self): res = df.select(f(col('id'))) self.assertEquals(df.collect(), res.collect()) +def test_vectorized_udf_varargs(self): +from pyspark.sql.functions import pandas_udf, col +df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2)) +f = pandas_udf(lambda *v: v[0], LongType()) +res = df.select(f(col('id'))) +self.assertEquals(df.collect(), res.collect()) + + +@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed") +class GroupbyApplyTests(ReusedPySparkTestCase): +@classmethod +def setUpClass(cls): +ReusedPySparkTestCase.setUpClass() +cls.spark = SparkSession(cls.sc) + +@classmethod +def tearDownClass(cls): +ReusedPySparkTestCase.tearDownClass() +cls.spark.stop() + +def assertFramesEqual(self, expected, result): +msg = ("DataFrames are not equal: " + + ("\n\nExpected:\n%s\n%s" % (expected, expected.dtypes)) + + ("\n\nResult:\n%s\n%s" % (result, result.dtypes))) +self.assertTrue(expected.equals(result), msg=msg) + +@property +def data(self): +from pyspark.sql.functions import pandas_udf, array, explode, col, lit +return self.spark.range(10).toDF('id') \ +.withColumn("vs", array([lit(i) for i in range(20, 30)])) \ +.withColumn("v", explode(col('vs'))).drop('vs') + +def test_simple(self): +from pyspark.sql.functions import pandas_udf +df = self.data + +def foo(df): +ret = df +ret = ret.assign(v1=df.v * df.id * 1.0) +ret = ret.assign(v2=df.v + df.id) +return ret + +foo_udf = pandas_udf( +foo, +StructType( +[StructField('id', LongType()), + StructField('v', IntegerType()), + StructField('v1', DoubleType()), + StructField('v2', LongType())])) + +result = df.groupby('id').apply(foo_udf).sort('id').toPandas() +expected = df.toPandas().groupby('id').apply(foo).reset_index(drop=True) 
+self.assertFramesEqual(expected, result) + +def test_decorator(self): +from pyspark.sql.functions import pandas_udf +df = self.data + +@pandas_udf(StructType( +[StructField('id', LongType()), + StructField('v', IntegerType()), + StructField('v1', DoubleType()), + StructField('v2', LongType())])) +def foo(df): +ret = df +ret = ret.assign(v1=df.v * df.id * 1.0) +ret = ret.assign(v2=df.v + df.id) +return ret --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142571038 --- Diff: python/pyspark/sql/tests.py --- @@ -3376,6 +3377,132 @@ def test_vectorized_udf_empty_partition(self): res = df.select(f(col('id'))) self.assertEquals(df.collect(), res.collect()) +def test_vectorized_udf_varargs(self): +from pyspark.sql.functions import pandas_udf, col +df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2)) +f = pandas_udf(lambda *v: v[0], LongType()) +res = df.select(f(col('id'))) +self.assertEquals(df.collect(), res.collect()) + + +@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed") +class GroupbyApplyTests(ReusedPySparkTestCase): +@classmethod +def setUpClass(cls): +ReusedPySparkTestCase.setUpClass() +cls.spark = SparkSession(cls.sc) + +@classmethod +def tearDownClass(cls): +ReusedPySparkTestCase.tearDownClass() +cls.spark.stop() + +def assertFramesEqual(self, expected, result): +msg = ("DataFrames are not equal: " + + ("\n\nExpected:\n%s\n%s" % (expected, expected.dtypes)) + + ("\n\nResult:\n%s\n%s" % (result, result.dtypes))) +self.assertTrue(expected.equals(result), msg=msg) + +@property +def data(self): +from pyspark.sql.functions import pandas_udf, array, explode, col, lit +return self.spark.range(10).toDF('id') \ +.withColumn("vs", array([lit(i) for i in range(20, 30)])) \ +.withColumn("v", explode(col('vs'))).drop('vs') + +def test_simple(self): +from pyspark.sql.functions import pandas_udf +df = self.data + +def foo(df): +ret = df +ret = ret.assign(v1=df.v * df.id * 1.0) +ret = ret.assign(v2=df.v + df.id) +return ret + +foo_udf = pandas_udf( +foo, +StructType( +[StructField('id', LongType()), + StructField('v', IntegerType()), + StructField('v1', DoubleType()), + StructField('v2', LongType())])) + +result = df.groupby('id').apply(foo_udf).sort('id').toPandas() +expected = df.toPandas().groupby('id').apply(foo).reset_index(drop=True) 
+self.assertFramesEqual(expected, result) + +def test_decorator(self): +from pyspark.sql.functions import pandas_udf +df = self.data + +@pandas_udf(StructType( +[StructField('id', LongType()), + StructField('v', IntegerType()), + StructField('v1', DoubleType()), + StructField('v2', LongType())])) +def foo(df): +ret = df +ret = ret.assign(v1=df.v * df.id * 1.0) +ret = ret.assign(v2=df.v + df.id) +return ret + +result = df.groupby('id').apply(foo).sort('id').toPandas() +expected = df.toPandas().groupby('id').apply(foo.func).reset_index(drop=True) +self.assertFramesEqual(expected, result) + +def test_coerce(self): +from pyspark.sql.functions import pandas_udf +df = self.data + +def foo(df): +ret = df +ret = ret.assign(v=df.v + 1) +return ret + +@pandas_udf(StructType([StructField('id', LongType()), StructField('v', DoubleType())])) +def foo(df): --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r142571056 --- Diff: python/pyspark/sql/tests.py --- @@ -3376,6 +3377,133 @@ def test_vectorized_udf_empty_partition(self): res = df.select(f(col('id'))) self.assertEquals(df.collect(), res.collect()) +def test_vectorized_udf_varargs(self): +from pyspark.sql.functions import pandas_udf, col +df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2)) +f = pandas_udf(lambda *v: v[0], LongType()) +res = df.select(f(col('id'))) +self.assertEquals(df.collect(), res.collect()) + + +@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed") +class GroupbyApplyTests(ReusedPySparkTestCase): +@classmethod +def setUpClass(cls): +ReusedPySparkTestCase.setUpClass() +cls.spark = SparkSession(cls.sc) + +@classmethod +def tearDownClass(cls): +ReusedPySparkTestCase.tearDownClass() +cls.spark.stop() + +def assertFramesEqual(self, expected, result): +msg = ("DataFrames are not equal: " + + ("\n\nExpected:\n%s\n%s" % (expected, expected.dtypes)) + + ("\n\nResult:\n%s\n%s" % (result, result.dtypes))) +self.assertTrue(expected.equals(result), msg=msg) + +@property +def data(self): +from pyspark.sql.functions import pandas_udf, array, explode, col, lit +return self.spark.range(10).toDF('id') \ +.withColumn("vs", array([lit(i) for i in range(20, 30)])) \ +.withColumn("v", explode(col('vs'))).drop('vs') + +def test_simple(self): +from pyspark.sql.functions import pandas_udf +df = self.data + +def foo(df): +ret = df +ret = ret.assign(v1=df.v * df.id * 1.0) +ret = ret.assign(v2=df.v + df.id) +return ret --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18732#discussion_r142571653

    --- Diff: python/pyspark/worker.py ---
    @@ -74,17 +75,37 @@ def wrap_udf(f, return_type):
     def wrap_pandas_udf(f, return_type):
    -    arrow_return_type = toArrowType(return_type)
    -
    -    def verify_result_length(*a):
    -        result = f(*a)
    -        if not hasattr(result, "__len__"):
    -            raise TypeError("Return type of pandas_udf should be a Pandas.Series")
    -        if len(result) != len(a[0]):
    -            raise RuntimeError("Result vector from pandas_udf was not the required length: "
    -                               "expected %d, got %d" % (len(a[0]), len(result)))
    -        return result
    -    return lambda *a: (verify_result_length(*a), arrow_return_type)
    +    if isinstance(return_type, StructType):
    +        import pyarrow as pa
    --- End diff --

    Removed.
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18732#discussion_r142571660

    --- Diff: python/pyspark/worker.py ---
    @@ -74,17 +75,37 @@ def wrap_udf(f, return_type):
     def wrap_pandas_udf(f, return_type):
    -    arrow_return_type = toArrowType(return_type)
    -
    -    def verify_result_length(*a):
    -        result = f(*a)
    -        if not hasattr(result, "__len__"):
    -            raise TypeError("Return type of pandas_udf should be a Pandas.Series")
    -        if len(result) != len(a[0]):
    -            raise RuntimeError("Result vector from pandas_udf was not the required length: "
    -                               "expected %d, got %d" % (len(a[0]), len(result)))
    -        return result
    -    return lambda *a: (verify_result_length(*a), arrow_return_type)
    +    if isinstance(return_type, StructType):
    +        import pyarrow as pa
    +
    +        arrow_return_types = list(to_arrow_type(field.dataType) for field in return_type)
    --- End diff --

    Fixed.
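The length check performed by `verify_result_length` in the removed lines above can be sketched without Spark or pandas. This is a minimal illustration that assumes plain Python lists as stand-ins for `pandas.Series`; `wrap_with_length_check` is a hypothetical name and not part of PySpark.

```python
def wrap_with_length_check(f):
    """Wrap f so its result must be sized and as long as the first input column."""

    def verify_result_length(*cols):
        result = f(*cols)
        if not hasattr(result, "__len__"):
            raise TypeError("Result should be a sized column, got %s"
                            % type(result).__name__)
        if len(result) != len(cols[0]):
            raise RuntimeError("Result was not the required length: "
                               "expected %d, got %d" % (len(cols[0]), len(result)))
        return result

    return verify_result_length


# A function that preserves length passes through unchanged.
add_one = wrap_with_length_check(lambda xs: [x + 1 for x in xs])
ok = add_one([1, 2, 3])
```

The check is what makes the one-to-one contract of a column-wise function enforceable; a grouped function that returns a frame of arbitrary length would need a different wrapper, which is presumably why the diff branches on `StructType`.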
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18732#discussion_r142572356

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
    @@ -47,7 +47,7 @@ import org.apache.spark.sql.types.StructType
      */
     @InterfaceStability.Stable
     class RelationalGroupedDataset protected[sql](
    -    df: DataFrame,
    +    val df: DataFrame,
    --- End diff --

    Reverted.
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18732#discussion_r142572643

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala ---
    @@ -44,14 +63,22 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi
         val schemaOut = StructType.fromAttributes(output.drop(child.output.length).zipWithIndex
           .map { case (attr, i) => attr.withName(s"_$i") })
    +    val batchSize = conf.arrowMaxRecordsPerBatch
    +
    +    val batchIter = if (batchSize > 0) {
    +      new BatchIterator(iter, batchSize)
    +    } else {
    +      Iterator(iter)
    +    }
    --- End diff --

    I changed it to one line; I think that is more concise.
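For readers less familiar with the batching step in this diff, here is a rough Python analog. It is a sketch only: `batch_iter` is a hypothetical helper, not Spark's `BatchIterator`, and it assumes just what the diff shows, namely that a positive batch size splits the input into sub-iterators while a non-positive one passes the whole input through as a single batch.

```python
import itertools


def batch_iter(rows, batch_size):
    """Yield lazy sub-iterators of at most batch_size rows each.

    With batch_size <= 0 the whole input is yielded as one batch, mirroring
    the `else Iterator(iter)` branch in the diff.
    """
    rows = iter(rows)
    if batch_size <= 0:
        yield rows
        return
    while True:
        try:
            first = next(rows)
        except StopIteration:
            return
        # Rows are pulled from the shared iterator on demand, so each batch
        # must be fully consumed before the next one is requested.
        yield itertools.chain([first], itertools.islice(rows, batch_size - 1))


batches = [list(batch) for batch in batch_iter(range(5), 2)]
single = [list(batch) for batch in batch_iter(range(3), 0)]
```

Nothing is materialized up front in this sketch; each batch is a view over the shared underlying iterator, so memory use stays bounded by what the consumer holds.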
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18732#discussion_r143507748

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala ---
    @@ -519,3 +519,18 @@ case class CoGroup(
         outputObjAttr: Attribute,
         left: LogicalPlan,
         right: LogicalPlan) extends BinaryNode with ObjectProducer
    +
    +case class FlatMapGroupsInPandas(
    --- End diff --

    @HyukjinKwon Thanks for catching this. I added docs for
    `FlatMapGroupsInPandas` (function), `FlatMapGroupsInPandas` (logical node)
    and `FlatMapGroupsInPandasExec`, and cross-referenced them.

    @rxin I created the file `pythonLogicalOperators` under
    `org.apache.spark.sql.catalyst.plans.logical` and moved
    `FlatMapGroupsInPandas` into that file.
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18732#discussion_r143506845

    --- Diff: python/pyspark/sql/group.py ---
    @@ -192,7 +193,69 @@ def pivot(self, pivot_col, values=None):
                 jgd = self._jgd.pivot(pivot_col)
             else:
                 jgd = self._jgd.pivot(pivot_col, values)
    -        return GroupedData(jgd, self.sql_ctx)
    +        return GroupedData(jgd, self._df)
    +
    +    @since(2.3)
    +    def apply(self, udf):
    +        """
    +        Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result
    +        as a :class:`DataFrame`.
    +
    +        The user-defined function should take a `pandas.DataFrame` and return another
    +        `pandas.DataFrame`. For each group, all columns are passed together as a `pandas.DataFrame`
    +        to the user-function and the returned `pandas.DataFrame` are combined as a
    +        :class:`DataFrame`. The returned `pandas.DataFrame` can be arbitrary length and its schema
    +        must match the returnType of the pandas udf.
    +
    +        :param udf: A wrapped udf function returned by :meth:`pyspark.sql.functions.pandas_udf`
    +
    +        >>> from pyspark.sql.functions import pandas_udf
    +        >>> df = spark.createDataFrame(
    +        ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    +        ...     ("id", "v"))
    +        >>> @pandas_udf(returnType=df.schema)
    +        ... def normalize(pdf):
    +        ...     v = pdf.v
    +        ...     return pdf.assign(v=(v - v.mean()) / v.std())
    +        >>> df.groupby('id').apply(normalize).show()  # doctest: +SKIP
    +        +---+-------------------+
    +        | id|                  v|
    +        +---+-------------------+
    +        |  1|-0.7071067811865475|
    +        |  1| 0.7071067811865475|
    +        |  2|-0.8320502943378437|
    +        |  2|-0.2773500981126146|
    +        |  2| 1.1094003924504583|
    +        +---+-------------------+
    +
    +        .. seealso:: :meth:`pyspark.sql.functions.pandas_udf`
    +
    +        """
    +        from pyspark.sql.functions import pandas_udf
    +
    +        # Columns are special because hasattr always return True
    +        if isinstance(udf, Column) or not hasattr(udf, 'func') or not udf.vectorized:
    +            raise ValueError("The argument to apply must be a pandas_udf")
    +        if not isinstance(udf.returnType, StructType):
    +            raise ValueError("The returnType of the pandas_udf must be a StructType")
    +
    +        df = self._df
    +        func = udf.func
    +        returnType = udf.returnType
    --- End diff --

    I actually like it because I think it's more readable this way.
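The code comment "Columns are special because hasattr always return True" is easier to follow with a small stand-alone example. The sketch below assumes a class whose `__getattr__` answers every attribute lookup, which is the behaviour the comment attributes to `Column`; `ColumnLike`, `UdfLike` and `is_pandas_udf` are hypothetical stand-ins, not PySpark classes.

```python
class ColumnLike:
    """Hypothetical stand-in: any unknown attribute becomes a nested field."""

    def __init__(self, name):
        self.name = name

    def __getattr__(self, item):
        # Called only when normal lookup fails; it never raises
        # AttributeError, so hasattr() is True for every attribute name.
        return ColumnLike("%s.%s" % (self.name, item))


class UdfLike:
    """Hypothetical stand-in for a wrapped udf carrying func and vectorized."""

    def __init__(self, func, vectorized):
        self.func = func
        self.vectorized = vectorized


def is_pandas_udf(obj):
    # The isinstance test must come first: hasattr() cannot rule out ColumnLike.
    if isinstance(obj, ColumnLike):
        return False
    return hasattr(obj, "func") and bool(obj.vectorized)


col = ColumnLike("id")
misleading = hasattr(col, "func")
```

Here `misleading` is true even though `col` is not a udf, which is why the check in `apply` tests `isinstance(udf, Column)` before relying on `hasattr`.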
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18732#discussion_r143740636

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala ---
    @@ -519,3 +519,4 @@ case class CoGroup(
         outputObjAttr: Attribute,
         left: LogicalPlan,
         right: LogicalPlan) extends BinaryNode with ObjectProducer
    +
    --- End diff --

    Reverted.
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18732#discussion_r143741944

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala ---
    @@ -44,14 +73,18 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi
         val schemaOut = StructType.fromAttributes(output.drop(child.output.length).zipWithIndex
           .map { case (attr, i) => attr.withName(s"_$i") })
    +    val batchSize = conf.arrowMaxRecordsPerBatch
    +    // DO NOT use iter.grouped(). See BatchIterator.
    +    val batchIter = if (batchSize > 0) new BatchIterator(iter, batchSize) else Iterator(iter)
    +
         val columnarBatchIter = new ArrowPythonRunner(
    -      funcs, conf.arrowMaxRecordsPerBatch, bufferSize, reuseWorker,
    +      funcs, bufferSize, reuseWorker,
           PythonEvalType.SQL_PANDAS_UDF, argOffsets, schema)
    -      .compute(iter, context.partitionId(), context)
    +      .compute(batchIter, context.partitionId(), context)
         new Iterator[InternalRow] {
    -      var currentIter = if (columnarBatchIter.hasNext) {
    +      private var currentIter = if (columnarBatchIter.hasNext) {
    --- End diff --

    I think so. The variable is reassigned for each columnar batch.
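The reassignment mentioned in this reply can be pictured with a short Python sketch of an iterator that walks rows across a stream of batches. `FlattenBatches` and `_current` are hypothetical names chosen for illustration; this is not the Scala implementation, only the same pattern of a mutable "current batch" field that is replaced whenever a batch runs out.

```python
class FlattenBatches:
    """Iterate rows across a stream of batches, holding one batch at a time."""

    def __init__(self, batches):
        self._batches = iter(batches)
        self._current = iter(())  # replaced each time a new batch is fetched

    def __iter__(self):
        return self

    def __next__(self):
        while True:
            try:
                return next(self._current)
            except StopIteration:
                # Current batch is exhausted: move on to the next one. When
                # the outer iterator is exhausted too, its StopIteration
                # propagates and ends the whole iteration.
                self._current = iter(next(self._batches))


rows = list(FlattenBatches([[1, 2], [], [3]]))
```

Since the field has to change as iteration proceeds, it cannot be an immutable value; keeping it private simply hides that mutable state from callers.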
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143740157 --- Diff: python/pyspark/sql/tests.py --- @@ -3376,6 +3376,151 @@ def test_vectorized_udf_empty_partition(self): res = df.select(f(col('id'))) self.assertEquals(df.collect(), res.collect()) +def test_vectorized_udf_varargs(self): +from pyspark.sql.functions import pandas_udf, col +df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2)) +f = pandas_udf(lambda *v: v[0], LongType()) +res = df.select(f(col('id'))) +self.assertEquals(df.collect(), res.collect()) + + +@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed") +class GroupbyApplyTests(ReusedPySparkTestCase): +@classmethod +def setUpClass(cls): +ReusedPySparkTestCase.setUpClass() +cls.spark = SparkSession(cls.sc) + +@classmethod +def tearDownClass(cls): +ReusedPySparkTestCase.tearDownClass() +cls.spark.stop() + +def assertFramesEqual(self, expected, result): +msg = ("DataFrames are not equal: " + + ("\n\nExpected:\n%s\n%s" % (expected, expected.dtypes)) + + ("\n\nResult:\n%s\n%s" % (result, result.dtypes))) +self.assertTrue(expected.equals(result), msg=msg) + +@property +def data(self): +from pyspark.sql.functions import array, explode, col, lit +return self.spark.range(10).toDF('id') \ +.withColumn("vs", array([lit(i) for i in range(20, 30)])) \ +.withColumn("v", explode(col('vs'))).drop('vs') + +def test_simple(self): +from pyspark.sql.functions import pandas_udf +df = self.data + +foo_udf = pandas_udf( +lambda df: df.assign(v1=df.v * df.id * 1.0, v2=df.v + df.id), +StructType( +[StructField('id', LongType()), + StructField('v', IntegerType()), + StructField('v1', DoubleType()), + StructField('v2', LongType())])) + +result = df.groupby('id').apply(foo_udf).sort('id').toPandas() +expected = df.toPandas().groupby('id').apply(foo_udf.func).reset_index(drop=True) +self.assertFramesEqual(expected, result) + +def test_decorator(self): +from 
pyspark.sql.functions import pandas_udf +df = self.data + +@pandas_udf(StructType( +[StructField('id', LongType()), + StructField('v', IntegerType()), + StructField('v1', DoubleType()), + StructField('v2', LongType())])) +def foo(df): +return df.assign(v1=df.v * df.id * 1.0, v2=df.v + df.id) + +result = df.groupby('id').apply(foo).sort('id').toPandas() +expected = df.toPandas().groupby('id').apply(foo.func).reset_index(drop=True) +self.assertFramesEqual(expected, result) + +def test_coerce(self): +from pyspark.sql.functions import pandas_udf +df = self.data + +foo = pandas_udf( +lambda df: df, +StructType([StructField('id', LongType()), StructField('v', DoubleType())])) + +result = df.groupby('id').apply(foo).sort('id').toPandas() +expected = df.toPandas().groupby('id').apply(foo.func).reset_index(drop=True) +expected = expected.assign(v=expected.v.astype('float64')) +self.assertFramesEqual(expected, result) + +def test_complex_groupby(self): +from pyspark.sql.functions import pandas_udf, col +df = self.data + +@pandas_udf(StructType( +[StructField('id', LongType()), + StructField('v', IntegerType()), + StructField('norm', DoubleType())])) +def normalize(pdf): +v = pdf.v +return pdf.assign(norm=(v - v.mean()) / v.std()) + +result = df.groupby(col('id') % 2 == 0).apply(normalize).sort('id', 'v').toPandas() +pdf = df.toPandas() +expected = pdf.groupby(pdf['id'] % 2 == 0).apply(normalize.func) +expected = expected.sort_values(['id', 'v']).reset_index(drop=True) +expected = expected.assign(norm=expected.norm.astype('float64')) +self.assertFramesEqual(expected, result) + +def test_empty_groupby(self): +from pyspark.sql.functions import pandas_udf, col +df = self.data + +@pandas_udf(StructType( +[StructField('id',
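The `normalize` function used in `test_complex_groupby`, and in the `apply` docstring quoted elsewhere in this thread, subtracts the group mean and divides by the group standard deviation. As a sanity check on the numbers shown in that docstring, the arithmetic can be reproduced with the standard library alone. This sketch assumes that `v.std()` in pandas is the sample standard deviation (divisor n - 1), which `statistics.stdev` also computes; the helper names are illustrative.

```python
import statistics
from collections import defaultdict

# Same (id, v) rows as the docstring example.
rows = [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)]


def normalize(values):
    """Return (v - mean) / sample_std for every v in one group."""
    mean = statistics.mean(values)
    std = statistics.stdev(values)  # sample standard deviation, divisor n - 1
    return [(v - mean) / std for v in values]


# Group the values by id, then normalize each group independently.
groups = defaultdict(list)
for key, v in rows:
    groups[key].append(v)

normalized = {key: normalize(vs) for key, vs in groups.items()}
```

For group 2 the mean is 6 and the sample variance is (9 + 1 + 16) / 2 = 13, so the first value becomes (3 - 6) / sqrt(13), which is approximately -0.832 and agrees with the docstring table.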
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143740078 --- Diff: python/pyspark/sql/tests.py --- @@ -3376,6 +3376,151 @@ def test_vectorized_udf_empty_partition(self): res = df.select(f(col('id'))) self.assertEquals(df.collect(), res.collect()) +def test_vectorized_udf_varargs(self): +from pyspark.sql.functions import pandas_udf, col +df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2)) +f = pandas_udf(lambda *v: v[0], LongType()) +res = df.select(f(col('id'))) +self.assertEquals(df.collect(), res.collect()) + + +@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed") +class GroupbyApplyTests(ReusedPySparkTestCase): +@classmethod +def setUpClass(cls): +ReusedPySparkTestCase.setUpClass() +cls.spark = SparkSession(cls.sc) + +@classmethod +def tearDownClass(cls): +ReusedPySparkTestCase.tearDownClass() +cls.spark.stop() + +def assertFramesEqual(self, expected, result): +msg = ("DataFrames are not equal: " + + ("\n\nExpected:\n%s\n%s" % (expected, expected.dtypes)) + + ("\n\nResult:\n%s\n%s" % (result, result.dtypes))) +self.assertTrue(expected.equals(result), msg=msg) + +@property +def data(self): +from pyspark.sql.functions import array, explode, col, lit +return self.spark.range(10).toDF('id') \ +.withColumn("vs", array([lit(i) for i in range(20, 30)])) \ +.withColumn("v", explode(col('vs'))).drop('vs') + +def test_simple(self): +from pyspark.sql.functions import pandas_udf +df = self.data + +foo_udf = pandas_udf( +lambda df: df.assign(v1=df.v * df.id * 1.0, v2=df.v + df.id), +StructType( +[StructField('id', LongType()), + StructField('v', IntegerType()), + StructField('v1', DoubleType()), + StructField('v2', LongType())])) + +result = df.groupby('id').apply(foo_udf).sort('id').toPandas() +expected = df.toPandas().groupby('id').apply(foo_udf.func).reset_index(drop=True) +self.assertFramesEqual(expected, result) + +def test_decorator(self): +from 
pyspark.sql.functions import pandas_udf +df = self.data + +@pandas_udf(StructType( +[StructField('id', LongType()), + StructField('v', IntegerType()), + StructField('v1', DoubleType()), + StructField('v2', LongType())])) +def foo(df): +return df.assign(v1=df.v * df.id * 1.0, v2=df.v + df.id) --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/18732#discussion_r143740129 --- Diff: python/pyspark/sql/tests.py --- @@ -3376,6 +3376,151 @@ def test_vectorized_udf_empty_partition(self): res = df.select(f(col('id'))) self.assertEquals(df.collect(), res.collect()) +def test_vectorized_udf_varargs(self): +from pyspark.sql.functions import pandas_udf, col +df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2)) +f = pandas_udf(lambda *v: v[0], LongType()) +res = df.select(f(col('id'))) +self.assertEquals(df.collect(), res.collect()) + + +@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed") +class GroupbyApplyTests(ReusedPySparkTestCase): +@classmethod +def setUpClass(cls): +ReusedPySparkTestCase.setUpClass() +cls.spark = SparkSession(cls.sc) + +@classmethod +def tearDownClass(cls): +ReusedPySparkTestCase.tearDownClass() +cls.spark.stop() + +def assertFramesEqual(self, expected, result): +msg = ("DataFrames are not equal: " + + ("\n\nExpected:\n%s\n%s" % (expected, expected.dtypes)) + + ("\n\nResult:\n%s\n%s" % (result, result.dtypes))) +self.assertTrue(expected.equals(result), msg=msg) + +@property +def data(self): +from pyspark.sql.functions import array, explode, col, lit +return self.spark.range(10).toDF('id') \ +.withColumn("vs", array([lit(i) for i in range(20, 30)])) \ +.withColumn("v", explode(col('vs'))).drop('vs') + +def test_simple(self): +from pyspark.sql.functions import pandas_udf +df = self.data + +foo_udf = pandas_udf( +lambda df: df.assign(v1=df.v * df.id * 1.0, v2=df.v + df.id), +StructType( +[StructField('id', LongType()), + StructField('v', IntegerType()), + StructField('v1', DoubleType()), + StructField('v2', LongType())])) + +result = df.groupby('id').apply(foo_udf).sort('id').toPandas() +expected = df.toPandas().groupby('id').apply(foo_udf.func).reset_index(drop=True) +self.assertFramesEqual(expected, result) + +def test_decorator(self): +from 
pyspark.sql.functions import pandas_udf +df = self.data + +@pandas_udf(StructType( +[StructField('id', LongType()), + StructField('v', IntegerType()), + StructField('v1', DoubleType()), + StructField('v2', LongType())])) +def foo(df): +return df.assign(v1=df.v * df.id * 1.0, v2=df.v + df.id) + +result = df.groupby('id').apply(foo).sort('id').toPandas() +expected = df.toPandas().groupby('id').apply(foo.func).reset_index(drop=True) +self.assertFramesEqual(expected, result) + +def test_coerce(self): +from pyspark.sql.functions import pandas_udf +df = self.data + +foo = pandas_udf( +lambda df: df, --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18732#discussion_r143744197

    --- Diff: python/pyspark/sql/functions.py ---
    @@ -2181,30 +2187,66 @@ def udf(f=None, returnType=StringType()):
     @since(2.3)
     def pandas_udf(f=None, returnType=StringType()):
         """
    -    Creates a :class:`Column` expression representing a user defined function (UDF) that accepts
    -    `Pandas.Series` as input arguments and outputs a `Pandas.Series` of the same length.
    +    Creates a vectorized user defined function (UDF).

    -    :param f: python function if used as a standalone function
    +    :param f: user-defined function. A python function if used as a standalone function
         :param returnType: a :class:`pyspark.sql.types.DataType` object

    -    >>> from pyspark.sql.types import IntegerType, StringType
    -    >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
    -    >>> @pandas_udf(returnType=StringType())
    -    ... def to_upper(s):
    -    ...     return s.str.upper()
    -    ...
    -    >>> @pandas_udf(returnType="integer")
    -    ... def add_one(x):
    -    ...     return x + 1
    -    ...
    -    >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
    -    >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
    -    ...     .show()  # doctest: +SKIP
    -    +----------+--------------+------------+
    -    |slen(name)|to_upper(name)|add_one(age)|
    -    +----------+--------------+------------+
    -    |         8|      JOHN DOE|          22|
    -    +----------+--------------+------------+
    +    The user-defined function can define one of the following transformations:
    +
    +    1. One or more `pandas.Series` -> A `pandas.Series`
    +
    +       This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
    +       :meth:`pyspark.sql.DataFrame.select`.
    +       The returnType should be a primitive data type, e.g., `DoubleType()`.
    +       The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.
    +
    +       >>> from pyspark.sql.types import IntegerType, StringType
    +       >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
    +       >>> @pandas_udf(returnType=StringType())
    +       ... def to_upper(s):
    +       ...     return s.str.upper()
    +       ...
    +       >>> @pandas_udf(returnType="integer")
    +       ... def add_one(x):
    +       ...     return x + 1
    +       ...
    +       >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
    +       >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
    +       ...     .show()  # doctest: +SKIP
    +       +----------+--------------+------------+
    +       |slen(name)|to_upper(name)|add_one(age)|
    +       +----------+--------------+------------+
    +       |         8|      JOHN DOE|          22|
    +       +----------+--------------+------------+
    +
    +    2. A `pandas.DataFrame` -> A `pandas.DataFrame`
    +
    +       This udf is used with :meth:`pyspark.sql.GroupedData.apply`.
    --- End diff --

    Changed to `This udf is only used with` and added a `note`.
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18732#discussion_r143740882

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala ---
    @@ -0,0 +1,43 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.plans.logical
    +
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression}
    +
    +/**
    + * Logical nodes specific to PySpark.
    + */
    +
    +/**
    + * FlatMap groups using a udf: pandas.Dataframe -> pandas.DataFrame.
    --- End diff --

    Fixed.
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18732#discussion_r143740773

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala ---
    @@ -0,0 +1,43 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.plans.logical
    +
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression}
    +
    +/**
    + * Logical nodes specific to PySpark.
    + */
    --- End diff --

    Removed
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18732#discussion_r143810355

    --- Diff: python/pyspark/sql/group.py ---
    @@ -192,7 +193,84 @@ def pivot(self, pivot_col, values=None):
                 jgd = self._jgd.pivot(pivot_col)
             else:
                 jgd = self._jgd.pivot(pivot_col, values)
    -        return GroupedData(jgd, self.sql_ctx)
    +        return GroupedData(jgd, self._df)
    +
    +    @since(2.3)
    +    def apply(self, udf):
    +        """
    +        Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result
    +        as a `DataFrame`.
    --- End diff --

    I think "pandas udf" as a word is fine. `pandas_udf` is the function name.
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18732#discussion_r143810539

    --- Diff: python/pyspark/sql/group.py ---
    @@ -192,7 +193,84 @@ def pivot(self, pivot_col, values=None):
                 jgd = self._jgd.pivot(pivot_col)
             else:
                 jgd = self._jgd.pivot(pivot_col, values)
    -        return GroupedData(jgd, self.sql_ctx)
    +        return GroupedData(jgd, self._df)
    +
    +    @since(2.3)
    +    def apply(self, udf):
    +        """
    +        Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result
    +        as a `DataFrame`.
    +
    +        The user-defined function should take a `pandas.DataFrame` and return another
    +        `pandas.DataFrame`. For each group, all columns are passed together as a `pandas.DataFrame`
    +        to the user-function and the returned `pandas.DataFrame` are combined as a `DataFrame`.
    --- End diff --

    Fixed.
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18732#discussion_r143810736

    --- Diff: python/pyspark/sql/group.py ---
    @@ -192,7 +193,84 @@ def pivot(self, pivot_col, values=None):
                 jgd = self._jgd.pivot(pivot_col)
             else:
                 jgd = self._jgd.pivot(pivot_col, values)
    -        return GroupedData(jgd, self.sql_ctx)
    +        return GroupedData(jgd, self._df)
    +
    +    @since(2.3)
    +    def apply(self, udf):
    +        """
    +        Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result
    +        as a `DataFrame`.
    +
    +        The user-defined function should take a `pandas.DataFrame` and return another
    +        `pandas.DataFrame`. For each group, all columns are passed together as a `pandas.DataFrame`
    +        to the user-function and the returned `pandas.DataFrame` are combined as a `DataFrame`.
    +        The returned `pandas.DataFrame` can be arbitrary length and its schema must match the
    --- End diff --

    Fixed.
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18732#discussion_r143812619

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala ---
    @@ -44,14 +73,18 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi
         val schemaOut = StructType.fromAttributes(output.drop(child.output.length).zipWithIndex
           .map { case (attr, i) => attr.withName(s"_$i") })
    +    val batchSize = conf.arrowMaxRecordsPerBatch
    +    // DO NOT use iter.grouped(). See BatchIterator.
    --- End diff --

    Yes. The reason is explained in the docstring of BatchIterator.
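The BatchIterator docstring is not quoted in this thread, so its stated reason is not repeated here. As general background, Scala's `Iterator.grouped` materializes each group into a collection, whereas a batch iterator can hand out each batch as a lazy view over the underlying iterator. The sketch below illustrates that second shape in Python; `batch_iterator` is a hypothetical name and this is not the Spark implementation.

```python
from itertools import chain, islice


def batch_iterator(rows, batch_size):
    """Yield successive lazy batches of at most batch_size items from rows.

    Assumes batch_size >= 1. Each batch is an iterator over the shared
    underlying iterator, so a batch must be fully consumed before the next
    one is requested.
    """
    it = iter(rows)
    for first in it:
        # One item is pulled to learn that another batch exists; up to
        # batch_size - 1 further items are attached lazily, not copied.
        yield chain([first], islice(it, batch_size - 1))


# Consume each batch completely before moving on to the next one.
consumed = [list(batch) for batch in batch_iterator(range(7), 3)]
```

Because no batch is copied into a list by the iterator itself, rows are only touched while the consumer is reading them; the trade-off is that batches cannot be skipped or read out of order.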
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18732#discussion_r143812311

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala ---
    @@ -435,6 +435,35 @@ class RelationalGroupedDataset protected[sql](
             df.logicalPlan.output,
             df.logicalPlan))
       }
    +
    +  /**
    +   * Applies a vectorized python user-defined function to each group of data.
    +   * The user-defined function defines a transformation: `Pandas.DataFrame` -> `Pandas.DataFrame`.
    +   * For each group, all elements in the group are passed as a `Pandas.DataFrame` and the results
    +   * for all groups are combined into a new `DataFrame`.
    +   *
    +   * This function does not support partial aggregation, and requires shuffling all the data in
    +   * the `DataFrame`.
    --- End diff --

    Fixed.
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18732#discussion_r143813642

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala ---
    @@ -0,0 +1,103 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.python
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.TaskContext
    +import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
    +import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode}
    +import org.apache.spark.sql.types.StructType
    +
    +/**
    + * Physical node for [[org.apache.spark.sql.catalyst.plans.logical.FlatMapGroupsInPandas]]
    + *
    + * Rows in each group are passed to the python worker as a Arrow record batch.
    --- End diff --

    Fixed "a Arrow -> an Arrow".
    Fixed "Python and Java capitalization".

    I am actually leaning toward keeping `pandas.DataFrame`. The name `pandas` is usually written in lower case: https://pandas.pydata.org/pandas-docs/stable/
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18732#discussion_r143809711

    --- Diff: python/pyspark/sql/group.py ---
    @@ -192,7 +193,84 @@ def pivot(self, pivot_col, values=None):
                 jgd = self._jgd.pivot(pivot_col)
             else:
                 jgd = self._jgd.pivot(pivot_col, values)
    -        return GroupedData(jgd, self.sql_ctx)
    +        return GroupedData(jgd, self._df)
    +
    +    @since(2.3)
    +    def apply(self, udf):
    +        """
    +        Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result
    +        as a `DataFrame`.
    --- End diff --

    Fixed.
[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18732#discussion_r143810948

    --- Diff: python/pyspark/sql/group.py ---
    @@ -192,7 +193,84 @@ def pivot(self, pivot_col, values=None):
                 jgd = self._jgd.pivot(pivot_col)
             else:
                 jgd = self._jgd.pivot(pivot_col, values)
    -        return GroupedData(jgd, self.sql_ctx)
    +        return GroupedData(jgd, self._df)
    +
    +    @since(2.3)
    +    def apply(self, udf):
    +        """
    +        Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result
    +        as a `DataFrame`.
    +
    +        The user-defined function should take a `pandas.DataFrame` and return another
    +        `pandas.DataFrame`. For each group, all columns are passed together as a `pandas.DataFrame`
    +        to the user-function and the returned `pandas.DataFrame` are combined as a `DataFrame`.
    +        The returned `pandas.DataFrame` can be arbitrary length and its schema must match the
    +        returnType of the pandas udf.
    +
    +        This function does not support partial aggregation, and requires shuffling all the data in
    +        the `DataFrame`.
    +
    +        :param udf: A wrapped udf function returned by :meth:`pyspark.sql.functions.pandas_udf`
    --- End diff --

    I think
    ```
    A pandas_udf returned by :meth:`pyspark.sql.functions.pandas_udf`
    ```
    is redundant. How about
    ```
    A function object returned by :meth:`pyspark.sql.functions.pandas_udf`
    ```
    instead?
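The per-group contract described in the `apply` docstring (one `pandas.DataFrame` in per group, one `pandas.DataFrame` out, results combined, output length free to differ) can be illustrated locally. This is only a sketch on plain pandas, with no Spark session and no shuffle: `apply_per_group`, `subtract_mean`, `top_row` and the sample data are hypothetical names and values introduced for illustration.

```python
import pandas as pd


def apply_per_group(df, key, func):
    """Hypothetical local stand-in for the grouped apply: split, call, combine."""
    pieces = [func(group) for _, group in df.groupby(key)]
    return pd.concat(pieces, ignore_index=True)


def subtract_mean(pdf):
    # pandas.DataFrame -> pandas.DataFrame with the same number of rows as the group.
    return pdf.assign(v=pdf.v - pdf.v.mean())


def top_row(pdf):
    # The result may have a different length than the input group.
    return pdf.nlargest(1, "v")


df = pd.DataFrame({"id": [1, 1, 2, 2, 2], "v": [1.0, 2.0, 3.0, 5.0, 10.0]})

centered = apply_per_group(df, "id", subtract_mean)
largest = apply_per_group(df, "id", top_row)
```

In PySpark, per the docstring, the function would instead be wrapped with `pandas_udf` and passed to `GroupedData.apply`, and its output schema would have to match the declared returnType.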
[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...
Github user icexelloss commented on the issue:

    https://github.com/apache/spark/pull/18732

    @HyukjinKwon Thanks! Thanks to everyone for reviewing this tirelessly.