Github user davies commented on a diff in the pull request:
https://github.com/apache/spark/pull/15089#discussion_r79037083
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
---
@@ -17,18 +17,270 @@
package org.apache.spark.sql.execution.python
+import java.io._
+
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import net.razorvine.pickle.{Pickler, Unpickler}
-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.api.python.{ChainedPythonFunctions, PythonRunner}
+import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.memory.MemoryBlock
+import org.apache.spark.util.{CompletionIterator, Utils}
+
+
+/**
+ * A RowQueue is an FIFO queue for UnsafeRow.
+ */
+private[python] trait RowQueue {
+ /**
+ * Add a row to the end of it, returns true iff the row has added into
it.
+ */
+ def add(row: UnsafeRow): Boolean
+
+ /**
+ * Retrieve and remove the first row, returns null if it's empty.
+ */
+ def remove(): UnsafeRow
+
+ /**
+ * Cleanup all the resources.
+ */
+ def close(): Unit
+}
+
+/**
+ * A RowQueue that is based on in-memory page. UnsafeRows are appended
into it until it's full.
+ * Another thread could read from it at the same time (behind the writer).
+ */
+private[python] case class InMemoryRowQueue(page: MemoryBlock, fields:
Int) extends RowQueue {
+ private val base: AnyRef = page.getBaseObject
+ private var last = page.getBaseOffset // for writing
+ private var first = page.getBaseOffset // for reading
+ private val resultRow = new UnsafeRow(fields)
+
+ def add(row: UnsafeRow): Boolean = {
+ if (last + 4 + row.getSizeInBytes > page.getBaseOffset + page.size) {
+ if (last + 4 <= page.getBaseOffset + page.size) {
+ Platform.putInt(base, last, -1)
+ }
+ return false
+ }
+ Platform.putInt(base, last, row.getSizeInBytes)
+ Platform.copyMemory(row.getBaseObject, row.getBaseOffset, base, last +
4, row.getSizeInBytes)
+ last += 4 + row.getSizeInBytes
+ true
+ }
+
+ def remove(): UnsafeRow = {
+ if (first + 4 > page.getBaseOffset + page.size ||
Platform.getInt(base, first) < 0) {
--- End diff --
Remove() should only be called when we know that there are at least one row
in it.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]