icexelloss commented on a change in pull request #22305:
[SPARK-24561][SQL][Python] User-defined window aggregation functions with
Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r240434370
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
##########
@@ -144,24 +282,107 @@ case class WindowInPandasExec(
queue.close()
}
- val inputProj = UnsafeProjection.create(allInputs, child.output)
- val pythonInput = grouped.map { case (_, rows) =>
- rows.map { row =>
- queue.add(row.asInstanceOf[UnsafeRow])
- inputProj(row)
+ val stream = iter.map { row =>
+ queue.add(row.asInstanceOf[UnsafeRow])
+ row
+ }
+
+ val pythonInput = new Iterator[Iterator[UnsafeRow]] {
+
+ // Manage the stream and the grouping.
+ var nextRow: UnsafeRow = null
+ var nextGroup: UnsafeRow = null
+ var nextRowAvailable: Boolean = false
+ private[this] def fetchNextRow() {
+ nextRowAvailable = stream.hasNext
+ if (nextRowAvailable) {
+ nextRow = stream.next().asInstanceOf[UnsafeRow]
+ nextGroup = grouping(nextRow)
+ } else {
+ nextRow = null
+ nextGroup = null
+ }
+ }
+ fetchNextRow()
+
+ // Manage the current partition.
+ val buffer: ExternalAppendOnlyUnsafeRowArray =
+ new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold,
spillThreshold)
+ var bufferIterator: Iterator[UnsafeRow] = _
+
+ val indexRow = new
SpecificInternalRow(Array.fill(numBoundIndices)(IntegerType))
+
+ val frames = factories.map(_(indexRow))
+
+ private[this] def fetchNextPartition() {
+ // Collect all the rows in the current partition.
+ // Before we start to fetch new input rows, make a copy of nextGroup.
+ val currentGroup = nextGroup.copy()
+
+ // clear last partition
+ buffer.clear()
+
+ while (nextRowAvailable && nextGroup == currentGroup) {
Review comment:
@hvanhovell Thanks for chiming in. I am not very familiar with unsafe row
comparison and was just following @ueshin's suggestion. If this is not needed, I
can close #23279 and leave it as is.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to this message, please log in to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]