[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...

2016-10-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/15089


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...

2016-09-30 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/15089#discussion_r81419085
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
 ---
@@ -98,7 +353,7 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], 
output: Seq[Attribute], chi
   // For each row, add it to the queue.
   val inputIterator = iter.grouped(100).map { inputRows =>
 val toBePickled = inputRows.map { inputRow =>
-  queue.add(inputRow)
+  queue.add(inputRow.asInstanceOf[UnsafeRow])
--- End diff --

Compared to an assert, the error message actually looks similar: the row
should be an UnsafeRow but it isn't. I think this is a fundamental
assumption; there are many places (20 for now) that rely on it.
```
davies@localhost:~/work/spark$ git grep '.asInstanceOf\[UnsafeRow\]' | wc -l
20
```





[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...

2016-09-29 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15089#discussion_r81237967
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala ---
@@ -0,0 +1,277 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.python
+
+import java.io._
+
+import com.google.common.io.Closeables
+
+import org.apache.spark.SparkException
+import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.memory.MemoryBlock
+
+/**
+ * A RowQueue is an FIFO queue for UnsafeRow.
+ *
+ * This RowQueue is ONLY designed and used for Python UDF, which has only 
one writer and only one
+ * reader, the reader ALWAYS ran behind the writer. See the doc of class 
[[BatchEvalPythonExec]]
+ * on how it works.
+ */
+private[python] trait RowQueue {
+
+  /**
+   * Add a row to the end of it, returns true iff the row has added into 
it.
+   */
+  def add(row: UnsafeRow): Boolean
+
+  /**
+   * Retrieve and remove the first row, returns null if it's empty.
+   *
+   * It can only be called after add is called.
+   */
+  def remove(): UnsafeRow
+
+  /**
+   * Cleanup all the resources.
+   */
+  def close(): Unit
+}
+
+/**
+ * A RowQueue that is based on in-memory page. UnsafeRows are appended 
into it until it's full.
+ * Another thread could read from it at the same time (behind the writer).
+ *
+ * The format of UnsafeRow in page:
+ * [4 bytes to hold length of record (N)] [N bytes to hold record] [...]
+ */
+private[python] abstract class InMemoryRowQueue(val page: MemoryBlock, 
numFields: Int)
+  extends RowQueue {
+  private val base: AnyRef = page.getBaseObject
+  private val endOfPage: Long = page.getBaseOffset + page.size
--- End diff --

My biggest question here is how the Java memory model actually works with
respect to two threads reading/writing via Unsafe without any synchronization
primitives. I wouldn't be surprised if this could lead to stale reads. We
should verify this wouldn't be an issue; otherwise we should add explicit
synchronization.
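
For context, a minimal sketch of the visibility concern, using a hypothetical `SimpleQueue` rather than the actual RowQueue: without a happens-before edge between the writer and the reader, the reader thread may observe stale offsets; synchronizing both sides (or using volatile/atomic fields) restores visibility.

```scala
// Hypothetical stand-in for the single-writer/single-reader pattern, not the actual
// RowQueue: both counters are guarded by the same monitor, so the consumer is
// guaranteed to see the producer's latest write. Dropping the synchronized blocks
// (as with bare Platform/Unsafe accesses) would allow the consumer to read stale values.
class SimpleQueue {
  private var writeCount = 0L   // advanced only by the producer thread
  private var readCount = 0L    // advanced only by the consumer thread

  def add(): Unit = synchronized { writeCount += 1 }

  def remove(): Boolean = synchronized {
    if (readCount < writeCount) { readCount += 1; true } else false
  }
}
```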






[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...

2016-09-29 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15089#discussion_r81232667
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
 ---
@@ -98,7 +353,7 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], 
output: Seq[Attribute], chi
   // For each row, add it to the queue.
   val inputIterator = iter.grouped(100).map { inputRows =>
 val toBePickled = inputRows.map { inputRow =>
-  queue.add(inputRow)
+  queue.add(inputRow.asInstanceOf[UnsafeRow])
--- End diff --

Can we add an assert with a message? I worry that if we ever change that decision,
it will be difficult to figure out what's going on.
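
A minimal sketch of what such an assert might look like (hypothetical helper, not the PR's code):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// Hypothetical helper, not in the PR: fail fast with a descriptive message instead
// of relying on the ClassCastException thrown by a bare cast.
object UnsafeRowCasts {
  def asUnsafeRow(inputRow: InternalRow): UnsafeRow = {
    assert(inputRow.isInstanceOf[UnsafeRow],
      s"BatchEvalPythonExec expects UnsafeRow from its child, got ${inputRow.getClass.getName}")
    inputRow.asInstanceOf[UnsafeRow]
  }
}
```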





[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...

2016-09-29 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15089#discussion_r81234464
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala ---
@@ -0,0 +1,277 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.python
+
+import java.io._
+
+import com.google.common.io.Closeables
+
+import org.apache.spark.SparkException
+import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.memory.MemoryBlock
+
+/**
+ * A RowQueue is an FIFO queue for UnsafeRow.
+ *
+ * This RowQueue is ONLY designed and used for Python UDF, which has only 
one writer and only one
+ * reader, the reader ALWAYS ran behind the writer. See the doc of class 
[[BatchEvalPythonExec]]
+ * on how it works.
+ */
+private[python] trait RowQueue {
+
+  /**
+   * Add a row to the end of it, returns true iff the row has added into 
it.
+   */
+  def add(row: UnsafeRow): Boolean
+
+  /**
+   * Retrieve and remove the first row, returns null if it's empty.
+   *
+   * It can only be called after add is called.
+   */
+  def remove(): UnsafeRow
+
+  /**
+   * Cleanup all the resources.
+   */
+  def close(): Unit
+}
+
+/**
+ * A RowQueue that is based on in-memory page. UnsafeRows are appended 
into it until it's full.
+ * Another thread could read from it at the same time (behind the writer).
+ *
+ * The format of UnsafeRow in page:
+ * [4 bytes to hold length of record (N)] [N bytes to hold record] [...]
--- End diff --

I would document here too that a length of -1 means end of page.
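
A rough illustration of the layout being described, including the -1 end-of-page marker (a hypothetical reader over a plain `ByteBuffer` holding a fully written page, not the PR's code):

```scala
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

// Hypothetical illustration of the page layout the comment could document:
// [4-byte length N][N bytes record] ... terminated either by a 4-byte length of -1,
// or simply by running out of space for another 4-byte length prefix.
def readAll(page: ByteBuffer): Seq[Array[Byte]] = {
  val records = ArrayBuffer.empty[Array[Byte]]
  var done = false
  while (!done) {
    if (page.remaining() < 4) {
      done = true                    // not enough room left for another length prefix
    } else {
      val n = page.getInt()
      if (n < 0) {
        done = true                  // -1 written by add() marks the end of the page
      } else {
        val rec = new Array[Byte](n)
        page.get(rec)
        records += rec
      }
    }
  }
  records.toSeq
}
```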






[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...

2016-09-29 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15089#discussion_r81235033
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala ---
@@ -0,0 +1,277 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.python
+
+import java.io._
+
+import com.google.common.io.Closeables
+
+import org.apache.spark.SparkException
+import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.memory.MemoryBlock
+
+/**
+ * A RowQueue is an FIFO queue for UnsafeRow.
+ *
+ * This RowQueue is ONLY designed and used for Python UDF, which has only 
one writer and only one
+ * reader, the reader ALWAYS ran behind the writer. See the doc of class 
[[BatchEvalPythonExec]]
+ * on how it works.
+ */
+private[python] trait RowQueue {
+
+  /**
+   * Add a row to the end of it, returns true iff the row has added into 
it.
--- End diff --

OK, I understand this now after reading more code. Regardless, it would be
great to explain some of the possible causes of this returning false, e.g.
when we run out of buffer space.
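
A hedged sketch of the caller-side contract being asked about, with hypothetical names (`addWithFallback` and `makeNextQueue` are not from the PR): add() returning false means the current backing store is exhausted, so the caller switches to a new queue and retries the same row.

```scala
// Hypothetical caller pattern, for illustration only: add() returns false when the
// in-memory page is full, or when a disk-backed queue has already been handed over
// to the reader, so the caller falls back to a fresh queue and retries.
def addWithFallback(queue: RowQueue, makeNextQueue: () => RowQueue, row: UnsafeRow): RowQueue = {
  if (queue.add(row)) {
    queue
  } else {
    val next = makeNextQueue()   // e.g. allocate a fresh page, or spill to disk
    assert(next.add(row), "a freshly created queue should accept at least one row")
    next
  }
}
```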





[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...

2016-09-29 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15089#discussion_r81233633
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
 ---
@@ -37,9 +40,25 @@ import org.apache.spark.sql.types.{DataType, 
StructField, StructType}
  * Python evaluation works by sending the necessary (projected) input data 
via a socket to an
  * external Python process, and combine the result from the Python process 
with the original row.
  *
- * For each row we send to Python, we also put it in a queue. For each 
output row from Python,
+ * For each row we send to Python, we also put it in a queue first. For 
each output row from Python,
  * we drain the queue to find the original input row. Note that if the 
Python process is way too
- * slow, this could lead to the queue growing unbounded and eventually run 
out of memory.
+ * slow, this could lead to the queue growing unbounded and spill into 
disk when run out of memory.
+ *
+ * Here is a diagram to show how this works:
+ *
+ *Upstream (from child)
--- End diff --

I suggest switching upstream and downstream: put the child operator (upstream)
at the bottom and the parent operator (downstream) at the top, to be more
consistent with databases. Basically, the "tree" would face upward.





[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...

2016-09-29 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15089#discussion_r81232436
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
 ---
@@ -70,7 +89,13 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], 
output: Seq[Attribute], chi
 
   // The queue used to buffer input rows so we can drain it to
   // combine input with output from Python.
-  val queue = new 
java.util.concurrent.ConcurrentLinkedQueue[InternalRow]()
+  val queue = HybridRowQueue(
+TaskContext.get().taskMemoryManager(),
+new File(Utils.getLocalDir(SparkEnv.get.conf)),
+child.output.length)
--- End diff --

Nit: move this to the previous line? The indentation is wrong here.





[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...

2016-09-29 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15089#discussion_r81234071
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala ---
@@ -0,0 +1,277 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.python
+
+import java.io._
+
+import com.google.common.io.Closeables
+
+import org.apache.spark.SparkException
+import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.memory.MemoryBlock
+
+/**
+ * A RowQueue is an FIFO queue for UnsafeRow.
+ *
+ * This RowQueue is ONLY designed and used for Python UDF, which has only 
one writer and only one
+ * reader, the reader ALWAYS ran behind the writer. See the doc of class 
[[BatchEvalPythonExec]]
+ * on how it works.
+ */
+private[python] trait RowQueue {
+
+  /**
+   * Add a row to the end of it, returns true iff the row has added into 
it.
+   */
+  def add(row: UnsafeRow): Boolean
+
+  /**
+   * Retrieve and remove the first row, returns null if it's empty.
+   *
+   * It can only be called after add is called.
--- End diff --

It's not obvious to me what this sentence means, or why it matters. Can
you explain?





[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...

2016-09-29 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15089#discussion_r81234675
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala ---
@@ -0,0 +1,277 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.python
+
+import java.io._
+
+import com.google.common.io.Closeables
+
+import org.apache.spark.SparkException
+import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.memory.MemoryBlock
+
+/**
+ * A RowQueue is an FIFO queue for UnsafeRow.
+ *
+ * This RowQueue is ONLY designed and used for Python UDF, which has only 
one writer and only one
+ * reader, the reader ALWAYS ran behind the writer. See the doc of class 
[[BatchEvalPythonExec]]
+ * on how it works.
+ */
+private[python] trait RowQueue {
+
+  /**
+   * Add a row to the end of it, returns true iff the row has added into 
it.
+   */
+  def add(row: UnsafeRow): Boolean
+
+  /**
+   * Retrieve and remove the first row, returns null if it's empty.
+   *
+   * It can only be called after add is called.
+   */
+  def remove(): UnsafeRow
+
+  /**
+   * Cleanup all the resources.
+   */
+  def close(): Unit
+}
+
+/**
+ * A RowQueue that is based on in-memory page. UnsafeRows are appended 
into it until it's full.
+ * Another thread could read from it at the same time (behind the writer).
+ *
+ * The format of UnsafeRow in page:
+ * [4 bytes to hold length of record (N)] [N bytes to hold record] [...]
+ */
+private[python] abstract class InMemoryRowQueue(val page: MemoryBlock, 
numFields: Int)
--- End diff --

Can we add more asserts everywhere to guard the invariant `readOffset <=
writeOffset`?
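
A minimal sketch of the kind of guard being suggested (hypothetical helper, not the PR's code):

```scala
// Hypothetical helper, not in the PR: a check that both add() and remove() could
// call first, failing loudly if the reader ever gets ahead of the writer.
def checkInvariant(readOffset: Long, writeOffset: Long): Unit = {
  assert(readOffset <= writeOffset,
    s"RowQueue invariant violated: readOffset ($readOffset) > writeOffset ($writeOffset)")
}
```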





[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...

2016-09-29 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/15089#discussion_r81233940
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala ---
@@ -0,0 +1,277 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.python
+
+import java.io._
+
+import com.google.common.io.Closeables
+
+import org.apache.spark.SparkException
+import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.memory.MemoryBlock
+
+/**
+ * A RowQueue is an FIFO queue for UnsafeRow.
+ *
+ * This RowQueue is ONLY designed and used for Python UDF, which has only 
one writer and only one
+ * reader, the reader ALWAYS ran behind the writer. See the doc of class 
[[BatchEvalPythonExec]]
+ * on how it works.
+ */
+private[python] trait RowQueue {
+
+  /**
+   * Add a row to the end of it, returns true iff the row has added into 
it.
--- End diff --

"iff the row has been added to the queue."

Can you comment on when adding a row would fail?





[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...

2016-09-29 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/15089#discussion_r81228310
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala ---
@@ -0,0 +1,276 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.python
+
+import java.io._
+
+import com.google.common.io.Closeables
+
+import org.apache.spark.SparkException
+import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.memory.MemoryBlock
+
+/**
+ * A RowQueue is an FIFO queue for UnsafeRow.
+ *
+ * This RowQueue is ONLY designed and used for Python UDF, which has only 
one writer and only one
+ * reader, the reader ALWAYS ran behind the writer.
+ */
+private[python] trait RowQueue {
+
+  /**
+   * Add a row to the end of it, returns true iff the row has added into 
it.
+   */
+  def add(row: UnsafeRow): Boolean
+
+  /**
+   * Retrieve and remove the first row, returns null if it's empty.
+   *
+   * It can only be called after add is called.
+   */
+  def remove(): UnsafeRow
+
+  /**
+   * Cleanup all the resources.
+   */
+  def close(): Unit
+}
+
+/**
+ * A RowQueue that is based on in-memory page. UnsafeRows are appended 
into it until it's full.
+ * Another thread could read from it at the same time (behind the writer).
+ *
+ * The format of UnsafeRow in page:
+ * [4 bytes to hold length of record (N)] [N bytes to hold record] [...]
+ */
+private[python] abstract class InMemoryRowQueue(val page: MemoryBlock, 
numFields: Int)
+  extends RowQueue {
+  private val base: AnyRef = page.getBaseObject
+  private val endOfPage: Long = page.getBaseOffset + page.size
+  // the first location where a new row would be written
+  private var writeOffset = page.getBaseOffset
+  // points to the start of the next row to read
+  private var readOffset = page.getBaseOffset
+  private val resultRow = new UnsafeRow(numFields)
+
+  def add(row: UnsafeRow): Boolean = {
+val size = row.getSizeInBytes
+if (writeOffset + 4 + size > endOfPage) {
+  // if there is not enough space in this page to hold the new record
+  if (writeOffset + 4 <= endOfPage) {
+// if there's extra space at the end of the page, store a special 
"end-of-page" length (-1)
+Platform.putInt(base, writeOffset, -1)
+  }
+  false
+} else {
+  Platform.putInt(base, writeOffset, size)
+  Platform.copyMemory(row.getBaseObject, row.getBaseOffset, base, 
writeOffset + 4, size)
+  writeOffset += 4 + size
+  true
+}
+  }
+
+  def remove(): UnsafeRow = {
+if (readOffset + 4 > endOfPage || Platform.getInt(base, readOffset) < 
0) {
+  null
+} else {
+  val size = Platform.getInt(base, readOffset)
+  resultRow.pointTo(base, readOffset + 4, size)
+  readOffset += 4 + size
+  resultRow
+}
+  }
+}
+
+/**
+ * A RowQueue that is backed by a file on disk. This queue will stop 
accepting new rows once any
+ * reader has begun reading from the queue.
+ */
+private[python] case class DiskRowQueue(file: File, fields: Int) extends 
RowQueue {
+  private var out = new DataOutputStream(
+new BufferedOutputStream(new FileOutputStream(file.toString)))
+  private var unreadBytes = 0L
+
+  private var in: DataInputStream = _
+  private val resultRow = new UnsafeRow(fields)
+
+  def add(row: UnsafeRow): Boolean = synchronized {
+if (out == null) {
+  // Another thread is reading, stop writing this one
+  return false
+}
+out.writeInt(row.getSizeInBytes)
+out.write(row.getBytes)
+unreadBytes += 4 + row.getSizeInBytes
+ 

[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...

2016-09-29 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/15089#discussion_r81208953
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
 ---
@@ -17,18 +17,21 @@
 
 package org.apache.spark.sql.execution.python
 
+import java.io.File
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import net.razorvine.pickle.{Pickler, Unpickler}
 
-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkEnv, TaskContext}
 import org.apache.spark.api.python.{ChainedPythonFunctions, PythonRunner}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.util.{CompletionIterator, Utils}
--- End diff --

It seems the `CompletionIterator` import is not used.





[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...

2016-09-29 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/15089#discussion_r81212877
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala ---
@@ -0,0 +1,276 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.python
+
+import java.io._
+
+import com.google.common.io.Closeables
+
+import org.apache.spark.SparkException
+import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.memory.MemoryBlock
+
+/**
+ * A RowQueue is an FIFO queue for UnsafeRow.
+ *
+ * This RowQueue is ONLY designed and used for Python UDF, which has only 
one writer and only one
+ * reader, the reader ALWAYS ran behind the writer.
+ */
+private[python] trait RowQueue {
+
+  /**
+   * Add a row to the end of it, returns true iff the row has added into 
it.
+   */
+  def add(row: UnsafeRow): Boolean
+
+  /**
+   * Retrieve and remove the first row, returns null if it's empty.
+   *
+   * It can only be called after add is called.
+   */
+  def remove(): UnsafeRow
+
+  /**
+   * Cleanup all the resources.
+   */
+  def close(): Unit
+}
+
+/**
+ * A RowQueue that is based on in-memory page. UnsafeRows are appended 
into it until it's full.
+ * Another thread could read from it at the same time (behind the writer).
+ *
+ * The format of UnsafeRow in page:
+ * [4 bytes to hold length of record (N)] [N bytes to hold record] [...]
+ */
+private[python] abstract class InMemoryRowQueue(val page: MemoryBlock, 
numFields: Int)
+  extends RowQueue {
+  private val base: AnyRef = page.getBaseObject
+  private val endOfPage: Long = page.getBaseOffset + page.size
+  // the first location where a new row would be written
+  private var writeOffset = page.getBaseOffset
+  // points to the start of the next row to read
+  private var readOffset = page.getBaseOffset
+  private val resultRow = new UnsafeRow(numFields)
+
+  def add(row: UnsafeRow): Boolean = {
+val size = row.getSizeInBytes
+if (writeOffset + 4 + size > endOfPage) {
+  // if there is not enough space in this page to hold the new record
+  if (writeOffset + 4 <= endOfPage) {
+// if there's extra space at the end of the page, store a special 
"end-of-page" length (-1)
+Platform.putInt(base, writeOffset, -1)
+  }
+  false
+} else {
+  Platform.putInt(base, writeOffset, size)
+  Platform.copyMemory(row.getBaseObject, row.getBaseOffset, base, 
writeOffset + 4, size)
+  writeOffset += 4 + size
+  true
+}
+  }
+
+  def remove(): UnsafeRow = {
+if (readOffset + 4 > endOfPage || Platform.getInt(base, readOffset) < 
0) {
+  null
+} else {
+  val size = Platform.getInt(base, readOffset)
+  resultRow.pointTo(base, readOffset + 4, size)
+  readOffset += 4 + size
+  resultRow
+}
+  }
+}
+
+/**
+ * A RowQueue that is backed by a file on disk. This queue will stop 
accepting new rows once any
+ * reader has begun reading from the queue.
+ */
+private[python] case class DiskRowQueue(file: File, fields: Int) extends 
RowQueue {
+  private var out = new DataOutputStream(
+new BufferedOutputStream(new FileOutputStream(file.toString)))
+  private var unreadBytes = 0L
+
+  private var in: DataInputStream = _
+  private val resultRow = new UnsafeRow(fields)
+
+  def add(row: UnsafeRow): Boolean = synchronized {
+if (out == null) {
+  // Another thread is reading, stop writing this one
+  return false
+}
+out.writeInt(row.getSizeInBytes)
+out.write(row.getBytes)
+unreadBytes += 4 + row.getSizeInBytes
+ 

[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...

2016-09-26 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/15089#discussion_r80532953
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala ---
@@ -0,0 +1,278 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.python
+
+import java.io._
+
+import com.google.common.io.Closeables
+
+import org.apache.spark.SparkException
+import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.memory.MemoryBlock
+
+/**
+ * A RowQueue is an FIFO queue for UnsafeRow.
+ */
+private[python] trait RowQueue {
+  /**
+   * Add a row to the end of it, returns true iff the row has added into 
it.
+   */
+  def add(row: UnsafeRow): Boolean
+
+  /**
+   * Retrieve and remove the first row, returns null if it's empty.
+   *
+   * It can only be called after add is called.
+   */
+  def remove(): UnsafeRow
+
+  /**
+   * Cleanup all the resources.
+   */
+  def close(): Unit
+}
+
+/**
+ * A RowQueue that is based on in-memory page. UnsafeRows are appended 
into it until it's full.
+ * Another thread could read from it at the same time (behind the writer).
+ *
+ * The format of UnsafeRow in page:
+ * [4 bytes to hold length of record (N)] [N bytes to hold record] [...]
+ */
+private[python] abstract class InMemoryRowQueue(val page: MemoryBlock, 
numFields: Int)
+  extends RowQueue {
+  private val base: AnyRef = page.getBaseObject
+  private val endOfPage: Long = page.getBaseOffset + page.size
+  // the first location where a new row would be written
+  private var writeOffset = page.getBaseOffset
+  // points to the start of the next row to read
+  private var readOffset = page.getBaseOffset
+  private val resultRow = new UnsafeRow(numFields)
+
+  def add(row: UnsafeRow): Boolean = {
+val size = row.getSizeInBytes
+if (writeOffset + 4 + size > endOfPage) {
+  // if there is not enough space in this page to hold the new record
+  if (writeOffset + 4 <= endOfPage) {
+// if there's extra space at the end of the page, store a special 
"end-of-page" length (-1)
+Platform.putInt(base, writeOffset, -1)
+  }
+  false
+} else {
+  Platform.putInt(base, writeOffset, size)
+  Platform.copyMemory(row.getBaseObject, row.getBaseOffset, base, 
writeOffset + 4, size)
+  writeOffset += 4 + size
+  true
+}
+  }
+
+  def remove(): UnsafeRow = {
+if (readOffset + 4 > endOfPage || Platform.getInt(base, readOffset) < 
0) {
+  null
+} else {
+  val size = Platform.getInt(base, readOffset)
+  resultRow.pointTo(base, readOffset + 4, size)
+  readOffset += 4 + size
+  resultRow
+}
+  }
+}
+
+/**
+ * A RowQueue that is backed by a file on disk. This queue will stop 
accepting new rows once any
+ * reader has begun reading from the queue.
+ */
+private[python] case class DiskRowQueue(file: File, fields: Int) extends 
RowQueue {
+  private var fout = new FileOutputStream(file.toString)
+  private var out = new DataOutputStream(new BufferedOutputStream(fout))
+  private var unreadBytes = 0L
+
+  private var fin: FileInputStream = _
+  private var in: DataInputStream = _
+  private val resultRow = new UnsafeRow(fields)
+
+  def add(row: UnsafeRow): Boolean = synchronized {
+if (out == null) {
+  // Another thread is reading, stop writing this one
+  return false
+}
+out.writeInt(row.getSizeInBytes)
+out.write(row.getBytes)
+unreadBytes += 4 + row.getSizeInBytes
+true
+  }
+
+  def remove(): UnsafeRow = synchronized {
+if (out != null) {
+  

[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...

2016-09-23 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/15089#discussion_r80326226
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala ---
@@ -0,0 +1,278 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.python
+
+import java.io._
+
+import com.google.common.io.Closeables
+
+import org.apache.spark.SparkException
+import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.memory.MemoryBlock
+
+/**
+ * A RowQueue is an FIFO queue for UnsafeRow.
+ */
+private[python] trait RowQueue {
+  /**
+   * Add a row to the end of it, returns true iff the row has added into 
it.
+   */
+  def add(row: UnsafeRow): Boolean
+
+  /**
+   * Retrieve and remove the first row, returns null if it's empty.
+   *
+   * It can only be called after add is called.
+   */
+  def remove(): UnsafeRow
+
+  /**
+   * Cleanup all the resources.
+   */
+  def close(): Unit
+}
+
+/**
+ * A RowQueue that is based on in-memory page. UnsafeRows are appended 
into it until it's full.
+ * Another thread could read from it at the same time (behind the writer).
+ *
+ * The format of UnsafeRow in page:
+ * [4 bytes to hold length of record (N)] [N bytes to hold record] [...]
+ */
+private[python] abstract class InMemoryRowQueue(val page: MemoryBlock, 
numFields: Int)
+  extends RowQueue {
+  private val base: AnyRef = page.getBaseObject
+  private val endOfPage: Long = page.getBaseOffset + page.size
+  // the first location where a new row would be written
+  private var writeOffset = page.getBaseOffset
+  // points to the start of the next row to read
+  private var readOffset = page.getBaseOffset
+  private val resultRow = new UnsafeRow(numFields)
+
+  def add(row: UnsafeRow): Boolean = {
+val size = row.getSizeInBytes
+if (writeOffset + 4 + size > endOfPage) {
+  // if there is not enough space in this page to hold the new record
+  if (writeOffset + 4 <= endOfPage) {
+// if there's extra space at the end of the page, store a special 
"end-of-page" length (-1)
+Platform.putInt(base, writeOffset, -1)
+  }
+  false
+} else {
+  Platform.putInt(base, writeOffset, size)
+  Platform.copyMemory(row.getBaseObject, row.getBaseOffset, base, 
writeOffset + 4, size)
+  writeOffset += 4 + size
+  true
+}
+  }
+
+  def remove(): UnsafeRow = {
+if (readOffset + 4 > endOfPage || Platform.getInt(base, readOffset) < 
0) {
+  null
+} else {
+  val size = Platform.getInt(base, readOffset)
+  resultRow.pointTo(base, readOffset + 4, size)
+  readOffset += 4 + size
+  resultRow
+}
+  }
+}
+
+/**
+ * A RowQueue that is backed by a file on disk. This queue will stop 
accepting new rows once any
+ * reader has begun reading from the queue.
+ */
+private[python] case class DiskRowQueue(file: File, fields: Int) extends 
RowQueue {
+  private var fout = new FileOutputStream(file.toString)
+  private var out = new DataOutputStream(new BufferedOutputStream(fout))
+  private var unreadBytes = 0L
+
+  private var fin: FileInputStream = _
+  private var in: DataInputStream = _
+  private val resultRow = new UnsafeRow(fields)
+
+  def add(row: UnsafeRow): Boolean = synchronized {
+if (out == null) {
+  // Another thread is reading, stop writing this one
+  return false
+}
+out.writeInt(row.getSizeInBytes)
+out.write(row.getBytes)
+unreadBytes += 4 + row.getSizeInBytes
+true
+  }
+
+  def remove(): UnsafeRow = synchronized {
+if (out != null) {
+  

[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...

2016-09-22 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/15089#discussion_r80174300
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala ---
@@ -0,0 +1,278 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.python
+
+import java.io._
+
+import com.google.common.io.Closeables
+
+import org.apache.spark.SparkException
+import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.memory.MemoryBlock
+
+/**
+ * A RowQueue is an FIFO queue for UnsafeRow.
+ */
+private[python] trait RowQueue {
+  /**
+   * Add a row to the end of it, returns true iff the row has added into 
it.
+   */
+  def add(row: UnsafeRow): Boolean
+
+  /**
+   * Retrieve and remove the first row, returns null if it's empty.
+   *
+   * It can only be called after add is called.
+   */
+  def remove(): UnsafeRow
+
+  /**
+   * Cleanup all the resources.
+   */
+  def close(): Unit
+}
+
+/**
+ * A RowQueue that is based on in-memory page. UnsafeRows are appended 
into it until it's full.
+ * Another thread could read from it at the same time (behind the writer).
+ *
+ * The format of UnsafeRow in page:
+ * [4 bytes to hold length of record (N)] [N bytes to hold record] [...]
+ */
+private[python] abstract class InMemoryRowQueue(val page: MemoryBlock, 
numFields: Int)
+  extends RowQueue {
+  private val base: AnyRef = page.getBaseObject
+  private val endOfPage: Long = page.getBaseOffset + page.size
+  // the first location where a new row would be written
+  private var writeOffset = page.getBaseOffset
+  // points to the start of the next row to read
+  private var readOffset = page.getBaseOffset
+  private val resultRow = new UnsafeRow(numFields)
+
+  def add(row: UnsafeRow): Boolean = {
+val size = row.getSizeInBytes
+if (writeOffset + 4 + size > endOfPage) {
+  // if there is not enough space in this page to hold the new record
+  if (writeOffset + 4 <= endOfPage) {
+// if there's extra space at the end of the page, store a special 
"end-of-page" length (-1)
+Platform.putInt(base, writeOffset, -1)
+  }
+  false
+} else {
+  Platform.putInt(base, writeOffset, size)
+  Platform.copyMemory(row.getBaseObject, row.getBaseOffset, base, 
writeOffset + 4, size)
+  writeOffset += 4 + size
+  true
+}
+  }
+
+  def remove(): UnsafeRow = {
+if (readOffset + 4 > endOfPage || Platform.getInt(base, readOffset) < 
0) {
+  null
+} else {
+  val size = Platform.getInt(base, readOffset)
+  resultRow.pointTo(base, readOffset + 4, size)
+  readOffset += 4 + size
+  resultRow
+}
+  }
+}
+
+/**
+ * A RowQueue that is backed by a file on disk. This queue will stop 
accepting new rows once any
+ * reader has begun reading from the queue.
+ */
+private[python] case class DiskRowQueue(file: File, fields: Int) extends 
RowQueue {
+  private var fout = new FileOutputStream(file.toString)
+  private var out = new DataOutputStream(new BufferedOutputStream(fout))
+  private var unreadBytes = 0L
+
+  private var fin: FileInputStream = _
+  private var in: DataInputStream = _
+  private val resultRow = new UnsafeRow(fields)
+
+  def add(row: UnsafeRow): Boolean = synchronized {
+if (out == null) {
+  // Another thread is reading, stop writing this one
+  return false
+}
+out.writeInt(row.getSizeInBytes)
+out.write(row.getBytes)
+unreadBytes += 4 + row.getSizeInBytes
+true
+  }
+
+  def remove(): UnsafeRow = synchronized {
+if (out != null) {
+  

[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...

2016-09-22 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/15089#discussion_r80128271
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
 ---
@@ -17,18 +17,270 @@
 
 package org.apache.spark.sql.execution.python
 
+import java.io._
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import net.razorvine.pickle.{Pickler, Unpickler}
 
-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkEnv, TaskContext}
 import org.apache.spark.api.python.{ChainedPythonFunctions, PythonRunner}
+import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.memory.MemoryBlock
+import org.apache.spark.util.{CompletionIterator, Utils}
+
+
+/**
+ * A RowQueue is an FIFO queue for UnsafeRow.
+ */
+private[python] trait RowQueue {
+  /**
+   * Add a row to the end of it, returns true iff the row has added into 
it.
+   */
+  def add(row: UnsafeRow): Boolean
+
+  /**
+   * Retrieve and remove the first row, returns null if it's empty.
+   */
+  def remove(): UnsafeRow
+
+  /**
+   * Cleanup all the resources.
+   */
+  def close(): Unit
+}
+
+/**
+ * A RowQueue that is based on in-memory page. UnsafeRows are appended 
into it until it's full.
+ * Another thread could read from it at the same time (behind the writer).
+ */
+private[python] case class InMemoryRowQueue(page: MemoryBlock, fields: 
Int) extends RowQueue {
+  private val base: AnyRef = page.getBaseObject
+  private var last = page.getBaseOffset  // for writing
+  private var first = page.getBaseOffset  // for reading
+  private val resultRow = new UnsafeRow(fields)
+
+  def add(row: UnsafeRow): Boolean = {
+if (last + 4 + row.getSizeInBytes > page.getBaseOffset + page.size) {
+  if (last + 4 <= page.getBaseOffset + page.size) {
+Platform.putInt(base, last, -1)
+  }
+  return false
+}
+Platform.putInt(base, last, row.getSizeInBytes)
+Platform.copyMemory(row.getBaseObject, row.getBaseOffset, base, last + 
4, row.getSizeInBytes)
+last += 4 + row.getSizeInBytes
+true
+  }
+
+  def remove(): UnsafeRow = {
+if (first + 4 > page.getBaseOffset + page.size || 
Platform.getInt(base, first) < 0) {
--- End diff --

Should we add a variable to enforce this? It seems like it would be pretty
cheap to add a check, and I think that in the past guarding against this type of
accidental misuse would have prevented subtle bugs.





[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...

2016-09-22 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/15089#discussion_r80129158
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
 ---
@@ -17,18 +17,270 @@
 
 package org.apache.spark.sql.execution.python
 
+import java.io._
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import net.razorvine.pickle.{Pickler, Unpickler}
 
-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkEnv, TaskContext}
 import org.apache.spark.api.python.{ChainedPythonFunctions, PythonRunner}
+import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.memory.MemoryBlock
+import org.apache.spark.util.{CompletionIterator, Utils}
+
+
+/**
+ * A RowQueue is an FIFO queue for UnsafeRow.
+ */
+private[python] trait RowQueue {
+  /**
+   * Add a row to the end of it, returns true iff the row has added into 
it.
+   */
+  def add(row: UnsafeRow): Boolean
+
+  /**
+   * Retrieve and remove the first row, returns null if it's empty.
+   */
+  def remove(): UnsafeRow
+
+  /**
+   * Cleanup all the resources.
+   */
+  def close(): Unit
+}
+
+/**
+ * A RowQueue that is based on in-memory page. UnsafeRows are appended 
into it until it's full.
+ * Another thread could read from it at the same time (behind the writer).
+ */
+private[python] case class InMemoryRowQueue(page: MemoryBlock, fields: 
Int) extends RowQueue {
+  private val base: AnyRef = page.getBaseObject
+  private var last = page.getBaseOffset  // for writing
+  private var first = page.getBaseOffset  // for reading
+  private val resultRow = new UnsafeRow(fields)
+
+  def add(row: UnsafeRow): Boolean = {
+if (last + 4 + row.getSizeInBytes > page.getBaseOffset + page.size) {
+  if (last + 4 <= page.getBaseOffset + page.size) {
+Platform.putInt(base, last, -1)
+  }
+  return false
+}
+Platform.putInt(base, last, row.getSizeInBytes)
+Platform.copyMemory(row.getBaseObject, row.getBaseOffset, base, last + 
4, row.getSizeInBytes)
+last += 4 + row.getSizeInBytes
+true
+  }
+
+  def remove(): UnsafeRow = {
+if (first + 4 > page.getBaseOffset + page.size || 
Platform.getInt(base, first) < 0) {
+  null
+} else {
+  val size = Platform.getInt(base, first)
+  resultRow.pointTo(base, first + 4, size)
+  first += 4 + size
+  resultRow
+}
+  }
+
+  def close(): Unit = {
+// caller should override close() to free page
+  }
+}
+
+/**
+ * A RowQueue that is based on file in disk. It will stop to push once 
someone start to read
+ * from it.
+ */
+private[python] case class DiskRowQueue(path: String, fields: Int) extends 
RowQueue {
+  private var fout = new FileOutputStream(path)
+  private var out = new DataOutputStream(new BufferedOutputStream(fout))
+  private var length = 0L
+
+  private var fin: FileInputStream = _
+  private var in: DataInputStream = _
+  private val resultRow = new UnsafeRow(fields)
+
+  def add(row: UnsafeRow): Boolean = {
+synchronized {
+  if (out == null) {
+// Another thread is reading, stop writing this one
+return false
+  }
+}
+out.writeInt(row.getSizeInBytes)
+out.write(row.getBytes)
+length += 4 + row.getSizeInBytes
+true
+  }
+
+  def remove(): UnsafeRow = {
+synchronized {
+  if (out != null) {
+out.flush()
+out.close()
+out = null
+fout.close()
+fout = null
+
+fin = new FileInputStream(path)
+in = new DataInputStream(new BufferedInputStream(fin))
--- End diff --

`DataInputStream` extends `FilterInputStream(InputStream)` and that class's 
`close()` method just calls the wrapped / underlying stream's `close()`.
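
A small sketch illustrating the point, under the delegation behavior of `FilterInputStream.close()` described above (hypothetical helper names):

```scala
import java.io.{BufferedInputStream, DataInputStream, FileInputStream}

// Illustrative only: because DataInputStream/FilterInputStream.close() delegates to
// the wrapped stream, closing the outermost stream is enough to release the file
// descriptor; there is no need to keep a separate reference to the FileInputStream.
def openAndClose(path: String): Unit = {
  val in = new DataInputStream(new BufferedInputStream(new FileInputStream(path)))
  try {
    // ... read from `in` ...
  } finally {
    in.close()   // also closes the BufferedInputStream and FileInputStream underneath
  }
}
```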




[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...

2016-09-22 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/15089#discussion_r80130429
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala ---
@@ -0,0 +1,278 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.python
+
+import java.io._
+
+import com.google.common.io.Closeables
+
+import org.apache.spark.SparkException
+import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.memory.MemoryBlock
+
+/**
+ * A RowQueue is a FIFO queue for UnsafeRow.
+ */
+private[python] trait RowQueue {
+  /**
+   * Add a row to the end of it; returns true iff the row has been added to it.
+   */
+  def add(row: UnsafeRow): Boolean
+
+  /**
+   * Retrieve and remove the first row, returns null if it's empty.
+   *
+   * It can only be called after add is called.
+   */
+  def remove(): UnsafeRow
+
+  /**
+   * Cleanup all the resources.
+   */
+  def close(): Unit
+}
+
+/**
+ * A RowQueue that is based on an in-memory page. UnsafeRows are appended to it until it's full.
+ * Another thread could read from it at the same time (behind the writer).
+ *
+ * The format of UnsafeRow in page:
+ * [4 bytes to hold length of record (N)] [N bytes to hold record] [...]
+ */
+private[python] abstract class InMemoryRowQueue(val page: MemoryBlock, 
numFields: Int)
+  extends RowQueue {
+  private val base: AnyRef = page.getBaseObject
+  private val endOfPage: Long = page.getBaseOffset + page.size
+  // the first location where a new row would be written
+  private var writeOffset = page.getBaseOffset
+  // points to the start of the next row to read
+  private var readOffset = page.getBaseOffset
+  private val resultRow = new UnsafeRow(numFields)
+
+  def add(row: UnsafeRow): Boolean = {
+val size = row.getSizeInBytes
+if (writeOffset + 4 + size > endOfPage) {
+  // if there is not enough space in this page to hold the new record
+  if (writeOffset + 4 <= endOfPage) {
+// if there's extra space at the end of the page, store a special 
"end-of-page" length (-1)
+Platform.putInt(base, writeOffset, -1)
+  }
+  false
+} else {
+  Platform.putInt(base, writeOffset, size)
+  Platform.copyMemory(row.getBaseObject, row.getBaseOffset, base, 
writeOffset + 4, size)
+  writeOffset += 4 + size
+  true
+}
+  }
+
+  def remove(): UnsafeRow = {
+if (readOffset + 4 > endOfPage || Platform.getInt(base, readOffset) < 
0) {
+  null
+} else {
+  val size = Platform.getInt(base, readOffset)
+  resultRow.pointTo(base, readOffset + 4, size)
+  readOffset += 4 + size
+  resultRow
+}
+  }
+}
+
+/**
+ * A RowQueue that is backed by a file on disk. This queue will stop 
accepting new rows once any
+ * reader has begun reading from the queue.
+ */
+private[python] case class DiskRowQueue(file: File, fields: Int) extends 
RowQueue {
+  private var fout = new FileOutputStream(file.toString)
+  private var out = new DataOutputStream(new BufferedOutputStream(fout))
+  private var unreadBytes = 0L
+
+  private var fin: FileInputStream = _
+  private var in: DataInputStream = _
+  private val resultRow = new UnsafeRow(fields)
+
+  def add(row: UnsafeRow): Boolean = synchronized {
+if (out == null) {
+  // Another thread is reading, stop writing this one
+  return false
+}
+out.writeInt(row.getSizeInBytes)
+out.write(row.getBytes)
+unreadBytes += 4 + row.getSizeInBytes
+true
+  }
+
+  def remove(): UnsafeRow = synchronized {
+if (out != null) {
+   

[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...

2016-09-16 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/15089#discussion_r79253457
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
 ---
@@ -17,18 +17,270 @@
 
 package org.apache.spark.sql.execution.python
 
+import java.io._
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import net.razorvine.pickle.{Pickler, Unpickler}
 
-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkEnv, TaskContext}
 import org.apache.spark.api.python.{ChainedPythonFunctions, PythonRunner}
+import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.memory.MemoryBlock
+import org.apache.spark.util.{CompletionIterator, Utils}
+
+
+/**
+ * A RowQueue is a FIFO queue for UnsafeRow.
+ */
+private[python] trait RowQueue {
+  /**
+   * Add a row to the end of it; returns true iff the row has been added to it.
+   */
+  def add(row: UnsafeRow): Boolean
+
+  /**
+   * Retrieve and remove the first row, returns null if it's empty.
+   */
+  def remove(): UnsafeRow
+
+  /**
+   * Cleanup all the resources.
+   */
+  def close(): Unit
+}
+
+/**
+ * A RowQueue that is based on an in-memory page. UnsafeRows are appended to it until it's full.
+ * Another thread could read from it at the same time (behind the writer).
+ */
+private[python] case class InMemoryRowQueue(page: MemoryBlock, fields: 
Int) extends RowQueue {
+  private val base: AnyRef = page.getBaseObject
+  private var last = page.getBaseOffset  // for writing
+  private var first = page.getBaseOffset  // for reading
+  private val resultRow = new UnsafeRow(fields)
+
+  def add(row: UnsafeRow): Boolean = {
+if (last + 4 + row.getSizeInBytes > page.getBaseOffset + page.size) {
+  if (last + 4 <= page.getBaseOffset + page.size) {
+Platform.putInt(base, last, -1)
+  }
+  return false
+}
+Platform.putInt(base, last, row.getSizeInBytes)
+Platform.copyMemory(row.getBaseObject, row.getBaseOffset, base, last + 
4, row.getSizeInBytes)
+last += 4 + row.getSizeInBytes
+true
+  }
+
+  def remove(): UnsafeRow = {
+if (first + 4 > page.getBaseOffset + page.size || 
Platform.getInt(base, first) < 0) {
+  null
+} else {
+  val size = Platform.getInt(base, first)
+  resultRow.pointTo(base, first + 4, size)
+  first += 4 + size
+  resultRow
+}
+  }
+
+  def close(): Unit = {
+// caller should override close() to free page
+  }
+}
+
+/**
+ * A RowQueue that is backed by a file on disk. It stops accepting writes once
+ * someone starts reading from it.
+ */
+private[python] case class DiskRowQueue(path: String, fields: Int) extends 
RowQueue {
+  private var fout = new FileOutputStream(path)
+  private var out = new DataOutputStream(new BufferedOutputStream(fout))
+  private var length = 0L
+
+  private var fin: FileInputStream = _
+  private var in: DataInputStream = _
+  private val resultRow = new UnsafeRow(fields)
+
+  def add(row: UnsafeRow): Boolean = {
+synchronized {
+  if (out == null) {
+// Another thread is reading, stop writing this one
+return false
+  }
+}
+out.writeInt(row.getSizeInBytes)
+out.write(row.getBytes)
+length += 4 + row.getSizeInBytes
+true
+  }
+
+  def remove(): UnsafeRow = {
+synchronized {
+  if (out != null) {
+out.flush()
+out.close()
+out = null
+fout.close()
+fout = null
+
+fin = new FileInputStream(path)
+in = new DataInputStream(new BufferedInputStream(fin))
+  }
+}
+
+if (length > 0) {
+  val size = in.readInt()
+  assert(4 + size <= length, s"require ${4 + size} bytes for next row")
+  val bytes = new Array[Byte](size)
+  in.readFully(bytes)
+  length -= 4 + size
+  resultRow.pointTo(bytes, size)
+  resultRow
+} else {
+  null
+}
+  }
+
+  def close(): Unit = {
+synchronized {
+  if (fout != null) {
+fout.close()
+fout = null
+  }
+  if (fin != 

[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...

2016-09-16 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/15089#discussion_r79253378
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
 ---
@@ -17,18 +17,270 @@
 
 package org.apache.spark.sql.execution.python
 
+import java.io._
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import net.razorvine.pickle.{Pickler, Unpickler}
 
-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkEnv, TaskContext}
 import org.apache.spark.api.python.{ChainedPythonFunctions, PythonRunner}
+import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.memory.MemoryBlock
+import org.apache.spark.util.{CompletionIterator, Utils}
+
+
+/**
+ * A RowQueue is a FIFO queue for UnsafeRow.
+ */
+private[python] trait RowQueue {
+  /**
+   * Add a row to the end of it; returns true iff the row has been added to it.
+   */
+  def add(row: UnsafeRow): Boolean
+
+  /**
+   * Retrieve and remove the first row, returns null if it's empty.
+   */
+  def remove(): UnsafeRow
+
+  /**
+   * Cleanup all the resources.
+   */
+  def close(): Unit
+}
+
+/**
+ * A RowQueue that is based on an in-memory page. UnsafeRows are appended to it until it's full.
+ * Another thread could read from it at the same time (behind the writer).
+ */
+private[python] case class InMemoryRowQueue(page: MemoryBlock, fields: 
Int) extends RowQueue {
+  private val base: AnyRef = page.getBaseObject
+  private var last = page.getBaseOffset  // for writing
+  private var first = page.getBaseOffset  // for reading
+  private val resultRow = new UnsafeRow(fields)
+
+  def add(row: UnsafeRow): Boolean = {
+if (last + 4 + row.getSizeInBytes > page.getBaseOffset + page.size) {
+  if (last + 4 <= page.getBaseOffset + page.size) {
+Platform.putInt(base, last, -1)
+  }
+  return false
+}
+Platform.putInt(base, last, row.getSizeInBytes)
+Platform.copyMemory(row.getBaseObject, row.getBaseOffset, base, last + 
4, row.getSizeInBytes)
+last += 4 + row.getSizeInBytes
+true
+  }
+
+  def remove(): UnsafeRow = {
+if (first + 4 > page.getBaseOffset + page.size || 
Platform.getInt(base, first) < 0) {
+  null
+} else {
+  val size = Platform.getInt(base, first)
+  resultRow.pointTo(base, first + 4, size)
+  first += 4 + size
+  resultRow
+}
+  }
+
+  def close(): Unit = {
+// caller should override close() to free page
+  }
+}
+
+/**
+ * A RowQueue that is backed by a file on disk. It stops accepting writes once
+ * someone starts reading from it.
+ */
+private[python] case class DiskRowQueue(path: String, fields: Int) extends 
RowQueue {
+  private var fout = new FileOutputStream(path)
+  private var out = new DataOutputStream(new BufferedOutputStream(fout))
+  private var length = 0L
+
+  private var fin: FileInputStream = _
+  private var in: DataInputStream = _
+  private val resultRow = new UnsafeRow(fields)
+
+  def add(row: UnsafeRow): Boolean = {
+synchronized {
+  if (out == null) {
+// Another thread is reading, stop writing this one
+return false
+  }
+}
+out.writeInt(row.getSizeInBytes)
+out.write(row.getBytes)
+length += 4 + row.getSizeInBytes
+true
+  }
+
+  def remove(): UnsafeRow = {
+synchronized {
+  if (out != null) {
+out.flush()
+out.close()
+out = null
+fout.close()
+fout = null
+
+fin = new FileInputStream(path)
+in = new DataInputStream(new BufferedInputStream(fin))
+  }
+}
+
+if (length > 0) {
+  val size = in.readInt()
+  assert(4 + size <= length, s"require ${4 + size} bytes for next row")
+  val bytes = new Array[Byte](size)
+  in.readFully(bytes)
+  length -= 4 + size
+  resultRow.pointTo(bytes, size)
+  resultRow
+} else {
+  null
+}
+  }
+
+  def close(): Unit = {
+synchronized {
+  if (fout != null) {
+fout.close()
+fout = null
+  }
+  if (fin != 

[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...

2016-09-15 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/15089#discussion_r79038500
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
 ---
@@ -17,18 +17,270 @@
 
 package org.apache.spark.sql.execution.python
 
+import java.io._
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import net.razorvine.pickle.{Pickler, Unpickler}
 
-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkEnv, TaskContext}
 import org.apache.spark.api.python.{ChainedPythonFunctions, PythonRunner}
+import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.memory.MemoryBlock
+import org.apache.spark.util.{CompletionIterator, Utils}
+
+
+/**
+ * A RowQueue is a FIFO queue for UnsafeRow.
+ */
+private[python] trait RowQueue {
+  /**
+   * Add a row to the end of it; returns true iff the row has been added to it.
+   */
+  def add(row: UnsafeRow): Boolean
+
+  /**
+   * Retrieve and remove the first row, returns null if it's empty.
+   */
+  def remove(): UnsafeRow
+
+  /**
+   * Cleanup all the resources.
+   */
+  def close(): Unit
+}
+
+/**
+ * A RowQueue that is based on an in-memory page. UnsafeRows are appended to it until it's full.
+ * Another thread could read from it at the same time (behind the writer).
+ */
+private[python] case class InMemoryRowQueue(page: MemoryBlock, fields: 
Int) extends RowQueue {
+  private val base: AnyRef = page.getBaseObject
+  private var last = page.getBaseOffset  // for writing
+  private var first = page.getBaseOffset  // for reading
+  private val resultRow = new UnsafeRow(fields)
+
+  def add(row: UnsafeRow): Boolean = {
+if (last + 4 + row.getSizeInBytes > page.getBaseOffset + page.size) {
+  if (last + 4 <= page.getBaseOffset + page.size) {
+Platform.putInt(base, last, -1)
+  }
+  return false
+}
+Platform.putInt(base, last, row.getSizeInBytes)
+Platform.copyMemory(row.getBaseObject, row.getBaseOffset, base, last + 
4, row.getSizeInBytes)
+last += 4 + row.getSizeInBytes
+true
+  }
+
+  def remove(): UnsafeRow = {
+if (first + 4 > page.getBaseOffset + page.size || 
Platform.getInt(base, first) < 0) {
+  null
+} else {
+  val size = Platform.getInt(base, first)
+  resultRow.pointTo(base, first + 4, size)
+  first += 4 + size
+  resultRow
+}
+  }
+
+  def close(): Unit = {
+// caller should override close() to free page
+  }
+}
+
+/**
+ * A RowQueue that is backed by a file on disk. It stops accepting writes once
+ * someone starts reading from it.
+ */
+private[python] case class DiskRowQueue(path: String, fields: Int) extends 
RowQueue {
+  private var fout = new FileOutputStream(path)
+  private var out = new DataOutputStream(new BufferedOutputStream(fout))
+  private var length = 0L
+
+  private var fin: FileInputStream = _
+  private var in: DataInputStream = _
+  private val resultRow = new UnsafeRow(fields)
+
+  def add(row: UnsafeRow): Boolean = {
+synchronized {
+  if (out == null) {
+// Another thread is reading, stop writing this one
+return false
+  }
+}
+out.writeInt(row.getSizeInBytes)
+out.write(row.getBytes)
+length += 4 + row.getSizeInBytes
+true
+  }
+
+  def remove(): UnsafeRow = {
+synchronized {
+  if (out != null) {
+out.flush()
+out.close()
+out = null
+fout.close()
+fout = null
+
+fin = new FileInputStream(path)
+in = new DataInputStream(new BufferedInputStream(fin))
--- End diff --

Closing `in` will not call `fin.close()`, I think.



[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...

2016-09-15 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/15089#discussion_r79037083
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
 ---
@@ -17,18 +17,270 @@
 
 package org.apache.spark.sql.execution.python
 
+import java.io._
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import net.razorvine.pickle.{Pickler, Unpickler}
 
-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkEnv, TaskContext}
 import org.apache.spark.api.python.{ChainedPythonFunctions, PythonRunner}
+import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.memory.MemoryBlock
+import org.apache.spark.util.{CompletionIterator, Utils}
+
+
+/**
+ * A RowQueue is a FIFO queue for UnsafeRow.
+ */
+private[python] trait RowQueue {
+  /**
+   * Add a row to the end of it; returns true iff the row has been added to it.
+   */
+  def add(row: UnsafeRow): Boolean
+
+  /**
+   * Retrieve and remove the first row, returns null if it's empty.
+   */
+  def remove(): UnsafeRow
+
+  /**
+   * Cleanup all the resources.
+   */
+  def close(): Unit
+}
+
+/**
+ * A RowQueue that is based on an in-memory page. UnsafeRows are appended to it until it's full.
+ * Another thread could read from it at the same time (behind the writer).
+ */
+private[python] case class InMemoryRowQueue(page: MemoryBlock, fields: 
Int) extends RowQueue {
+  private val base: AnyRef = page.getBaseObject
+  private var last = page.getBaseOffset  // for writing
+  private var first = page.getBaseOffset  // for reading
+  private val resultRow = new UnsafeRow(fields)
+
+  def add(row: UnsafeRow): Boolean = {
+if (last + 4 + row.getSizeInBytes > page.getBaseOffset + page.size) {
+  if (last + 4 <= page.getBaseOffset + page.size) {
+Platform.putInt(base, last, -1)
+  }
+  return false
+}
+Platform.putInt(base, last, row.getSizeInBytes)
+Platform.copyMemory(row.getBaseObject, row.getBaseOffset, base, last + 
4, row.getSizeInBytes)
+last += 4 + row.getSizeInBytes
+true
+  }
+
+  def remove(): UnsafeRow = {
+if (first + 4 > page.getBaseOffset + page.size || 
Platform.getInt(base, first) < 0) {
--- End diff --

`remove()` should only be called when we know that there is at least one row 
in it.
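
A hedged, self-contained stand-in (plain Scala collections, not the PR's RowQueue) for the calling pattern this relies on, where every add happens strictly before the matching remove:

```
import scala.collection.mutable

val queue = mutable.Queue.empty[Long]        // stands in for the row queue
val results = (1L to 5L).map { id =>
  queue.enqueue(id)                          // add() strictly before...
  val udfResult = id + 1                     // ...the (stand-in) Python UDF result arrives
  (queue.dequeue(), udfResult)               // ...so remove() never sees an empty queue
}
println(results)
```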





[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...

2016-09-15 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/15089#discussion_r79034499
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
 ---
@@ -98,7 +353,7 @@ case class BatchEvalPythonExec(udfs: Seq[PythonUDF], 
output: Seq[Attribute], chi
   // For each row, add it to the queue.
   val inputIterator = iter.grouped(100).map { inputRows =>
 val toBePickled = inputRows.map { inputRow =>
-  queue.add(inputRow)
+  queue.add(inputRow.asInstanceOf[UnsafeRow])
--- End diff --

No, we always use UnsafeRow between operators.
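
A small sketch (assumes spark-catalyst on the classpath; the object name is made up) of the invariant being relied on here: rows handed between operators are UnsafeRow underneath, so the cast either succeeds or fails fast with a ClassCastException:

```
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

object UnsafeRowInvariant {
  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(StructField("id", LongType)))
    val toUnsafe = UnsafeProjection.create(schema)
    val row: InternalRow = toUnsafe(InternalRow(42L))  // operators pass rows around as InternalRow...
    val unsafe = row.asInstanceOf[UnsafeRow]           // ...but the concrete type is UnsafeRow
    println(unsafe.getLong(0))
  }
}
```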





[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...

2016-09-13 Thread davies
GitHub user davies opened a pull request:

https://github.com/apache/spark/pull/15089

[SPARK-15621]  [SQL] Support spilling for Python UDF

## What changes were proposed in this pull request?

When executing a Python UDF, we buffer the input rows in a queue and then pull 
them out to join with the results from the Python UDF. If the Python UDF is slow 
or the input rows are too wide, we can run out of memory because of that queue. 
Since we can't flush all the buffers (sockets) between the JVM and the Python 
process from the JVM side, we can't bound the number of rows in the queue, 
otherwise it could deadlock.

This PR manages the memory used by the queue and spills it to disk when there is 
not enough memory (it also releases the memory and disk space as soon as 
possible).
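
A minimal, self-contained sketch of that spill-on-full idea (the class and parameter names are made up, and it is byte-array based rather than the PR's UnsafeRow pages): buffer records in memory up to a cap, append the overflow to a temp file, and drain the in-memory part first so FIFO order is preserved.

```
import java.io.{DataInputStream, DataOutputStream, File, FileInputStream, FileOutputStream}
import scala.collection.mutable

class SpillingQueue(maxInMemoryBytes: Long) {
  private val spillFile = File.createTempFile("spill", ".bin")
  private val inMem = mutable.Queue.empty[Array[Byte]]
  private var inMemBytes = 0L
  private var out: DataOutputStream = _
  private var in: DataInputStream = _
  private var spilledRecords = 0L

  def add(record: Array[Byte]): Unit = {
    if (out == null && inMemBytes + record.length <= maxInMemoryBytes) {
      inMem.enqueue(record)
      inMemBytes += record.length
    } else {
      require(in == null, "cannot add once reading from the spill file has started")
      if (out == null) out = new DataOutputStream(new FileOutputStream(spillFile))
      out.writeInt(record.length)   // length-prefixed framing, like the RowQueue format
      out.write(record)
      spilledRecords += 1
    }
  }

  def remove(): Array[Byte] = {
    if (inMem.nonEmpty) {
      val r = inMem.dequeue(); inMemBytes -= r.length; r
    } else if (spilledRecords > 0) {
      if (in == null) { out.close(); in = new DataInputStream(new FileInputStream(spillFile)) }
      val buf = new Array[Byte](in.readInt())
      in.readFully(buf)
      spilledRecords -= 1
      buf
    } else {
      null   // empty, mirroring RowQueue.remove()
    }
  }
}
```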

## How was this patch tested?

Added unit tests. Also manually ran a workload with large input rows and a slow 
Python UDF (with a large broadcast) like this:

```
from pyspark.sql.functions import udf, lit, length
from pyspark.sql.types import IntegerType

b = range(1<<24)  # large list captured by the UDF closure (the "large broadcast")
add = udf(lambda x: x + len(b), IntegerType())
df = sqlContext.range(1, 1<<26, 1, 4)
print df.select(df.id, lit("adf"*1).alias("s"), add(df.id).alias("add")).groupBy(length("s")).sum().collect()
```

It ran out of memory (hung because of full GC) before the patch, and ran 
smoothly after the patch.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/davies/spark spill_udf

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15089.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15089


commit 4964b9a611ed01aaa5252ac642df94db07a38868
Author: Davies Liu 
Date:   2016-09-13T23:47:31Z

spill the buffer for Python UDF into disk



