[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/15089 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/15089#discussion_r81419085

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala ---
@@ -98,7 +353,7 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi
 // For each row, add it to the queue.
 val inputIterator = iter.grouped(100).map { inputRows =>
 val toBePickled = inputRows.map { inputRow =>
- queue.add(inputRow)
+ queue.add(inputRow.asInstanceOf[UnsafeRow])
--- End diff --

Compared with an assert, the error message actually looks similar: the row should be an UnsafeRow but it is not. I think this is a fundamental assumption; there are many places (20 for now) that have this cast.
```
davies@localhost:~/work/spark$ git grep '.asInstanceOf\[UnsafeRow\]' | wc -l
20
```
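To make the trade-off in this exchange concrete, here is a minimal sketch that is not taken from the PR. `Row`, `FastRow`, `SlowRow` and `RowChecks` are hypothetical stand-ins (not Spark classes) for `InternalRow`, `UnsafeRow` and a non-unsafe row. It only illustrates how a bare cast and an explicit check differ in the failure they produce.

```scala
// Hypothetical stand-ins for InternalRow / UnsafeRow; these are not Spark classes.
sealed trait Row
final case class FastRow(bytes: Array[Byte]) extends Row
final case class SlowRow(values: Seq[Any]) extends Row

object RowChecks {
  // Bare cast: a wrong row type fails with a ClassCastException naming the two classes.
  def viaCast(row: Row): FastRow = row.asInstanceOf[FastRow]

  // Explicit check: a wrong row type fails with a message that states the assumption.
  def viaMatch(row: Row): FastRow = row match {
    case fast: FastRow => fast
    case other =>
      throw new IllegalStateException(s"Expected FastRow but got $other")
  }
}
```

Both variants fail at the same point; the difference is only in how much the error says about the assumption that was violated.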
[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15089#discussion_r81237967

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala ---
@@ -0,0 +1,277 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.python
+
+import java.io._
+
+import com.google.common.io.Closeables
+
+import org.apache.spark.SparkException
+import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.memory.MemoryBlock
+
+/**
+ * A RowQueue is an FIFO queue for UnsafeRow.
+ *
+ * This RowQueue is ONLY designed and used for Python UDF, which has only one writer and only one
+ * reader, the reader ALWAYS ran behind the writer. See the doc of class [[BatchEvalPythonExec]]
+ * on how it works.
+ */
+private[python] trait RowQueue {
+
+ /**
+ * Add a row to the end of it, returns true iff the row has added into it.
+ */
+ def add(row: UnsafeRow): Boolean
+
+ /**
+ * Retrieve and remove the first row, returns null if it's empty.
+ *
+ * It can only be called after add is called.
+ */
+ def remove(): UnsafeRow
+
+ /**
+ * Cleanup all the resources.
+ */
+ def close(): Unit
+}
+
+/**
+ * A RowQueue that is based on in-memory page. UnsafeRows are appended into it until it's full.
+ * Another thread could read from it at the same time (behind the writer).
+ *
+ * The format of UnsafeRow in page:
+ * [4 bytes to hold length of record (N)] [N bytes to hold record] [...]
+ */
+private[python] abstract class InMemoryRowQueue(val page: MemoryBlock, numFields: Int)
+ extends RowQueue {
+ private val base: AnyRef = page.getBaseObject
+ private val endOfPage: Long = page.getBaseOffset + page.size
--- End diff --

My biggest question here is how the Java memory model actually works with respect to two threads reading/writing using unsafe without any synchronization primitives. I wouldn't be surprised if this could lead to stale reads. We should verify this wouldn't be an issue; otherwise we should add explicit synchronization.
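As an illustration of the "explicit synchronization" option raised above, here is a minimal sketch under stated assumptions. It is not the PR's code: `SpscLog` is a hypothetical name, it stores `Long` values in a plain JVM array rather than rows in an off-heap page, and it assumes exactly one writer thread and one reader thread. The writer publishes each slot through a volatile field, which under the Java memory model establishes a happens-before edge with the reader's volatile read of that field.

```scala
// Minimal sketch (not the PR's code): one writer appends, one reader follows behind.
final class SpscLog(capacity: Int) {
  private val slots = new Array[Long](capacity)
  // Number of slots the writer has published; written only by the writer thread.
  @volatile private var published = 0
  // Next slot to read; touched only by the reader thread.
  private var readIndex = 0

  /** Writer thread only. Returns false when the log is full. */
  def add(value: Long): Boolean = {
    val next = published
    if (next == capacity) {
      false
    } else {
      slots(next) = value   // plain write of the payload
      published = next + 1  // volatile write: publishes the payload written above
      true
    }
  }

  /** Reader thread only. Returns None when nothing new has been published. */
  def remove(): Option[Long] = {
    if (readIndex >= published) {  // volatile read: pairs with the write in add()
      None
    } else {
      val value = slots(readIndex)
      readIndex += 1
      Some(value)
    }
  }
}
```

Whether the queue in the PR needs something like this depends on how its reader and writer threads are otherwise ordered, which this thread does not settle; the sketch only shows one way to rule out stale reads.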
[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15089#discussion_r81232667

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala ---
@@ -98,7 +353,7 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi
 // For each row, add it to the queue.
 val inputIterator = iter.grouped(100).map { inputRows =>
 val toBePickled = inputRows.map { inputRow =>
- queue.add(inputRow)
+ queue.add(inputRow.asInstanceOf[UnsafeRow])
--- End diff --

Can we add an assert with a message? I worry that if we ever change that decision, it will be difficult to figure out what's going on.
[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15089#discussion_r81234464

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala ---
@@ -0,0 +1,277 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.python
+
+import java.io._
+
+import com.google.common.io.Closeables
+
+import org.apache.spark.SparkException
+import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.memory.MemoryBlock
+
+/**
+ * A RowQueue is an FIFO queue for UnsafeRow.
+ *
+ * This RowQueue is ONLY designed and used for Python UDF, which has only one writer and only one
+ * reader, the reader ALWAYS ran behind the writer. See the doc of class [[BatchEvalPythonExec]]
+ * on how it works.
+ */
+private[python] trait RowQueue {
+
+ /**
+ * Add a row to the end of it, returns true iff the row has added into it.
+ */
+ def add(row: UnsafeRow): Boolean
+
+ /**
+ * Retrieve and remove the first row, returns null if it's empty.
+ *
+ * It can only be called after add is called.
+ */
+ def remove(): UnsafeRow
+
+ /**
+ * Cleanup all the resources.
+ */
+ def close(): Unit
+}
+
+/**
+ * A RowQueue that is based on in-memory page. UnsafeRows are appended into it until it's full.
+ * Another thread could read from it at the same time (behind the writer).
+ *
+ * The format of UnsafeRow in page:
+ * [4 bytes to hold length of record (N)] [N bytes to hold record] [...]
--- End diff --

I would also document here that a length of -1 means end of page.
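The page layout under discussion, including the -1 end-of-page length, can be sketched as follows. This is a simplified illustration and not the PR's implementation: `PageQueue` and `EndOfPage` are hypothetical names, a heap `java.nio.ByteBuffer` stands in for the `MemoryBlock` and `Platform` calls, and records are plain byte arrays rather than `UnsafeRow`s.

```scala
import java.nio.ByteBuffer

// Sketch of a page laid out as "[4-byte length N][N bytes of record]...".
// A length of -1 marks the end of the page. Names here are hypothetical.
final class PageQueue(pageSize: Int) {
  private val EndOfPage = -1
  private val page = ByteBuffer.allocate(pageSize)
  private var writeOffset = 0
  private var readOffset = 0

  /** Returns false when the record does not fit in the remaining space. */
  def add(record: Array[Byte]): Boolean = {
    if (writeOffset + 4 + record.length > pageSize) {
      // There is room for a length field but not for the record: write the marker.
      if (writeOffset + 4 <= pageSize) page.putInt(writeOffset, EndOfPage)
      false
    } else {
      page.putInt(writeOffset, record.length)
      var i = 0
      while (i < record.length) {
        page.put(writeOffset + 4 + i, record(i))
        i += 1
      }
      writeOffset += 4 + record.length
      true
    }
  }

  /**
   * Returns None at the physical end of the page or at the end-of-page marker.
   * Like the quoted code, this does not consult writeOffset, so in this sketch it
   * should only be called for a slot that add() has already written or marked.
   */
  def remove(): Option[Array[Byte]] = {
    if (readOffset + 4 > pageSize || page.getInt(readOffset) < 0) {
      None
    } else {
      val size = page.getInt(readOffset)
      val record = new Array[Byte](size)
      var i = 0
      while (i < size) {
        record(i) = page.get(readOffset + 4 + i)
        i += 1
      }
      readOffset += 4 + size
      Some(record)
    }
  }
}
```

With a 16-byte page, a 3-byte record occupies 7 bytes; a following 6-byte record needs 10 bytes, does not fit in the remaining 9, and leaves the marker behind for the reader.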
[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15089#discussion_r81235033

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala ---
@@ -0,0 +1,277 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.python
+
+import java.io._
+
+import com.google.common.io.Closeables
+
+import org.apache.spark.SparkException
+import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.memory.MemoryBlock
+
+/**
+ * A RowQueue is an FIFO queue for UnsafeRow.
+ *
+ * This RowQueue is ONLY designed and used for Python UDF, which has only one writer and only one
+ * reader, the reader ALWAYS ran behind the writer. See the doc of class [[BatchEvalPythonExec]]
+ * on how it works.
+ */
+private[python] trait RowQueue {
+
+ /**
+ * Add a row to the end of it, returns true iff the row has added into it.
--- End diff --

OK, I understand this now after reading more code. Regardless, it would be great to explain some of the cases in which this can return false, e.g. when we run out of buffer space.
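One plausible reading of the `false` return value is "this queue cannot take the row, try another one". The sketch below illustrates that reading only; it is not the PR's `HybridRowQueue`. `BoundedQueue` and `ChainedQueue` are hypothetical names, rows are plain strings, and a fixed row count stands in for running out of buffer space.

```scala
import scala.collection.mutable

// Hypothetical append-only queue: add() returns false once `capacity` rows were accepted.
final class BoundedQueue(capacity: Int) {
  private val rows = mutable.Queue.empty[String]
  private var accepted = 0

  def add(row: String): Boolean =
    if (accepted == capacity) {
      false
    } else {
      rows.enqueue(row)
      accepted += 1
      true
    }

  def remove(): Option[String] = if (rows.isEmpty) None else Some(rows.dequeue())
}

// Chains bounded queues: a false from add() means "this one is full, start another".
final class ChainedQueue(capacityPerQueue: Int) {
  require(capacityPerQueue > 0, "each queue must hold at least one row")
  private val queues = mutable.Queue(new BoundedQueue(capacityPerQueue))

  def add(row: String): Unit = {
    if (!queues.last.add(row)) {
      val fresh = new BoundedQueue(capacityPerQueue)
      val ok = fresh.add(row)
      assert(ok, "a fresh queue must accept at least one row")
      queues.enqueue(fresh)
    }
  }

  def remove(): Option[String] = {
    // Drop exhausted queues at the head, but always keep the one being written to.
    var row = queues.head.remove()
    while (row.isEmpty && queues.size > 1) {
      queues.dequeue()
      row = queues.head.remove()
    }
    row
  }

  def queueCount: Int = queues.size
}
```

In this sketch the caller of `ChainedQueue.add` never sees the `false`; it is consumed internally as the signal to roll over to a new backing queue.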
[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15089#discussion_r81233633

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala ---
@@ -37,9 +40,25 @@ import org.apache.spark.sql.types.{DataType, StructField, StructType}
 * Python evaluation works by sending the necessary (projected) input data via a socket to an
 * external Python process, and combine the result from the Python process with the original row.
 *
- * For each row we send to Python, we also put it in a queue. For each output row from Python,
+ * For each row we send to Python, we also put it in a queue first. For each output row from Python,
 * we drain the queue to find the original input row. Note that if the Python process is way too
- * slow, this could lead to the queue growing unbounded and eventually run out of memory.
+ * slow, this could lead to the queue growing unbounded and spill into disk when run out of memory.
+ *
+ * Here is a diagram to show how this works:
+ *
+ *Upstream (from child)
--- End diff --

I suggest switching upstream and downstream: put the child operator (upstream) at the bottom and the parent operator (downstream) at the top, to be more consistent with databases. Basically, the "tree" would face upward.
[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15089#discussion_r81232436

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala ---
@@ -70,7 +89,13 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi
 // The queue used to buffer input rows so we can drain it to
 // combine input with output from Python.
- val queue = new java.util.concurrent.ConcurrentLinkedQueue[InternalRow]()
+ val queue = HybridRowQueue(
+TaskContext.get().taskMemoryManager(),
+new File(Utils.getLocalDir(SparkEnv.get.conf)),
+child.output.length)
--- End diff --

Nit: move this to the previous line? The indent is wrong here.
[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15089#discussion_r81234071

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala ---
@@ -0,0 +1,277 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.python
+
+import java.io._
+
+import com.google.common.io.Closeables
+
+import org.apache.spark.SparkException
+import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.memory.MemoryBlock
+
+/**
+ * A RowQueue is an FIFO queue for UnsafeRow.
+ *
+ * This RowQueue is ONLY designed and used for Python UDF, which has only one writer and only one
+ * reader, the reader ALWAYS ran behind the writer. See the doc of class [[BatchEvalPythonExec]]
+ * on how it works.
+ */
+private[python] trait RowQueue {
+
+ /**
+ * Add a row to the end of it, returns true iff the row has added into it.
+ */
+ def add(row: UnsafeRow): Boolean
+
+ /**
+ * Retrieve and remove the first row, returns null if it's empty.
+ *
+ * It can only be called after add is called.
--- End diff --

It is not obvious to me what this sentence means, or why it matters. Can you explain?
[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15089#discussion_r81234675

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala ---
@@ -0,0 +1,277 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.python
+
+import java.io._
+
+import com.google.common.io.Closeables
+
+import org.apache.spark.SparkException
+import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.memory.MemoryBlock
+
+/**
+ * A RowQueue is an FIFO queue for UnsafeRow.
+ *
+ * This RowQueue is ONLY designed and used for Python UDF, which has only one writer and only one
+ * reader, the reader ALWAYS ran behind the writer. See the doc of class [[BatchEvalPythonExec]]
+ * on how it works.
+ */
+private[python] trait RowQueue {
+
+ /**
+ * Add a row to the end of it, returns true iff the row has added into it.
+ */
+ def add(row: UnsafeRow): Boolean
+
+ /**
+ * Retrieve and remove the first row, returns null if it's empty.
+ *
+ * It can only be called after add is called.
+ */
+ def remove(): UnsafeRow
+
+ /**
+ * Cleanup all the resources.
+ */
+ def close(): Unit
+}
+
+/**
+ * A RowQueue that is based on in-memory page. UnsafeRows are appended into it until it's full.
+ * Another thread could read from it at the same time (behind the writer).
+ *
+ * The format of UnsafeRow in page:
+ * [4 bytes to hold length of record (N)] [N bytes to hold record] [...]
+ */
+private[python] abstract class InMemoryRowQueue(val page: MemoryBlock, numFields: Int)
--- End diff --

Can we add more asserts everywhere to guard the invariant readOffset <= writeOffset?
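A minimal sketch of what guarding that invariant could look like, assuming a single-threaded view of the two offsets. `QueueCursors` is a hypothetical helper, not part of the PR; it tracks only the offsets and checks `readOffset <= writeOffset` before every move.

```scala
// Hypothetical cursor pair that enforces readOffset <= writeOffset <= limit.
final class QueueCursors(limit: Long) {
  private var writeOffset = 0L
  private var readOffset = 0L

  /** Advances the writer; fails if the write would run past the end of the page. */
  def advanceWrite(bytes: Long): Unit = {
    require(bytes >= 0 && writeOffset + bytes <= limit,
      s"write of $bytes bytes at $writeOffset exceeds limit $limit")
    writeOffset += bytes
  }

  /** Advances the reader; fails if the reader would run ahead of the writer. */
  def advanceRead(bytes: Long): Unit = {
    assert(bytes >= 0 && readOffset + bytes <= writeOffset,
      s"reader at $readOffset moving $bytes bytes would pass writer at $writeOffset")
    readOffset += bytes
  }

  def unreadBytes: Long = writeOffset - readOffset
}
```

Checking before mutating keeps the cursors in a valid state even when an assertion fires, so the failure message reflects the offsets as they were at the time of the bad call.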
[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15089#discussion_r81233940

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala ---
@@ -0,0 +1,277 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.python
+
+import java.io._
+
+import com.google.common.io.Closeables
+
+import org.apache.spark.SparkException
+import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.memory.MemoryBlock
+
+/**
+ * A RowQueue is an FIFO queue for UnsafeRow.
+ *
+ * This RowQueue is ONLY designed and used for Python UDF, which has only one writer and only one
+ * reader, the reader ALWAYS ran behind the writer. See the doc of class [[BatchEvalPythonExec]]
+ * on how it works.
+ */
+private[python] trait RowQueue {
+
+ /**
+ * Add a row to the end of it, returns true iff the row has added into it.
--- End diff --

"iff the row has been added to the queue." Can you comment on when adding a row would fail?
[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/15089#discussion_r81228310 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala --- @@ -0,0 +1,276 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +*http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.sql.execution.python + +import java.io._ + +import com.google.common.io.Closeables + +import org.apache.spark.SparkException +import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.unsafe.Platform +import org.apache.spark.unsafe.memory.MemoryBlock + +/** + * A RowQueue is an FIFO queue for UnsafeRow. + * + * This RowQueue is ONLY designed and used for Python UDF, which has only one writer and only one + * reader, the reader ALWAYS ran behind the writer. + */ +private[python] trait RowQueue { + + /** + * Add a row to the end of it, returns true iff the row has added into it. + */ + def add(row: UnsafeRow): Boolean + + /** + * Retrieve and remove the first row, returns null if it's empty. + * + * It can only be called after add is called. + */ + def remove(): UnsafeRow + + /** + * Cleanup all the resources. 
+ */ + def close(): Unit +} + +/** + * A RowQueue that is based on in-memory page. UnsafeRows are appended into it until it's full. + * Another thread could read from it at the same time (behind the writer). + * + * The format of UnsafeRow in page: + * [4 bytes to hold length of record (N)] [N bytes to hold record] [...] + */ +private[python] abstract class InMemoryRowQueue(val page: MemoryBlock, numFields: Int) + extends RowQueue { + private val base: AnyRef = page.getBaseObject + private val endOfPage: Long = page.getBaseOffset + page.size + // the first location where a new row would be written + private var writeOffset = page.getBaseOffset + // points to the start of the next row to read + private var readOffset = page.getBaseOffset + private val resultRow = new UnsafeRow(numFields) + + def add(row: UnsafeRow): Boolean = { +val size = row.getSizeInBytes +if (writeOffset + 4 + size > endOfPage) { + // if there is not enough space in this page to hold the new record + if (writeOffset + 4 <= endOfPage) { +// if there's extra space at the end of the page, store a special "end-of-page" length (-1) +Platform.putInt(base, writeOffset, -1) + } + false +} else { + Platform.putInt(base, writeOffset, size) + Platform.copyMemory(row.getBaseObject, row.getBaseOffset, base, writeOffset + 4, size) + writeOffset += 4 + size + true +} + } + + def remove(): UnsafeRow = { +if (readOffset + 4 > endOfPage || Platform.getInt(base, readOffset) < 0) { + null +} else { + val size = Platform.getInt(base, readOffset) + resultRow.pointTo(base, readOffset + 4, size) + readOffset += 4 + size + resultRow +} + } +} + +/** + * A RowQueue that is backed by a file on disk. This queue will stop accepting new rows once any + * reader has begun reading from the queue. 
+ */ +private[python] case class DiskRowQueue(file: File, fields: Int) extends RowQueue { + private var out = new DataOutputStream( +new BufferedOutputStream(new FileOutputStream(file.toString))) + private var unreadBytes = 0L + + private var in: DataInputStream = _ + private val resultRow = new UnsafeRow(fields) + + def add(row: UnsafeRow): Boolean = synchronized { +if (out == null) { + // Another thread is reading, stop writing this one + return false +} +out.writeInt(row.getSizeInBytes) +out.write(row.getBytes) +unreadBytes += 4 + row.getSizeInBytes +
[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/15089#discussion_r81208953

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala ---
@@ -17,18 +17,21 @@
 package org.apache.spark.sql.execution.python
+import java.io.File
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import net.razorvine.pickle.{Pickler, Unpickler}
-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkEnv, TaskContext}
 import org.apache.spark.api.python.{ChainedPythonFunctions, PythonRunner}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.util.{CompletionIterator, Utils}
--- End diff --

It seems the import `CompletionIterator` is not used.
[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...
Github user lins05 commented on a diff in the pull request: https://github.com/apache/spark/pull/15089#discussion_r81212877 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala --- @@ -0,0 +1,276 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +*http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.sql.execution.python + +import java.io._ + +import com.google.common.io.Closeables + +import org.apache.spark.SparkException +import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.unsafe.Platform +import org.apache.spark.unsafe.memory.MemoryBlock + +/** + * A RowQueue is an FIFO queue for UnsafeRow. + * + * This RowQueue is ONLY designed and used for Python UDF, which has only one writer and only one + * reader, the reader ALWAYS ran behind the writer. + */ +private[python] trait RowQueue { + + /** + * Add a row to the end of it, returns true iff the row has added into it. + */ + def add(row: UnsafeRow): Boolean + + /** + * Retrieve and remove the first row, returns null if it's empty. + * + * It can only be called after add is called. + */ + def remove(): UnsafeRow + + /** + * Cleanup all the resources. 
+ */ + def close(): Unit +} + +/** + * A RowQueue that is based on in-memory page. UnsafeRows are appended into it until it's full. + * Another thread could read from it at the same time (behind the writer). + * + * The format of UnsafeRow in page: + * [4 bytes to hold length of record (N)] [N bytes to hold record] [...] + */ +private[python] abstract class InMemoryRowQueue(val page: MemoryBlock, numFields: Int) + extends RowQueue { + private val base: AnyRef = page.getBaseObject + private val endOfPage: Long = page.getBaseOffset + page.size + // the first location where a new row would be written + private var writeOffset = page.getBaseOffset + // points to the start of the next row to read + private var readOffset = page.getBaseOffset + private val resultRow = new UnsafeRow(numFields) + + def add(row: UnsafeRow): Boolean = { +val size = row.getSizeInBytes +if (writeOffset + 4 + size > endOfPage) { + // if there is not enough space in this page to hold the new record + if (writeOffset + 4 <= endOfPage) { +// if there's extra space at the end of the page, store a special "end-of-page" length (-1) +Platform.putInt(base, writeOffset, -1) + } + false +} else { + Platform.putInt(base, writeOffset, size) + Platform.copyMemory(row.getBaseObject, row.getBaseOffset, base, writeOffset + 4, size) + writeOffset += 4 + size + true +} + } + + def remove(): UnsafeRow = { +if (readOffset + 4 > endOfPage || Platform.getInt(base, readOffset) < 0) { + null +} else { + val size = Platform.getInt(base, readOffset) + resultRow.pointTo(base, readOffset + 4, size) + readOffset += 4 + size + resultRow +} + } +} + +/** + * A RowQueue that is backed by a file on disk. This queue will stop accepting new rows once any + * reader has begun reading from the queue. 
+ */ +private[python] case class DiskRowQueue(file: File, fields: Int) extends RowQueue { + private var out = new DataOutputStream( +new BufferedOutputStream(new FileOutputStream(file.toString))) + private var unreadBytes = 0L + + private var in: DataInputStream = _ + private val resultRow = new UnsafeRow(fields) + + def add(row: UnsafeRow): Boolean = synchronized { +if (out == null) { + // Another thread is reading, stop writing this one + return false +} +out.writeInt(row.getSizeInBytes) +out.write(row.getBytes) +unreadBytes += 4 + row.getSizeInBytes +
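For readers following the page layout described in the doc comment above (`[4 bytes to hold length of record (N)] [N bytes to hold record]`, with `-1` as the end-of-page marker), here is a minimal sketch of the same idea outside Spark. It is not the PR's code: `PageQueue` is a hypothetical name, and a heap `java.nio.ByteBuffer` stands in for `MemoryBlock` and `Platform`. Like the original, it assumes the reader stays behind the writer.

```scala
import java.nio.ByteBuffer
import java.util.Arrays

// Hypothetical stand-in for InMemoryRowQueue: a fixed-size page holding
// length-prefixed records, backed by a heap ByteBuffer.
final class PageQueue(pageSize: Int) {
  private val page = ByteBuffer.allocate(pageSize)
  private var writeOffset = 0 // where the next record would be written
  private var readOffset = 0  // start of the next record to read

  /** Appends a record; returns false if the page cannot hold it. */
  def add(record: Array[Byte]): Boolean = {
    val size = record.length
    if (writeOffset + 4 + size > pageSize) {
      // Not enough room: leave an end-of-page marker (-1) if 4 bytes remain.
      if (writeOffset + 4 <= pageSize) page.putInt(writeOffset, -1)
      false
    } else {
      page.putInt(writeOffset, size)
      System.arraycopy(record, 0, page.array(), writeOffset + 4, size)
      writeOffset += 4 + size
      true
    }
  }

  /** Returns the next record, or null once the end-of-page marker is reached.
    * Must only be called for records that were already added. */
  def remove(): Array[Byte] = {
    if (readOffset + 4 > pageSize || page.getInt(readOffset) < 0) {
      null
    } else {
      val size = page.getInt(readOffset)
      val start = readOffset + 4
      readOffset += 4 + size
      Arrays.copyOfRange(page.array(), start, start + size)
    }
  }
}
```

The marker is what lets a reader distinguish "page is full, move on" from "next record", since the reader never sees the writer's offset directly.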
[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/15089#discussion_r80532953 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala --- @@ -0,0 +1,278 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +*http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.sql.execution.python + +import java.io._ + +import com.google.common.io.Closeables + +import org.apache.spark.SparkException +import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.unsafe.Platform +import org.apache.spark.unsafe.memory.MemoryBlock + +/** + * A RowQueue is an FIFO queue for UnsafeRow. + */ +private[python] trait RowQueue { + /** + * Add a row to the end of it, returns true iff the row has added into it. + */ + def add(row: UnsafeRow): Boolean + + /** + * Retrieve and remove the first row, returns null if it's empty. + * + * It can only be called after add is called. + */ + def remove(): UnsafeRow + + /** + * Cleanup all the resources. + */ + def close(): Unit +} + +/** + * A RowQueue that is based on in-memory page. UnsafeRows are appended into it until it's full. 
+ * Another thread could read from it at the same time (behind the writer). + * + * The format of UnsafeRow in page: + * [4 bytes to hold length of record (N)] [N bytes to hold record] [...] + */ +private[python] abstract class InMemoryRowQueue(val page: MemoryBlock, numFields: Int) + extends RowQueue { + private val base: AnyRef = page.getBaseObject + private val endOfPage: Long = page.getBaseOffset + page.size + // the first location where a new row would be written + private var writeOffset = page.getBaseOffset + // points to the start of the next row to read + private var readOffset = page.getBaseOffset + private val resultRow = new UnsafeRow(numFields) + + def add(row: UnsafeRow): Boolean = { +val size = row.getSizeInBytes +if (writeOffset + 4 + size > endOfPage) { + // if there is not enough space in this page to hold the new record + if (writeOffset + 4 <= endOfPage) { +// if there's extra space at the end of the page, store a special "end-of-page" length (-1) +Platform.putInt(base, writeOffset, -1) + } + false +} else { + Platform.putInt(base, writeOffset, size) + Platform.copyMemory(row.getBaseObject, row.getBaseOffset, base, writeOffset + 4, size) + writeOffset += 4 + size + true +} + } + + def remove(): UnsafeRow = { +if (readOffset + 4 > endOfPage || Platform.getInt(base, readOffset) < 0) { + null +} else { + val size = Platform.getInt(base, readOffset) + resultRow.pointTo(base, readOffset + 4, size) + readOffset += 4 + size + resultRow +} + } +} + +/** + * A RowQueue that is backed by a file on disk. This queue will stop accepting new rows once any + * reader has begun reading from the queue. 
+ */ +private[python] case class DiskRowQueue(file: File, fields: Int) extends RowQueue { + private var fout = new FileOutputStream(file.toString) + private var out = new DataOutputStream(new BufferedOutputStream(fout)) + private var unreadBytes = 0L + + private var fin: FileInputStream = _ + private var in: DataInputStream = _ + private val resultRow = new UnsafeRow(fields) + + def add(row: UnsafeRow): Boolean = synchronized { +if (out == null) { + // Another thread is reading, stop writing this one + return false +} +out.writeInt(row.getSizeInBytes) +out.write(row.getBytes) +unreadBytes += 4 + row.getSizeInBytes +true + } + + def remove(): UnsafeRow = synchronized { +if (out != null) { +
[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/15089#discussion_r80326226 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala --- @@ -0,0 +1,278 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +*http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.sql.execution.python + +import java.io._ + +import com.google.common.io.Closeables + +import org.apache.spark.SparkException +import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.unsafe.Platform +import org.apache.spark.unsafe.memory.MemoryBlock + +/** + * A RowQueue is an FIFO queue for UnsafeRow. + */ +private[python] trait RowQueue { + /** + * Add a row to the end of it, returns true iff the row has added into it. + */ + def add(row: UnsafeRow): Boolean + + /** + * Retrieve and remove the first row, returns null if it's empty. + * + * It can only be called after add is called. + */ + def remove(): UnsafeRow + + /** + * Cleanup all the resources. + */ + def close(): Unit +} + +/** + * A RowQueue that is based on in-memory page. UnsafeRows are appended into it until it's full. 
+ * Another thread could read from it at the same time (behind the writer). + * + * The format of UnsafeRow in page: + * [4 bytes to hold length of record (N)] [N bytes to hold record] [...] + */ +private[python] abstract class InMemoryRowQueue(val page: MemoryBlock, numFields: Int) + extends RowQueue { + private val base: AnyRef = page.getBaseObject + private val endOfPage: Long = page.getBaseOffset + page.size + // the first location where a new row would be written + private var writeOffset = page.getBaseOffset + // points to the start of the next row to read + private var readOffset = page.getBaseOffset + private val resultRow = new UnsafeRow(numFields) + + def add(row: UnsafeRow): Boolean = { +val size = row.getSizeInBytes +if (writeOffset + 4 + size > endOfPage) { + // if there is not enough space in this page to hold the new record + if (writeOffset + 4 <= endOfPage) { +// if there's extra space at the end of the page, store a special "end-of-page" length (-1) +Platform.putInt(base, writeOffset, -1) + } + false +} else { + Platform.putInt(base, writeOffset, size) + Platform.copyMemory(row.getBaseObject, row.getBaseOffset, base, writeOffset + 4, size) + writeOffset += 4 + size + true +} + } + + def remove(): UnsafeRow = { +if (readOffset + 4 > endOfPage || Platform.getInt(base, readOffset) < 0) { + null +} else { + val size = Platform.getInt(base, readOffset) + resultRow.pointTo(base, readOffset + 4, size) + readOffset += 4 + size + resultRow +} + } +} + +/** + * A RowQueue that is backed by a file on disk. This queue will stop accepting new rows once any + * reader has begun reading from the queue. 
+ */ +private[python] case class DiskRowQueue(file: File, fields: Int) extends RowQueue { + private var fout = new FileOutputStream(file.toString) + private var out = new DataOutputStream(new BufferedOutputStream(fout)) + private var unreadBytes = 0L + + private var fin: FileInputStream = _ + private var in: DataInputStream = _ + private val resultRow = new UnsafeRow(fields) + + def add(row: UnsafeRow): Boolean = synchronized { +if (out == null) { + // Another thread is reading, stop writing this one + return false +} +out.writeInt(row.getSizeInBytes) +out.write(row.getBytes) +unreadBytes += 4 + row.getSizeInBytes +true + } + + def remove(): UnsafeRow = synchronized { +if (out != null) { +
[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/15089#discussion_r80174300 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala --- @@ -0,0 +1,278 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +*http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.sql.execution.python + +import java.io._ + +import com.google.common.io.Closeables + +import org.apache.spark.SparkException +import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.unsafe.Platform +import org.apache.spark.unsafe.memory.MemoryBlock + +/** + * A RowQueue is an FIFO queue for UnsafeRow. + */ +private[python] trait RowQueue { + /** + * Add a row to the end of it, returns true iff the row has added into it. + */ + def add(row: UnsafeRow): Boolean + + /** + * Retrieve and remove the first row, returns null if it's empty. + * + * It can only be called after add is called. + */ + def remove(): UnsafeRow + + /** + * Cleanup all the resources. + */ + def close(): Unit +} + +/** + * A RowQueue that is based on in-memory page. UnsafeRows are appended into it until it's full. 
+ * Another thread could read from it at the same time (behind the writer). + * + * The format of UnsafeRow in page: + * [4 bytes to hold length of record (N)] [N bytes to hold record] [...] + */ +private[python] abstract class InMemoryRowQueue(val page: MemoryBlock, numFields: Int) + extends RowQueue { + private val base: AnyRef = page.getBaseObject + private val endOfPage: Long = page.getBaseOffset + page.size + // the first location where a new row would be written + private var writeOffset = page.getBaseOffset + // points to the start of the next row to read + private var readOffset = page.getBaseOffset + private val resultRow = new UnsafeRow(numFields) + + def add(row: UnsafeRow): Boolean = { +val size = row.getSizeInBytes +if (writeOffset + 4 + size > endOfPage) { + // if there is not enough space in this page to hold the new record + if (writeOffset + 4 <= endOfPage) { +// if there's extra space at the end of the page, store a special "end-of-page" length (-1) +Platform.putInt(base, writeOffset, -1) + } + false +} else { + Platform.putInt(base, writeOffset, size) + Platform.copyMemory(row.getBaseObject, row.getBaseOffset, base, writeOffset + 4, size) + writeOffset += 4 + size + true +} + } + + def remove(): UnsafeRow = { +if (readOffset + 4 > endOfPage || Platform.getInt(base, readOffset) < 0) { + null +} else { + val size = Platform.getInt(base, readOffset) + resultRow.pointTo(base, readOffset + 4, size) + readOffset += 4 + size + resultRow +} + } +} + +/** + * A RowQueue that is backed by a file on disk. This queue will stop accepting new rows once any + * reader has begun reading from the queue. 
+ */ +private[python] case class DiskRowQueue(file: File, fields: Int) extends RowQueue { + private var fout = new FileOutputStream(file.toString) + private var out = new DataOutputStream(new BufferedOutputStream(fout)) + private var unreadBytes = 0L + + private var fin: FileInputStream = _ + private var in: DataInputStream = _ + private val resultRow = new UnsafeRow(fields) + + def add(row: UnsafeRow): Boolean = synchronized { +if (out == null) { + // Another thread is reading, stop writing this one + return false +} +out.writeInt(row.getSizeInBytes) +out.write(row.getBytes) +unreadBytes += 4 + row.getSizeInBytes +true + } + + def remove(): UnsafeRow = synchronized { +if (out != null) { +
[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/15089#discussion_r80128271 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala --- @@ -17,18 +17,270 @@ package org.apache.spark.sql.execution.python +import java.io._ + import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import net.razorvine.pickle.{Pickler, Unpickler} -import org.apache.spark.TaskContext +import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.api.python.{ChainedPythonFunctions, PythonRunner} +import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.types.{DataType, StructField, StructType} +import org.apache.spark.unsafe.Platform +import org.apache.spark.unsafe.memory.MemoryBlock +import org.apache.spark.util.{CompletionIterator, Utils} + + +/** + * A RowQueue is an FIFO queue for UnsafeRow. + */ +private[python] trait RowQueue { + /** + * Add a row to the end of it, returns true iff the row has added into it. + */ + def add(row: UnsafeRow): Boolean + + /** + * Retrieve and remove the first row, returns null if it's empty. + */ + def remove(): UnsafeRow + + /** + * Cleanup all the resources. + */ + def close(): Unit +} + +/** + * A RowQueue that is based on in-memory page. UnsafeRows are appended into it until it's full. + * Another thread could read from it at the same time (behind the writer). 
+ */ +private[python] case class InMemoryRowQueue(page: MemoryBlock, fields: Int) extends RowQueue { + private val base: AnyRef = page.getBaseObject + private var last = page.getBaseOffset // for writing + private var first = page.getBaseOffset // for reading + private val resultRow = new UnsafeRow(fields) + + def add(row: UnsafeRow): Boolean = { +if (last + 4 + row.getSizeInBytes > page.getBaseOffset + page.size) { + if (last + 4 <= page.getBaseOffset + page.size) { +Platform.putInt(base, last, -1) + } + return false +} +Platform.putInt(base, last, row.getSizeInBytes) +Platform.copyMemory(row.getBaseObject, row.getBaseOffset, base, last + 4, row.getSizeInBytes) +last += 4 + row.getSizeInBytes +true + } + + def remove(): UnsafeRow = { +if (first + 4 > page.getBaseOffset + page.size || Platform.getInt(base, first) < 0) { --- End diff -- Should we add a variable to enforce this? It seems like it would be pretty cheap to add a check and in the past I think that guarding against this type of accidental misuse would have prevented subtle bugs.
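As an illustration of the kind of cheap guard being suggested, here is a minimal sketch. It assumes the invariant in question is that `remove()` must never overtake `add()` (the reader stays behind the writer); that reading of the comment is an assumption. `GuardedQueue` and its counters are hypothetical names, not part of the PR.

```scala
import scala.collection.mutable

// Hypothetical guard, not the PR's code: count rows in and out so that a
// remove() that overtakes the writer fails fast instead of reading garbage.
final class GuardedQueue[A] {
  private val items = mutable.Queue.empty[A]
  private var numAdded = 0L
  private var numRemoved = 0L

  def add(item: A): Unit = {
    items.enqueue(item)
    numAdded += 1
  }

  def remove(): A = {
    // The check costs one comparison per call.
    if (numRemoved >= numAdded) {
      throw new IllegalStateException(
        s"remove() called ${numRemoved + 1} times but only $numAdded rows were added")
    }
    numRemoved += 1
    items.dequeue()
  }
}
```

In the page-based queue the same check could compare the read and write offsets instead of separate counters; whether that is safe across threads depends on how those fields are published, which this sketch does not address.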
[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/15089#discussion_r80129158 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala --- @@ -17,18 +17,270 @@ package org.apache.spark.sql.execution.python +import java.io._ + import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import net.razorvine.pickle.{Pickler, Unpickler} -import org.apache.spark.TaskContext +import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.api.python.{ChainedPythonFunctions, PythonRunner} +import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.types.{DataType, StructField, StructType} +import org.apache.spark.unsafe.Platform +import org.apache.spark.unsafe.memory.MemoryBlock +import org.apache.spark.util.{CompletionIterator, Utils} + + +/** + * A RowQueue is an FIFO queue for UnsafeRow. + */ +private[python] trait RowQueue { + /** + * Add a row to the end of it, returns true iff the row has added into it. + */ + def add(row: UnsafeRow): Boolean + + /** + * Retrieve and remove the first row, returns null if it's empty. + */ + def remove(): UnsafeRow + + /** + * Cleanup all the resources. + */ + def close(): Unit +} + +/** + * A RowQueue that is based on in-memory page. UnsafeRows are appended into it until it's full. + * Another thread could read from it at the same time (behind the writer). 
+ */ +private[python] case class InMemoryRowQueue(page: MemoryBlock, fields: Int) extends RowQueue { + private val base: AnyRef = page.getBaseObject + private var last = page.getBaseOffset // for writing + private var first = page.getBaseOffset // for reading + private val resultRow = new UnsafeRow(fields) + + def add(row: UnsafeRow): Boolean = { +if (last + 4 + row.getSizeInBytes > page.getBaseOffset + page.size) { + if (last + 4 <= page.getBaseOffset + page.size) { +Platform.putInt(base, last, -1) + } + return false +} +Platform.putInt(base, last, row.getSizeInBytes) +Platform.copyMemory(row.getBaseObject, row.getBaseOffset, base, last + 4, row.getSizeInBytes) +last += 4 + row.getSizeInBytes +true + } + + def remove(): UnsafeRow = { +if (first + 4 > page.getBaseOffset + page.size || Platform.getInt(base, first) < 0) { + null +} else { + val size = Platform.getInt(base, first) + resultRow.pointTo(base, first + 4, size) + first += 4 + size + resultRow +} + } + + def close(): Unit = { +// caller should override close() to free page + } +} + +/** + * A RowQueue that is based on file in disk. It will stop to push once someone start to read + * from it. 
+ */ +private[python] case class DiskRowQueue(path: String, fields: Int) extends RowQueue { + private var fout = new FileOutputStream(path) + private var out = new DataOutputStream(new BufferedOutputStream(fout)) + private var length = 0L + + private var fin: FileInputStream = _ + private var in: DataInputStream = _ + private val resultRow = new UnsafeRow(fields) + + def add(row: UnsafeRow): Boolean = { +synchronized { + if (out == null) { +// Another thread is reading, stop writing this one +return false + } +} +out.writeInt(row.getSizeInBytes) +out.write(row.getBytes) +length += 4 + row.getSizeInBytes +true + } + + def remove(): UnsafeRow = { +synchronized { + if (out != null) { +out.flush() +out.close() +out = null +fout.close() +fout = null + +fin = new FileInputStream(path) +in = new DataInputStream(new BufferedInputStream(fin)) --- End diff -- `DataInputStream` extends `FilterInputStream(InputStream)` and that class's `close()` method just calls the wrapped / underlying stream's `close()`.
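The point about `close()` propagating through the wrappers can be checked with a small stand-alone sketch. `ProbeStream` and `CloseDemo` are hypothetical names introduced only for this illustration; the stream classes are the standard `java.io` ones.

```scala
import java.io.{BufferedInputStream, ByteArrayInputStream, DataInputStream}

// Hypothetical probe stream that records whether close() reached it.
final class ProbeStream(bytes: Array[Byte]) extends ByteArrayInputStream(bytes) {
  var closed = false
  override def close(): Unit = {
    closed = true
    super.close()
  }
}

object CloseDemo {
  /** Reads one int through the wrappers, closes only the outermost stream,
    * and reports the value read and whether the innermost stream saw close(). */
  def run(): (Int, Boolean) = {
    val probe = new ProbeStream(Array[Byte](0, 0, 0, 7))
    val in = new DataInputStream(new BufferedInputStream(probe))
    val value = in.readInt()
    in.close() // delegates to BufferedInputStream, which closes the probe
    (value, probe.closed)
  }
}
```

If this holds, keeping a separate reference to the underlying `FileInputStream` just to close it would be redundant, which appears to be why later revisions of the diff drop the `fin` and `fout` fields.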
[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/15089#discussion_r80130429 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala --- @@ -0,0 +1,278 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +*http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.sql.execution.python + +import java.io._ + +import com.google.common.io.Closeables + +import org.apache.spark.SparkException +import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager} +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.unsafe.Platform +import org.apache.spark.unsafe.memory.MemoryBlock + +/** + * A RowQueue is an FIFO queue for UnsafeRow. + */ +private[python] trait RowQueue { + /** + * Add a row to the end of it, returns true iff the row has added into it. + */ + def add(row: UnsafeRow): Boolean + + /** + * Retrieve and remove the first row, returns null if it's empty. + * + * It can only be called after add is called. + */ + def remove(): UnsafeRow + + /** + * Cleanup all the resources. + */ + def close(): Unit +} + +/** + * A RowQueue that is based on in-memory page. UnsafeRows are appended into it until it's full. 
+ * Another thread could read from it at the same time (behind the writer). + * + * The format of UnsafeRow in page: + * [4 bytes to hold length of record (N)] [N bytes to hold record] [...] + */ +private[python] abstract class InMemoryRowQueue(val page: MemoryBlock, numFields: Int) + extends RowQueue { + private val base: AnyRef = page.getBaseObject + private val endOfPage: Long = page.getBaseOffset + page.size + // the first location where a new row would be written + private var writeOffset = page.getBaseOffset + // points to the start of the next row to read + private var readOffset = page.getBaseOffset + private val resultRow = new UnsafeRow(numFields) + + def add(row: UnsafeRow): Boolean = { +val size = row.getSizeInBytes +if (writeOffset + 4 + size > endOfPage) { + // if there is not enough space in this page to hold the new record + if (writeOffset + 4 <= endOfPage) { +// if there's extra space at the end of the page, store a special "end-of-page" length (-1) +Platform.putInt(base, writeOffset, -1) + } + false +} else { + Platform.putInt(base, writeOffset, size) + Platform.copyMemory(row.getBaseObject, row.getBaseOffset, base, writeOffset + 4, size) + writeOffset += 4 + size + true +} + } + + def remove(): UnsafeRow = { +if (readOffset + 4 > endOfPage || Platform.getInt(base, readOffset) < 0) { + null +} else { + val size = Platform.getInt(base, readOffset) + resultRow.pointTo(base, readOffset + 4, size) + readOffset += 4 + size + resultRow +} + } +} + +/** + * A RowQueue that is backed by a file on disk. This queue will stop accepting new rows once any + * reader has begun reading from the queue. 
+ */ +private[python] case class DiskRowQueue(file: File, fields: Int) extends RowQueue { + private var fout = new FileOutputStream(file.toString) + private var out = new DataOutputStream(new BufferedOutputStream(fout)) + private var unreadBytes = 0L + + private var fin: FileInputStream = _ + private var in: DataInputStream = _ + private val resultRow = new UnsafeRow(fields) + + def add(row: UnsafeRow): Boolean = synchronized { +if (out == null) { + // Another thread is reading, stop writing this one + return false +} +out.writeInt(row.getSizeInBytes) +out.write(row.getBytes) +unreadBytes += 4 + row.getSizeInBytes +true + } + + def remove(): UnsafeRow = synchronized { +if (out != null) { +
[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/15089#discussion_r79253457 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala --- @@ -17,18 +17,270 @@ package org.apache.spark.sql.execution.python +import java.io._ + import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import net.razorvine.pickle.{Pickler, Unpickler} -import org.apache.spark.TaskContext +import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.api.python.{ChainedPythonFunctions, PythonRunner} +import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.types.{DataType, StructField, StructType} +import org.apache.spark.unsafe.Platform +import org.apache.spark.unsafe.memory.MemoryBlock +import org.apache.spark.util.{CompletionIterator, Utils} + + +/** + * A RowQueue is an FIFO queue for UnsafeRow. + */ +private[python] trait RowQueue { + /** + * Add a row to the end of it, returns true iff the row has added into it. + */ + def add(row: UnsafeRow): Boolean + + /** + * Retrieve and remove the first row, returns null if it's empty. + */ + def remove(): UnsafeRow + + /** + * Cleanup all the resources. + */ + def close(): Unit +} + +/** + * A RowQueue that is based on in-memory page. UnsafeRows are appended into it until it's full. + * Another thread could read from it at the same time (behind the writer). 
+ */ +private[python] case class InMemoryRowQueue(page: MemoryBlock, fields: Int) extends RowQueue { + private val base: AnyRef = page.getBaseObject + private var last = page.getBaseOffset // for writing + private var first = page.getBaseOffset // for reading + private val resultRow = new UnsafeRow(fields) + + def add(row: UnsafeRow): Boolean = { +if (last + 4 + row.getSizeInBytes > page.getBaseOffset + page.size) { + if (last + 4 <= page.getBaseOffset + page.size) { +Platform.putInt(base, last, -1) + } + return false +} +Platform.putInt(base, last, row.getSizeInBytes) +Platform.copyMemory(row.getBaseObject, row.getBaseOffset, base, last + 4, row.getSizeInBytes) +last += 4 + row.getSizeInBytes +true + } + + def remove(): UnsafeRow = { +if (first + 4 > page.getBaseOffset + page.size || Platform.getInt(base, first) < 0) { + null +} else { + val size = Platform.getInt(base, first) + resultRow.pointTo(base, first + 4, size) + first += 4 + size + resultRow +} + } + + def close(): Unit = { +// caller should override close() to free page + } +} + +/** + * A RowQueue that is based on file in disk. It will stop to push once someone start to read + * from it. 
+ */ +private[python] case class DiskRowQueue(path: String, fields: Int) extends RowQueue { + private var fout = new FileOutputStream(path) + private var out = new DataOutputStream(new BufferedOutputStream(fout)) + private var length = 0L + + private var fin: FileInputStream = _ + private var in: DataInputStream = _ + private val resultRow = new UnsafeRow(fields) + + def add(row: UnsafeRow): Boolean = { +synchronized { + if (out == null) { +// Another thread is reading, stop writing this one +return false + } +} +out.writeInt(row.getSizeInBytes) +out.write(row.getBytes) +length += 4 + row.getSizeInBytes +true + } + + def remove(): UnsafeRow = { +synchronized { + if (out != null) { +out.flush() +out.close() +out = null +fout.close() +fout = null + +fin = new FileInputStream(path) +in = new DataInputStream(new BufferedInputStream(fin)) + } +} + +if (length > 0) { + val size = in.readInt() + assert(4 + size <= length, s"require ${4 + size} bytes for next row") + val bytes = new Array[Byte](size) + in.readFully(bytes) + length -= 4 + size + resultRow.pointTo(bytes, size) + resultRow +} else { + null +} + } + + def close(): Unit = { +synchronized { + if (fout != null) { +fout.close() +fout = null + } + if (fin !=
[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/15089#discussion_r79253378 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala --- @@ -17,18 +17,270 @@ package org.apache.spark.sql.execution.python +import java.io._ + import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import net.razorvine.pickle.{Pickler, Unpickler} -import org.apache.spark.TaskContext +import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.api.python.{ChainedPythonFunctions, PythonRunner} +import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.types.{DataType, StructField, StructType} +import org.apache.spark.unsafe.Platform +import org.apache.spark.unsafe.memory.MemoryBlock +import org.apache.spark.util.{CompletionIterator, Utils} + + +/** + * A RowQueue is an FIFO queue for UnsafeRow. + */ +private[python] trait RowQueue { + /** + * Add a row to the end of it, returns true iff the row has added into it. + */ + def add(row: UnsafeRow): Boolean + + /** + * Retrieve and remove the first row, returns null if it's empty. + */ + def remove(): UnsafeRow + + /** + * Cleanup all the resources. + */ + def close(): Unit +} + +/** + * A RowQueue that is based on in-memory page. UnsafeRows are appended into it until it's full. + * Another thread could read from it at the same time (behind the writer). 
+ */ +private[python] case class InMemoryRowQueue(page: MemoryBlock, fields: Int) extends RowQueue { + private val base: AnyRef = page.getBaseObject + private var last = page.getBaseOffset // for writing + private var first = page.getBaseOffset // for reading + private val resultRow = new UnsafeRow(fields) + + def add(row: UnsafeRow): Boolean = { +if (last + 4 + row.getSizeInBytes > page.getBaseOffset + page.size) { + if (last + 4 <= page.getBaseOffset + page.size) { +Platform.putInt(base, last, -1) + } + return false +} +Platform.putInt(base, last, row.getSizeInBytes) +Platform.copyMemory(row.getBaseObject, row.getBaseOffset, base, last + 4, row.getSizeInBytes) +last += 4 + row.getSizeInBytes +true + } + + def remove(): UnsafeRow = { +if (first + 4 > page.getBaseOffset + page.size || Platform.getInt(base, first) < 0) { + null +} else { + val size = Platform.getInt(base, first) + resultRow.pointTo(base, first + 4, size) + first += 4 + size + resultRow +} + } + + def close(): Unit = { +// caller should override close() to free page + } +} + +/** + * A RowQueue that is based on file in disk. It will stop to push once someone start to read + * from it. 
+ */ +private[python] case class DiskRowQueue(path: String, fields: Int) extends RowQueue { + private var fout = new FileOutputStream(path) + private var out = new DataOutputStream(new BufferedOutputStream(fout)) + private var length = 0L + + private var fin: FileInputStream = _ + private var in: DataInputStream = _ + private val resultRow = new UnsafeRow(fields) + + def add(row: UnsafeRow): Boolean = { +synchronized { + if (out == null) { +// Another thread is reading, stop writing this one +return false + } +} +out.writeInt(row.getSizeInBytes) +out.write(row.getBytes) +length += 4 + row.getSizeInBytes +true + } + + def remove(): UnsafeRow = { +synchronized { + if (out != null) { +out.flush() +out.close() +out = null +fout.close() +fout = null + +fin = new FileInputStream(path) +in = new DataInputStream(new BufferedInputStream(fin)) + } +} + +if (length > 0) { + val size = in.readInt() + assert(4 + size <= length, s"require ${4 + size} bytes for next row") + val bytes = new Array[Byte](size) + in.readFully(bytes) + length -= 4 + size + resultRow.pointTo(bytes, size) + resultRow +} else { + null +} + } + + def close(): Unit = { +synchronized { + if (fout != null) { +fout.close() +fout = null + } + if (fin !=
[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/15089#discussion_r79038500 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala --- @@ -17,18 +17,270 @@ package org.apache.spark.sql.execution.python +import java.io._ + import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import net.razorvine.pickle.{Pickler, Unpickler} -import org.apache.spark.TaskContext +import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.api.python.{ChainedPythonFunctions, PythonRunner} +import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.types.{DataType, StructField, StructType} +import org.apache.spark.unsafe.Platform +import org.apache.spark.unsafe.memory.MemoryBlock +import org.apache.spark.util.{CompletionIterator, Utils} + + +/** + * A RowQueue is an FIFO queue for UnsafeRow. + */ +private[python] trait RowQueue { + /** + * Add a row to the end of it, returns true iff the row has added into it. + */ + def add(row: UnsafeRow): Boolean + + /** + * Retrieve and remove the first row, returns null if it's empty. + */ + def remove(): UnsafeRow + + /** + * Cleanup all the resources. + */ + def close(): Unit +} + +/** + * A RowQueue that is based on in-memory page. UnsafeRows are appended into it until it's full. + * Another thread could read from it at the same time (behind the writer). 
+ */ +private[python] case class InMemoryRowQueue(page: MemoryBlock, fields: Int) extends RowQueue { + private val base: AnyRef = page.getBaseObject + private var last = page.getBaseOffset // for writing + private var first = page.getBaseOffset // for reading + private val resultRow = new UnsafeRow(fields) + + def add(row: UnsafeRow): Boolean = { +if (last + 4 + row.getSizeInBytes > page.getBaseOffset + page.size) { + if (last + 4 <= page.getBaseOffset + page.size) { +Platform.putInt(base, last, -1) + } + return false +} +Platform.putInt(base, last, row.getSizeInBytes) +Platform.copyMemory(row.getBaseObject, row.getBaseOffset, base, last + 4, row.getSizeInBytes) +last += 4 + row.getSizeInBytes +true + } + + def remove(): UnsafeRow = { +if (first + 4 > page.getBaseOffset + page.size || Platform.getInt(base, first) < 0) { + null +} else { + val size = Platform.getInt(base, first) + resultRow.pointTo(base, first + 4, size) + first += 4 + size + resultRow +} + } + + def close(): Unit = { +// caller should override close() to free page + } +} + +/** + * A RowQueue that is based on file in disk. It will stop to push once someone start to read + * from it. 
+ */ +private[python] case class DiskRowQueue(path: String, fields: Int) extends RowQueue { + private var fout = new FileOutputStream(path) + private var out = new DataOutputStream(new BufferedOutputStream(fout)) + private var length = 0L + + private var fin: FileInputStream = _ + private var in: DataInputStream = _ + private val resultRow = new UnsafeRow(fields) + + def add(row: UnsafeRow): Boolean = { +synchronized { + if (out == null) { +// Another thread is reading, stop writing this one +return false + } +} +out.writeInt(row.getSizeInBytes) +out.write(row.getBytes) +length += 4 + row.getSizeInBytes +true + } + + def remove(): UnsafeRow = { +synchronized { + if (out != null) { +out.flush() +out.close() +out = null +fout.close() +fout = null + +fin = new FileInputStream(path) +in = new DataInputStream(new BufferedInputStream(fin)) --- End diff -- the close of in will not call fin.close(), I think.
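On the question of whether closing `in` also closes `fin`: the JDK documents that `FilterInputStream.close()` (inherited by `DataInputStream`) closes the wrapped stream, and `BufferedInputStream.close()` does the same. Below is a minimal standalone check of that behavior, not part of the patch. `CloseChainCheck`, `wrapperCloseClosesFile`, and the temp-file prefix are names made up for this illustration.

```scala
import java.io.{BufferedInputStream, DataInputStream, File, FileInputStream, FileOutputStream}

// Hypothetical helper, for illustration only.
object CloseChainCheck {
  /** Returns true if closing the outer DataInputStream also closed the FileInputStream. */
  def wrapperCloseClosesFile(): Boolean = {
    val tmp = File.createTempFile("rowqueue-close-check", ".bin")
    try {
      // Write four bytes so that readInt() below has something to read.
      val fout = new FileOutputStream(tmp)
      try fout.write(Array[Byte](1, 2, 3, 4)) finally fout.close()

      val fin = new FileInputStream(tmp)
      // Grab the channel before closing; FileInputStream.close() closes its channel too.
      val channel = fin.getChannel
      val in = new DataInputStream(new BufferedInputStream(fin))
      in.readInt()
      in.close() // only the outermost wrapper is closed explicitly
      !channel.isOpen
    } finally {
      tmp.delete()
    }
  }
}
```

If the check returns true, closing only the outermost wrapper is enough to release the file handle; keeping an explicit `fin.close()` as well is harmless, because closing an already closed stream has no effect.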
[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/15089#discussion_r79037083 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala --- @@ -17,18 +17,270 @@ package org.apache.spark.sql.execution.python +import java.io._ + import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import net.razorvine.pickle.{Pickler, Unpickler} -import org.apache.spark.TaskContext +import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.api.python.{ChainedPythonFunctions, PythonRunner} +import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.types.{DataType, StructField, StructType} +import org.apache.spark.unsafe.Platform +import org.apache.spark.unsafe.memory.MemoryBlock +import org.apache.spark.util.{CompletionIterator, Utils} + + +/** + * A RowQueue is an FIFO queue for UnsafeRow. + */ +private[python] trait RowQueue { + /** + * Add a row to the end of it, returns true iff the row has added into it. + */ + def add(row: UnsafeRow): Boolean + + /** + * Retrieve and remove the first row, returns null if it's empty. + */ + def remove(): UnsafeRow + + /** + * Cleanup all the resources. + */ + def close(): Unit +} + +/** + * A RowQueue that is based on in-memory page. UnsafeRows are appended into it until it's full. + * Another thread could read from it at the same time (behind the writer). 
+ */ +private[python] case class InMemoryRowQueue(page: MemoryBlock, fields: Int) extends RowQueue { + private val base: AnyRef = page.getBaseObject + private var last = page.getBaseOffset // for writing + private var first = page.getBaseOffset // for reading + private val resultRow = new UnsafeRow(fields) + + def add(row: UnsafeRow): Boolean = { +if (last + 4 + row.getSizeInBytes > page.getBaseOffset + page.size) { + if (last + 4 <= page.getBaseOffset + page.size) { +Platform.putInt(base, last, -1) + } + return false +} +Platform.putInt(base, last, row.getSizeInBytes) +Platform.copyMemory(row.getBaseObject, row.getBaseOffset, base, last + 4, row.getSizeInBytes) +last += 4 + row.getSizeInBytes +true + } + + def remove(): UnsafeRow = { +if (first + 4 > page.getBaseOffset + page.size || Platform.getInt(base, first) < 0) { --- End diff -- remove() should only be called when we know that there is at least one row in it.
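The page layout used by `InMemoryRowQueue` in the quoted diff is a sequence of `[4-byte size][payload]` records, with `-1` written as an end marker when the next row does not fit. The sketch below mirrors that layout on a plain heap array so it can run without Spark. It is an illustration under assumptions, not the patch's code: `BufferQueue` is a hypothetical name, `Array[Byte]` stands in for `UnsafeRow`, and `java.nio.ByteBuffer` stands in for `Platform` and `MemoryBlock`. Unlike the diff, it checks for emptiness itself by comparing offsets instead of relying on the caller.

```scala
import java.nio.ByteBuffer
import java.util.Arrays

// Hypothetical stand-in for the in-memory queue: records are Array[Byte], not UnsafeRow.
final class BufferQueue(capacity: Int) {
  private val bytes = new Array[Byte](capacity)
  private val buf = ByteBuffer.wrap(bytes)
  private var last = 0  // write offset ("last" in the diff)
  private var first = 0 // read offset ("first" in the diff)

  /** Appends a record as [size][payload]; returns false if it does not fit. */
  def add(record: Array[Byte]): Boolean = {
    if (last + 4 + record.length > capacity) {
      // Mirror the diff: leave a -1 end marker if there is room for one.
      if (last + 4 <= capacity) buf.putInt(last, -1)
      false
    } else {
      buf.putInt(last, record.length)
      System.arraycopy(record, 0, bytes, last + 4, record.length)
      last += 4 + record.length
      true
    }
  }

  /**
   * Removes the oldest record, or returns None if nothing is left to read.
   * The diff's reader detects end-of-data via the -1 marker; this single-threaded
   * sketch compares the read and write offsets instead.
   */
  def remove(): Option[Array[Byte]] = {
    if (first >= last) None
    else {
      val size = buf.getInt(first)
      val record = Arrays.copyOfRange(bytes, first + 4, first + 4 + size)
      first += 4 + size
      Some(record)
    }
  }
}
```

Because the space behind the read offset is never reused, a queue like this fills up once and is then discarded, which is what makes it cheap to chain several of them.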
[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/15089#discussion_r79034499 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala --- @@ -98,7 +353,7 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi // For each row, add it to the queue. val inputIterator = iter.grouped(100).map { inputRows => val toBePickled = inputRows.map { inputRow => - queue.add(inputRow) + queue.add(inputRow.asInstanceOf[UnsafeRow]) --- End diff -- No, we always use UnsafeRow between operators.
[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...
GitHub user davies opened a pull request: https://github.com/apache/spark/pull/15089

[SPARK-15621] [SQL] Support spilling for Python UDF

## What changes were proposed in this pull request?

When executing a Python UDF, we buffer the input rows in a queue, then pull them out to join with the results from the Python UDF. If the Python UDF is slow or the input rows are too wide, we could run out of memory because of the queue. Since we can't flush all the buffers (sockets) between the JVM and the Python process from the JVM side, we can't limit the number of rows in the queue; otherwise it could deadlock.

This PR manages the memory used by the queue and spills it to disk when there is not enough memory (it also releases the memory and disk space as soon as possible).

## How was this patch tested?

Added unit tests. Also manually ran a workload with a large input row and a slow Python UDF (with a large broadcast), like this:

```
b = range(1<<24)
add = udf(lambda x: x + len(b), IntegerType())
df = sqlContext.range(1, 1<<26, 1, 4)
print df.select(df.id, lit("adf"*1).alias("s"), add(df.id).alias("add")).groupBy(length("s")).sum().collect()
```

It ran out of memory (hung because of full GC) before the patch and ran smoothly after the patch.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/davies/spark spill_udf

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/15089.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #15089

commit 4964b9a611ed01aaa5252ac642df94db07a38868
Author: Davies Liu
Date: 2016-09-13T23:47:31Z

    spill the buffer for Python UDF into disk
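The spilling idea described in this pull request can be sketched as a queue of queues. Writes go to the newest segment. When that segment refuses a record, a new one is opened, in memory if the budget allows and on disk otherwise. Reads drain the oldest segment and release it as soon as it is empty. This is a simplified, single-threaded illustration under assumptions, not the patch itself: `ByteQueue`, `MemQueue`, `DiskQueue`, `SpillingQueue`, `pageSize`, `memoryLimit`, and `spillCount` are hypothetical names, records are `Array[Byte]` rather than `UnsafeRow`, and a simple counter replaces Spark's `TaskMemoryManager`.

```scala
import java.io._
import scala.collection.mutable

// All names in this sketch are hypothetical.
trait ByteQueue {
  def add(rec: Array[Byte]): Boolean // false once the segment refuses more data
  def remove(): Option[Array[Byte]]  // None when the segment is drained
  def close(): Unit
}

// Fixed-budget in-memory segment: accepts records until the budget is used up.
final class MemQueue(budget: Int) extends ByteQueue {
  private val items = mutable.Queue.empty[Array[Byte]]
  private var used = 0
  def add(rec: Array[Byte]): Boolean =
    if (used + rec.length > budget) false
    else { items.enqueue(rec); used += rec.length; true }
  def remove(): Option[Array[Byte]] = if (items.isEmpty) None else Some(items.dequeue())
  def close(): Unit = items.clear()
}

// File-backed segment: length-prefixed records; refuses writes once reading has started.
final class DiskQueue(file: File) extends ByteQueue {
  private var out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file)))
  private var in: DataInputStream = null
  private var remaining = 0L
  def add(rec: Array[Byte]): Boolean =
    if (out == null) false
    else { out.writeInt(rec.length); out.write(rec); remaining += 4 + rec.length; true }
  def remove(): Option[Array[Byte]] = {
    if (out != null) { // first read: flush and close the writer, then switch to reading
      out.close(); out = null
      in = new DataInputStream(new BufferedInputStream(new FileInputStream(file)))
    }
    if (remaining <= 0) None
    else {
      val rec = new Array[Byte](in.readInt())
      in.readFully(rec)
      remaining -= 4 + rec.length
      Some(rec)
    }
  }
  def close(): Unit = {
    if (out != null) { out.close(); out = null }
    if (in != null) { in.close(); in = null }
    file.delete() // free the disk space as soon as the segment is done
  }
}

final class SpillingQueue(pageSize: Int, memoryLimit: Int) {
  private val queues = mutable.Queue.empty[ByteQueue]
  private var memInUse = 0
  private var spills = 0
  def spillCount: Int = spills

  private def newQueue(need: Int): ByteQueue =
    if (need <= pageSize && memInUse + pageSize <= memoryLimit) {
      memInUse += pageSize; new MemQueue(pageSize)
    } else {
      spills += 1; new DiskQueue(File.createTempFile("rowqueue-spill", ".bin"))
    }

  private def release(q: ByteQueue): Unit = {
    q.close()
    if (q.isInstanceOf[MemQueue]) memInUse -= pageSize
  }

  def add(rec: Array[Byte]): Unit =
    if (queues.isEmpty || !queues.last.add(rec)) {
      val q = newQueue(rec.length)
      require(q.add(rec), "a fresh segment must accept the record")
      queues.enqueue(q)
    }

  @annotation.tailrec
  def remove(): Option[Array[Byte]] =
    if (queues.isEmpty) None
    else queues.head.remove() match {
      case None => release(queues.dequeue()); remove() // drained: free it, try the next one
      case rec  => rec
    }

  def close(): Unit = { queues.foreach(release); queues.clear() }
}
```

Ordering is preserved because a segment that has started to be read never accepts new records, so later rows always land in a later segment. The patch additionally has to cope with a writer and a reader on different threads, which this sketch does not attempt.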