Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/20255#discussion_r161920813
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1089,6 +1098,224 @@ streamingDf.join(staticDf, "type", "right_join") # right outer join with a stat
</div>
</div>
+Note that stream-static joins are not stateful, so no state management is necessary.
+However, a few types of stream-static outer join are not supported as the incomplete view of
+all data in a stream makes it infeasible to calculate the results correctly.
+These are discussed at the end of this section.
+
+#### Stream-stream Joins
+In Spark 2.3, we have added support for stream-stream joins, that is, you can join two streaming
+Datasets/DataFrames. The challenge of generating join results between two data streams is that,
+at any point of time, the view of the dataset is incomplete for both sides of the join, making
+it much harder to find matches between inputs. Any row received from one input stream can match
+with any future, yet-to-be-received row from the other input stream. Hence, for both the input
+streams, we buffer past input as streaming state, so that we can match every future input with
+past input and accordingly generate joined results. Furthermore, similar to streaming aggregations,
+we automatically handle late, out-of-order data and can limit the state using watermarks.
+Let's discuss the different types of supported stream-stream joins and how to use them.
+
+##### Inner Joins with optional Watermarking
+Inner joins on any kind of columns along with any kind of join conditions are supported.
+However, as the stream runs, the size of streaming state will keep growing indefinitely as
+*all* past input must be saved, since any new input can match with any input from the past.
+To avoid unbounded state, you have to define additional join conditions such that indefinitely
+old inputs cannot match with future inputs and therefore can be cleared from the state.
+In other words, you will have to do the following additional steps in the join.
+
+1. Define watermark delays on both inputs such that the engine knows how delayed the input can be
+(similar to streaming aggregations).
+
+1. Define a constraint on event-time across the two inputs such that the engine can figure out when
+old rows of one input are not going to be required for matches with the other input. This constraint
+can either be a time range condition (e.g. `...JOIN ON leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR`),
+or an equi-join on event-time windows (e.g. `...JOIN ON leftTimeWindow = rightTimeWindow`);
+a sketch of the window-based form is shown after the example code below.
+
+Let's understand this with an example.
+
+Let's say we want to join a stream of advertisement impressions (when an ad was shown) with
+another stream of user clicks on advertisements to correlate when impressions led to
+monetizable clicks. To allow state cleanup in this stream-stream join, you will have to
+specify the watermarking delays and the time constraints as follows.
+
+1. Watermark delays: Say, the impressions and the corresponding clicks can be late/out-of-order
+in event-time by at most 2 and 3 hours, respectively.
+
+1. Event-time range condition: Say, a click can occur within a time range of 0 seconds to 1 hour
+after the corresponding impression.
+
+The code would look like this.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+
+{% highlight scala %}
+import org.apache.spark.sql.functions.expr
+
+val impressions = spark.readStream. ...
+val clicks = spark.readStream. ...
+
+// Apply watermarks on event-time columns
+val impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
+val clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
+
+// Join with event-time constraints
+impressionsWithWatermark.join(
+  clicksWithWatermark,
+  expr("""
+    clickAdId = impressionAdId AND
+    clickTime >= impressionTime AND
+    clickTime <= impressionTime + interval 1 hour
+    """
+  ))
+
+{% endhighlight %}
+
+</div>
+<div data-lang="java" markdown="1">
+
+{% highlight java %}
+import static org.apache.spark.sql.functions.expr;
+
+Dataset<Row> impressions = spark.readStream(). ...
+Dataset<Row> clicks = spark.readStream(). ...
+
+// Apply watermarks on event-time columns
+Dataset<Row> impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours");
+Dataset<Row> clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours");
+
+// Join with event-time constraints
+impressionsWithWatermark.join(
+  clicksWithWatermark,
+  expr(
+    "clickAdId = impressionAdId AND " +
+    "clickTime >= impressionTime AND " +
+    "clickTime <= impressionTime + interval 1 hour "
+  ));
+
+{% endhighlight %}
+
+
+</div>
+<div data-lang="python" markdown="1">
+
+{% highlight python %}
+from pyspark.sql.functions import expr
+
+impressions = spark.readStream. ...
+clicks = spark.readStream. ...
+
+# Apply watermarks on event-time columns
+impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
+clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
+
+# Join with event-time constraints
+impressionsWithWatermark.join(
+  clicksWithWatermark,
+  expr("""
+    clickAdId = impressionAdId AND
+    clickTime >= impressionTime AND
+    clickTime <= impressionTime + interval 1 hour
+    """
+  ))
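+
+# A minimal sketch of the other supported constraint form mentioned above: an
+# equi-join on event-time windows instead of a time range condition. The 1-hour
+# window duration and the derived "timeWindow" column name are illustrative
+# assumptions, not part of the example above.
+from pyspark.sql.functions import window
+
+impressionsWindowed = impressionsWithWatermark \
+  .withColumn("timeWindow", window("impressionTime", "1 hour"))
+clicksWindowed = clicksWithWatermark \
+  .withColumn("timeWindow", window("clickTime", "1 hour"))
+
+# Rows can only match within the same event-time window, so together with the
+# watermarks the engine can clear sufficiently old windows from the state.
+impressionsWindowed.join(
+  clicksWindowed,
+  (impressionsWindowed.timeWindow == clicksWindowed.timeWindow) &
+  (impressionsWindowed.impressionAdId == clicksWindowed.clickAdId))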
--- End diff ---
Thank you!
---