grundprinzip commented on code in PR #39149:
URL: https://github.com/apache/spark/pull/39149#discussion_r1055153566
##########
python/pyspark/sql/connect/column.py:
##########
@@ -568,6 +569,98 @@ def __repr__(self) -> str:
return f"(LambdaFunction({str(self._function)}, {',
'.join(self._arguments)})"
+class WindowExpression(Expression):
+ def __init__(
+ self,
+ windowFunction: Expression,
+ windowSpec: "WindowSpecType",
+ ) -> None:
+ super().__init__()
+
+ from pyspark.sql.connect.window import WindowSpec
+
+ assert windowFunction is not None and isinstance(windowFunction,
Expression)
+
+ assert windowSpec is not None and isinstance(windowSpec, WindowSpec)
+
+ self._windowFunction = windowFunction
+
+ self._windowSpec = windowSpec
+
+ def to_plan(self, session: "SparkConnectClient") -> proto.Expression:
+ expr = proto.Expression()
+
+
expr.window.window_function.CopyFrom(self._windowFunction.to_plan(session))
+
+ if len(self._windowSpec._partitionSpec) > 0:
+ expr.window.partition_spec.extend(
+ [p.to_plan(session) for p in self._windowSpec._partitionSpec]
+ )
+ else:
+ warnings.warn(
+ "WARN WindowExpression: No Partition Defined for Window
operation! "
+ "Moving all data to a single partition, this can cause serious
"
+ "performance degradation."
+ )
+
+ if len(self._windowSpec._orderSpec) > 0:
+ expr.window.order_spec.extend(
+ [s.to_plan(session).sort_order for s in
self._windowSpec._orderSpec]
+ )
+
+ if self._windowSpec._frame is not None:
+ if self._windowSpec._frame._isRowFrame:
+ expr.window.frame_spec.frame_type = (
+ proto.Expression.Window.WindowFrame.FrameType.ROW_FRAME
+ )
+
+ start = self._windowSpec._frame._start
+ if start == 0:
+ expr.window.frame_spec.lower.current_row = True
+ elif start == JVM_LONG_MIN:
+ expr.window.frame_spec.lower.unbounded = True
+ elif JVM_INT_MIN <= start <= JVM_INT_MAX:
+ expr.window.frame_spec.lower.value.literal.integer = start
+ else:
+ raise ValueError(f"start is out of bound: {start}")
+
+ end = self._windowSpec._frame._end
+ if end == 0:
+ expr.window.frame_spec.upper.current_row = True
+ elif end == JVM_LONG_MAX:
+ expr.window.frame_spec.upper.unbounded = True
+ elif JVM_INT_MIN <= end <= JVM_INT_MAX:
+ expr.window.frame_spec.upper.value.literal.integer = end
+ else:
+ raise ValueError(f"end is out of bound: {end}")
+
+ else:
+ expr.window.frame_spec.frame_type = (
+ proto.Expression.Window.WindowFrame.FrameType.RANGE_FRAME
+ )
+
+ start = self._windowSpec._frame._start
+ if start == 0:
+ expr.window.frame_spec.lower.current_row = True
+ elif start == JVM_LONG_MIN:
Review Comment:
@zhengruifeng the problem I tried to point out is that, while in JVM
land there is exactly one value that matches JVM_MIN, in Python there are
more, due to the unbounded size of the integer type.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]