Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/19271#discussion_r139831092
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
---
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent.TimeUnit.NANOSECONDS
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
+import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._
+import org.apache.spark.sql.execution.streaming.state._
+import org.apache.spark.sql.internal.SessionState
+import org.apache.spark.util.{CompletionIterator, SerializableConfiguration}
+
+
+/**
+ * Performs stream-stream join using symmetric hash join algorithm. It works as follows.
+ *
+ *                            /-----------------------\
+ *  left side input --------->|    left side state    |------\
+ *                            \-----------------------/      |
+ *                                                           |--------> joined output
+ *                            /-----------------------\      |
+ *  right side input -------->|    right side state   |------/
+ *                            \-----------------------/
+ *
+ * Each join side buffers past input rows as streaming state so that the past input can be joined
+ * with future input on the other side. This buffer state is effectively a multi-map:
+ *    equi-join key -> list of past input rows received with the join key
+ *
+ * For each input row in each side, the following operations take place.
+ * - Calculate join key from the row.
+ * - Use the join key to append the row to the buffer state of the side that the row came from.
+ * - Find past buffered values for the key from the other side. For each such value, emit the
+ *   "joined row" (left-row, right-row)
+ * - Apply the optional condition to filter the joined rows as the final output.
+ *
+ * If a timestamp column with event time watermark is present in the join keys or in the input
+ * data, then it uses the watermark to figure out which rows in the buffer will not join with
+ * any new data, and therefore can be discarded. Depending on the provided query conditions, we
+ * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows).
+ * There are three kinds of queries possible regarding this as explained below.
+ * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below.
+ *
+ * 1. When timestamp/time-window + watermark is in the join keys. Example (pseudo-SQL):
+ *
+ *      SELECT * FROM leftTable, rightTable
+ *      ON
+ *        leftKey = rightKey AND
+ *        window(leftTime, "1 hour") = window(rightTime, "1 hour")    // 1hr tumbling windows
+ *
+ *    In this case, this operator will join rows newer than watermark which fall in the same
+ *    1 hour window. Say the event-time watermark is "12:34" (both left and right input).
+ *    Then input rows can only have time > 12:34. Hence, they can only join with buffered rows
+ *    where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be
+ *    discarded. In other words, the operator will discard all state where
+ *    window in state key (i.e. join key) < event time watermark. This threshold is called
+ *    State Key Watermark.
+ *
+ * 2. When timestamp range conditions are provided (no time/window + watermark in join keys). E.g.
+ *
+ *      SELECT * FROM leftTable, rightTable
+ *      ON
+ *        leftKey = rightKey AND
+ *        leftTime > rightTime - INTERVAL 8 MINUTES AND leftTime < rightTime + INTERVAL 1 HOUR
+ *
+ *    In this case, the event-time watermark and the time range condition can be used to
+ *    calculate a state watermark, i.e., time threshold for the state rows that can be discarded.
+ *    For example, say each join side has a time column, named "leftTime" and
+ *    "rightTime", and there is a join condition "leftTime > rightTime - 8 min".
+ *    While processing, say the watermark on right input is "12:34". This means that henceforth,
+ *    only right input rows with "rightTime > 12:34" will be processed, and any older rows will be
+ *    considered as "too late" and therefore dropped. Then, the left side buffer only needs
+ *    to keep rows where "leftTime > rightTime - 8 min > 12:34 - 8 min = 12:26".
+ *    That is, the left state watermark is 12:26, and any rows older than that can be dropped from
+ *    the state. In other words, the operator will discard all state where
+ *    timestamp in state value (input rows) < state watermark. This threshold is called
+ *    State Value Watermark (to distinguish from the state key watermark).
+ *
+ *    Note:
+ *    - The event watermark value of one side is used to calculate the
+ *      state watermark of the other side. That is, a condition ~ "leftTime > rightTime + X" with
+ *      right side event watermark is used to calculate the left side state watermark. Conversely,
+ *      a condition ~ "leftTime < rightTime + Y" with left side event watermark is used to calculate
+ *      right side state watermark.
+ *    - Depending on the conditions, the state watermark may be different for the left and right
+ *      side. In the above example, leftTime > 12:26 AND rightTime > 12:34 - 1 hour = 11:34.
+ *    - State can be dropped from BOTH sides only when there are conditions of the above forms that
+ *      define time bounds on timestamp in both directions.
+ *
+ * 3. When both window in join key and time range conditions are present, case 1 + 2.
--- End diff --
done
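
For readers of the archive, the buffering and eviction described in the quoted doc comment can be sketched in plain Scala as below. This is only an illustrative, in-memory sketch and not the code in the PR: `Row`, `SideState`, `SymmetricHashJoin` and `Watermarks` are hypothetical names, timestamps are simplified to minutes since midnight, and the real operator works on `UnsafeRow`s backed by a state store.

```scala
import scala.collection.mutable

// Hypothetical row type for illustration; timeMin is minutes since midnight.
final case class Row(key: String, timeMin: Int, payload: String)

// One side's buffer: equi-join key -> past input rows received with that key.
final class SideState {
  private val buffer = mutable.Map.empty[String, mutable.ArrayBuffer[Row]]

  def append(row: Row): Unit =
    buffer.getOrElseUpdate(row.key, mutable.ArrayBuffer.empty[Row]) += row

  def get(key: String): List[Row] =
    buffer.get(key).map(_.toList).getOrElse(Nil)

  // Discard rows whose timestamp is below the state value watermark.
  def evictOlderThan(stateValueWatermark: Int): Unit = {
    val kept = buffer.iterator
      .map { case (k, rows) => k -> rows.filter(_.timeMin >= stateValueWatermark) }
      .filter { case (_, rows) => rows.nonEmpty }
      .toList
    buffer.clear()
    buffer ++= kept
  }

  def size: Int = buffer.valuesIterator.map(_.size).sum
}

// Symmetric hash join: each input row is buffered on its own side, then
// matched against the rows buffered for the same key on the other side.
final class SymmetricHashJoin(condition: (Row, Row) => Boolean) {
  val left = new SideState
  val right = new SideState

  def processLeft(row: Row): List[(Row, Row)] = {
    left.append(row)
    right.get(row.key).map(r => (row, r)).filter { case (l, r) => condition(l, r) }
  }

  def processRight(row: Row): List[(Row, Row)] = {
    right.append(row)
    left.get(row.key).map(l => (l, row)).filter { case (l, r) => condition(l, r) }
  }
}

// State value watermarks derived from the range condition
//   leftTime > rightTime - lowerMin AND leftTime < rightTime + upperMin
object Watermarks {
  // rightTime > rightEventWatermark  =>  leftTime > rightEventWatermark - lowerMin
  def leftStateWatermark(rightEventWatermark: Int, lowerMin: Int): Int =
    rightEventWatermark - lowerMin

  // leftTime > leftEventWatermark  =>  rightTime > leftEventWatermark - upperMin
  def rightStateWatermark(leftEventWatermark: Int, upperMin: Int): Int =
    leftEventWatermark - upperMin
}
```

With an event-time watermark of 12:34 (754 minutes) on both inputs and the bounds from case 2 (8 minutes and 1 hour), `Watermarks.leftStateWatermark(754, 8)` gives 746 (12:26) and `Watermarks.rightStateWatermark(754, 60)` gives 694 (11:34), matching the figures in the comment. The sketch ignores strict versus non-strict boundary handling and the state key watermark of case 1.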