HeartSaVioR commented on a change in pull request #33081:
URL: https://github.com/apache/spark/pull/33081#discussion_r670157882



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -3933,6 +3938,83 @@ object TimeWindowing extends Rule[LogicalPlan] {
   }
 }
 
+/** Maps a time column to a session window. */
+object SessionWindowing extends Rule[LogicalPlan] {
+  import org.apache.spark.sql.catalyst.dsl.expressions._
+
+  private final val SESSION_COL_NAME = "session_window"
+  private final val SESSION_START = "start"
+  private final val SESSION_END = "end"
+
+  /**
+   * Generates the logical plan for generating session window on a timestamp 
column.
+   * Each session window is initially defined as [timestamp, timestamp + gap).
+   *
+   * This also adds a marker to the session column so that downstream can 
easily find the column
+   * on session window.
+   */
+  def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+    case p: LogicalPlan if p.children.size == 1 =>
+      val child = p.children.head
+      val sessionExpressions =
+        p.expressions.flatMap(_.collect { case s: SessionWindow => s }).toSet
+
+      val numWindowExpr = p.expressions.flatMap(_.collect {
+        case s: SessionWindow => s
+        case t: TimeWindow => t
+      }).toSet.size
+
+      // Only support a single session expression for now
+      if (numWindowExpr == 1 && sessionExpressions.nonEmpty &&
+          sessionExpressions.head.timeColumn.resolved &&
+          sessionExpressions.head.checkInputDataTypes().isSuccess) {
+
+        val session = sessionExpressions.head
+
+        val metadata = session.timeColumn match {
+          case a: Attribute => a.metadata
+          case _ => Metadata.empty
+        }
+
+        val newMetadata = new MetadataBuilder()
+          .withMetadata(metadata)
+          .putBoolean(SessionWindow.marker, true)
+          .build()
+
+        val sessionAttr = AttributeReference(
+          SESSION_COL_NAME, session.dataType, metadata = newMetadata)()
+
+        val sessionStart = PreciseTimestampConversion(session.timeColumn, 
TimestampType, LongType)
+        val sessionEnd = sessionStart + session.gapDuration
+
+        val literalSessionStruct = CreateNamedStruct(
+          Literal(SESSION_START) ::
+            PreciseTimestampConversion(sessionStart, LongType, TimestampType) 
::
+            Literal(SESSION_END) ::
+            PreciseTimestampConversion(sessionEnd, LongType, TimestampType) ::
+            Nil)
+
+        val sessionStruct = Alias(literalSessionStruct, SESSION_COL_NAME)(
+          exprId = sessionAttr.exprId, explicitMetadata = Some(newMetadata))
+
+        val replacedPlan = p transformExpressions {
+          case s: SessionWindow => sessionAttr
+        }
+
+        // For backwards compatibility we add a filter to filter out nulls
+        val filterExpr = IsNotNull(session.timeColumn)
+
+        replacedPlan.withNewChildren(
+          Filter(filterExpr,
+            Project(sessionStruct +: child.output, child)) :: Nil)

Review comment:
       Actually, this follows the logic on time windows. I agree we'd be better 
off filtering first.
   
   Shall we change the order for time windows as well? In a different PR, for sure.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to