icexelloss commented on a change in pull request #22305:
[SPARK-24561][SQL][Python] User-defined window aggregation functions with
Pandas UDF (bounded window)
URL: https://github.com/apache/spark/pull/22305#discussion_r242302108
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala
##########
@@ -144,24 +285,107 @@ case class WindowInPandasExec(
queue.close()
}
- val inputProj = UnsafeProjection.create(allInputs, child.output)
- val pythonInput = grouped.map { case (_, rows) =>
- rows.map { row =>
- queue.add(row.asInstanceOf[UnsafeRow])
- inputProj(row)
+ val stream = iter.map { row =>
+ queue.add(row.asInstanceOf[UnsafeRow])
+ row
+ }
+
+ val pythonInput = new Iterator[Iterator[UnsafeRow]] {
+
+ // Manage the stream and the grouping.
+ var nextRow: UnsafeRow = null
+ var nextGroup: UnsafeRow = null
+ var nextRowAvailable: Boolean = false
+ private[this] def fetchNextRow() {
+ nextRowAvailable = stream.hasNext
+ if (nextRowAvailable) {
+ nextRow = stream.next().asInstanceOf[UnsafeRow]
+ nextGroup = grouping(nextRow)
+ } else {
+ nextRow = null
+ nextGroup = null
+ }
+ }
+ fetchNextRow()
+
+ // Manage the current partition.
+ val buffer: ExternalAppendOnlyUnsafeRowArray =
+ new ExternalAppendOnlyUnsafeRowArray(inMemoryThreshold,
spillThreshold)
+ var bufferIterator: Iterator[UnsafeRow] = _
+
+ val indexRow = new
SpecificInternalRow(Array.fill(numBoundIndices)(IntegerType))
+
+ val frames = factories.map(_(indexRow))
+
+ private[this] def fetchNextPartition() {
+ // Collect all the rows in the current partition.
+ // Before we start to fetch new input rows, make a copy of nextGroup.
+ val currentGroup = nextGroup.copy()
+
+ // clear last partition
+ buffer.clear()
+
+ while (nextRowAvailable && nextGroup == currentGroup) {
+ buffer.add(nextRow)
+ fetchNextRow()
+ }
+
+ // Setup the frames.
+ var i = 0
+ while (i < numFrames) {
+ frames(i).prepare(buffer)
+ i += 1
+ }
+
+ // Setup iteration
+ rowIndex = 0
+ bufferIterator = buffer.generateIterator()
+ }
+
+ // Iteration
+ var rowIndex = 0
+
+ override final def hasNext: Boolean =
+ (bufferIterator != null && bufferIterator.hasNext) ||
nextRowAvailable
+
+ override final def next(): Iterator[UnsafeRow] = {
+ // Load the next partition if we need to.
+ if ((bufferIterator == null || !bufferIterator.hasNext) &&
nextRowAvailable) {
+ fetchNextPartition()
+ }
+
+ val join = new JoinedRow
+
+ bufferIterator.zipWithIndex.map {
+ case (current, index) =>
+ var frameIndex = 0
+ while (frameIndex < numFrames) {
+ frames(frameIndex).write(index, current)
+ // If lowerBoundIndex of frame is < 0, it means the window is
unbounded
+ // and we don't need to write out window bounds.
+ if (lowerBoundIndex(frameIndex) >= 0) {
Review comment:
Added
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]