zhengruifeng commented on code in PR #39149:
URL: https://github.com/apache/spark/pull/39149#discussion_r1055018263


##########
python/pyspark/sql/connect/column.py:
##########
@@ -568,6 +569,98 @@ def __repr__(self) -> str:
         return f"(LambdaFunction({str(self._function)}, {', 
'.join(self._arguments)})"
 
 
+class WindowExpression(Expression):
+    def __init__(
+        self,
+        windowFunction: Expression,
+        windowSpec: "WindowSpecType",
+    ) -> None:
+        super().__init__()
+
+        from pyspark.sql.connect.window import WindowSpec
+
+        assert windowFunction is not None and isinstance(windowFunction, 
Expression)
+
+        assert windowSpec is not None and isinstance(windowSpec, WindowSpec)
+
+        self._windowFunction = windowFunction
+
+        self._windowSpec = windowSpec
+
+    def to_plan(self, session: "SparkConnectClient") -> proto.Expression:
+        expr = proto.Expression()
+
+        
expr.window.window_function.CopyFrom(self._windowFunction.to_plan(session))
+
+        if len(self._windowSpec._partitionSpec) > 0:
+            expr.window.partition_spec.extend(
+                [p.to_plan(session) for p in self._windowSpec._partitionSpec]
+            )
+        else:
+            warnings.warn(
+                "WARN WindowExpression: No Partition Defined for Window 
operation! "
+                "Moving all data to a single partition, this can cause serious 
"
+                "performance degradation."
+            )
+
+        if len(self._windowSpec._orderSpec) > 0:
+            expr.window.order_spec.extend(
+                [s.to_plan(session).sort_order for s in 
self._windowSpec._orderSpec]
+            )
+
+        if self._windowSpec._frame is not None:
+            if self._windowSpec._frame._isRowFrame:
+                expr.window.frame_spec.frame_type = (
+                    proto.Expression.Window.WindowFrame.FrameType.ROW_FRAME
+                )
+
+                start = self._windowSpec._frame._start
+                if start == 0:
+                    expr.window.frame_spec.lower.current_row = True
+                elif start == JVM_LONG_MIN:
+                    expr.window.frame_spec.lower.unbounded = True
+                elif JVM_INT_MIN <= start <= JVM_INT_MAX:
+                    expr.window.frame_spec.lower.value.literal.integer = start
+                else:
+                    raise ValueError(f"start is out of bound: {start}")
+
+                end = self._windowSpec._frame._end
+                if end == 0:
+                    expr.window.frame_spec.upper.current_row = True
+                elif end == JVM_LONG_MAX:
+                    expr.window.frame_spec.upper.unbounded = True
+                elif JVM_INT_MIN <= end <= JVM_INT_MAX:
+                    expr.window.frame_spec.upper.value.literal.integer = end
+                else:
+                    raise ValueError(f"end is out of bound: {end}")
+
+            else:
+                expr.window.frame_spec.frame_type = (
+                    proto.Expression.Window.WindowFrame.FrameType.RANGE_FRAME
+                )
+
+                start = self._windowSpec._frame._start
+                if start == 0:
+                    expr.window.frame_spec.lower.current_row = True
+                elif start == JVM_LONG_MIN:

Review Comment:
   This is intentional; see
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/expressions/WindowSpec.scala#L129



##########
python/pyspark/sql/connect/column.py:
##########
@@ -568,6 +569,98 @@ def __repr__(self) -> str:
         return f"(LambdaFunction({str(self._function)}, {', 
'.join(self._arguments)})"
 
 
+class WindowExpression(Expression):
+    def __init__(
+        self,
+        windowFunction: Expression,
+        windowSpec: "WindowSpecType",
+    ) -> None:
+        super().__init__()
+
+        from pyspark.sql.connect.window import WindowSpec
+
+        assert windowFunction is not None and isinstance(windowFunction, 
Expression)
+
+        assert windowSpec is not None and isinstance(windowSpec, WindowSpec)
+
+        self._windowFunction = windowFunction
+
+        self._windowSpec = windowSpec
+
+    def to_plan(self, session: "SparkConnectClient") -> proto.Expression:
+        expr = proto.Expression()
+
+        
expr.window.window_function.CopyFrom(self._windowFunction.to_plan(session))
+
+        if len(self._windowSpec._partitionSpec) > 0:
+            expr.window.partition_spec.extend(
+                [p.to_plan(session) for p in self._windowSpec._partitionSpec]
+            )
+        else:
+            warnings.warn(
+                "WARN WindowExpression: No Partition Defined for Window 
operation! "
+                "Moving all data to a single partition, this can cause serious 
"
+                "performance degradation."
+            )
+
+        if len(self._windowSpec._orderSpec) > 0:
+            expr.window.order_spec.extend(
+                [s.to_plan(session).sort_order for s in 
self._windowSpec._orderSpec]
+            )
+
+        if self._windowSpec._frame is not None:
+            if self._windowSpec._frame._isRowFrame:
+                expr.window.frame_spec.frame_type = (
+                    proto.Expression.Window.WindowFrame.FrameType.ROW_FRAME
+                )
+
+                start = self._windowSpec._frame._start
+                if start == 0:
+                    expr.window.frame_spec.lower.current_row = True
+                elif start == JVM_LONG_MIN:
+                    expr.window.frame_spec.lower.unbounded = True
+                elif JVM_INT_MIN <= start <= JVM_INT_MAX:
+                    expr.window.frame_spec.lower.value.literal.integer = start
+                else:
+                    raise ValueError(f"start is out of bound: {start}")
+
+                end = self._windowSpec._frame._end
+                if end == 0:
+                    expr.window.frame_spec.upper.current_row = True
+                elif end == JVM_LONG_MAX:
+                    expr.window.frame_spec.upper.unbounded = True
+                elif JVM_INT_MIN <= end <= JVM_INT_MAX:
+                    expr.window.frame_spec.upper.value.literal.integer = end
+                else:
+                    raise ValueError(f"end is out of bound: {end}")
+
+            else:
+                expr.window.frame_spec.frame_type = (
+                    proto.Expression.Window.WindowFrame.FrameType.RANGE_FRAME
+                )
+
+                start = self._windowSpec._frame._start
+                if start == 0:
+                    expr.window.frame_spec.lower.current_row = True
+                elif start == JVM_LONG_MIN:
+                    expr.window.frame_spec.lower.unbounded = True
+                else:
+                    expr.window.frame_spec.lower.value.literal.long = start
+
+                end = self._windowSpec._frame._end
+                if end == 0:
+                    expr.window.frame_spec.upper.current_row = True
+                elif end == JVM_LONG_MAX:

Review Comment:
   This one is intentional too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to