Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/20255#discussion_r162199330
--- Diff: docs/structured-streaming-programming-guide.md ---
@@ -1089,6 +1101,300 @@ streamingDf.join(staticDf, "type", "right_join")  # right outer join with a stat
</div>
</div>
+Note that stream-static joins are not stateful, so no state management is necessary.
+However, a few types of stream-static outer joins are not yet supported.
+These are listed at the [end of this Join section](#support-matrix-for-joins-in-streaming-queries).
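One way to see why no state is needed here is a toy model in plain Python (not Spark code) that joins each micro-batch against a static lookup table. The names `static_table`, `inner_join_batch` and `left_outer_join_batch` are hypothetical and only illustrate the idea.

```python
# Toy model (not Spark's implementation): stream-static joins, one micro-batch at a time.
# static_table, inner_join_batch and left_outer_join_batch are hypothetical names.

static_table = {"a": "Type A", "b": "Type B"}  # static side: key -> attribute


def inner_join_batch(batch, static):
    # Each streaming row is looked up in the static table; nothing is kept
    # between micro-batches, so there is no state to manage.
    return [(key, value, static[key]) for key, value in batch if key in static]


def left_outer_join_batch(batch, static):
    # The static side is complete, so a streaming row without a match can be
    # emitted with None right away: it can never find a match later.
    return [(key, value, static.get(key)) for key, value in batch]


batch_1 = [("a", 1), ("c", 2)]
batch_2 = [("b", 3)]
print(inner_join_batch(batch_1, static_table))       # [('a', 1, 'Type A')]
print(inner_join_batch(batch_2, static_table))       # [('b', 3, 'Type B')]
print(left_outer_join_batch(batch_1, static_table))  # [('a', 1, 'Type A'), ('c', 2, None)]
```

As intuition only: the opposite outer join, which must keep every unmatched static row, would require knowing that no future streaming row will ever match that static row. An unbounded stream cannot guarantee this, which is consistent with those cases being listed as not supported.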
+
+#### Stream-stream Joins
+In Spark 2.3, we have added support for stream-stream joins, that is, you can join two streaming
+Datasets/DataFrames. The challenge of generating join results between two data streams is that,
+at any point in time, the view of the dataset is incomplete for both sides of the join, making
+it much harder to find matches between inputs. Any row received from one input stream can match
+with any future, yet-to-be-received row from the other input stream. Hence, for both input
+streams, we buffer past input as streaming state, so that we can match every future input with
+past input and accordingly generate joined results. Furthermore, similar to streaming aggregations,
+we automatically handle late, out-of-order data and can limit the state using watermarks.
+Let's discuss the different types of supported stream-stream joins and how to use them.
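The buffering described above can be pictured with a toy model in plain Python. This is not how Spark implements it. Each side keeps its past rows as state. A new row is first matched against the other side's buffer and then added to its own buffer. `JoinStateModel` is a hypothetical name.

```python
# Toy model (not Spark's implementation) of a stream-stream inner equi-join:
# both sides buffer every past row as state. JoinStateModel is a hypothetical name.
from collections import defaultdict


class JoinStateModel:
    def __init__(self):
        # One buffer per side, keyed by the join key.
        self.state = {"left": defaultdict(list), "right": defaultdict(list)}

    def process(self, side, key, row):
        other = "right" if side == "left" else "left"
        # Match the new row against every buffered row from the other side...
        matches = [
            (row, past) if side == "left" else (past, row)
            for past in self.state[other][key]
        ]
        # ...then buffer it so that future rows from the other side can match it.
        self.state[side][key].append(row)
        return matches


join = JoinStateModel()
print(join.process("left", "ad1", "impression@10:00"))  # []
print(join.process("right", "ad1", "click@10:20"))      # [('impression@10:00', 'click@10:20')]
print(join.process("left", "ad1", "impression@10:30"))  # [('impression@10:30', 'click@10:20')]
```

Nothing is ever removed from `state` in this model, so the buffers grow with every input row.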
+
+##### Inner Joins with optional Watermarking
+Inner joins on any kind of columns along with any kind of join conditions are supported.
+However, as the stream runs, the size of streaming state will keep growing indefinitely as
+*all* past input must be saved, since any new input can match with any input from the past.
+To avoid unbounded state, you have to define additional join conditions such that indefinitely
+old inputs cannot match with future inputs and therefore can be cleared from the state.
+In other words, you will have to do the following additional steps in the join.
+
+1. Define watermark delays on both inputs such that the engine knows how delayed the input can be
+(similar to streaming aggregations).
+
+1. Define a constraint on event-time across the two inputs such that the engine can figure out when
+old rows of one input are not going to be required (i.e. will not satisfy the time constraint) for
+matches with the other input. This constraint can be defined in one of two ways.
+
+    1. Time range join conditions (e.g. `...JOIN ON leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR`),
+
+    1. Join on event-time windows (e.g. `...JOIN ON leftTimeWindow = rightTimeWindow`).
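The two kinds of constraints behave differently near window boundaries. The plain-Python predicates below illustrate this; they are not Spark code. The functions `in_time_range`, `window_start` and `same_window` are hypothetical. Tumbling windows are assumed to be 1 hour long and aligned to the Unix epoch.

```python
# Toy predicates (plain Python, not Spark) for the two kinds of event-time constraints.
# in_time_range, window_start and same_window are hypothetical names.
from datetime import datetime, timedelta

HOUR = timedelta(hours=1)
EPOCH = datetime(1970, 1, 1)


def in_time_range(left_time, right_time, width=HOUR):
    # leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR (inclusive at both ends)
    return right_time <= left_time <= right_time + width


def window_start(t, size=HOUR):
    # Start of the tumbling window containing t, assuming windows aligned to the epoch.
    return EPOCH + ((t - EPOCH) // size) * size


def same_window(left_time, right_time, size=HOUR):
    # leftTimeWindow = rightTimeWindow
    return window_start(left_time, size) == window_start(right_time, size)


right = datetime(2018, 1, 17, 10, 10)
print(in_time_range(datetime(2018, 1, 17, 10, 50), right))  # True
print(same_window(datetime(2018, 1, 17, 10, 50), right))    # True
print(in_time_range(datetime(2018, 1, 17, 11, 5), right))   # True
print(same_window(datetime(2018, 1, 17, 11, 5), right))     # False
```

A time range is relative to each row, so rows 55 minutes apart can still match across an hour boundary. Windows only pair rows that fall into the same fixed bucket.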
+
+Let's understand this with an example.
+
+Let's say we want to join a stream of advertisement impressions (when an ad was shown) with
+another stream of user clicks on advertisements to correlate when impressions led to
+monetizable clicks. To allow state cleanup in this stream-stream join, you will have to
+specify the watermarking delays and the time constraints as follows.
+
+1. Watermark delays: Say, the impressions and the corresponding clicks can be late/out-of-order
+in event-time by at most 2 and 3 hours, respectively.
+
+1. Event-time range condition: Say, a click can occur within a time range of 0 seconds to 1 hour
+after the corresponding impression.
+
+The code would look like this.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+
+{% highlight scala %}
+import org.apache.spark.sql.functions.expr
+
+val impressions = spark.readStream. ...
+val clicks = spark.readStream. ...
+
+// Apply watermarks on event-time columns
+val impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
+val clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
+
+// Join with event-time constraints
+impressionsWithWatermark.join(
+  clicksWithWatermark,
+  expr("""
+    clickAdId = impressionAdId AND
+    clickTime >= impressionTime AND
+    clickTime <= impressionTime + interval 1 hour
+    """)
+)
+
+{% endhighlight %}
+
+</div>
+<div data-lang="java" markdown="1">
+
+{% highlight java %}
+import static org.apache.spark.sql.functions.expr;
+
+Dataset<Row> impressions = spark.readStream(). ...
+Dataset<Row> clicks = spark.readStream(). ...
+
+// Apply watermarks on event-time columns
+Dataset<Row> impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours");
+Dataset<Row> clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours");
+
+// Join with event-time constraints
+impressionsWithWatermark.join(
+  clicksWithWatermark,
+  expr(
+    "clickAdId = impressionAdId AND " +
+    "clickTime >= impressionTime AND " +
+    "clickTime <= impressionTime + interval 1 hour ")
+);
+
+{% endhighlight %}
+
+
+</div>
+<div data-lang="python" markdown="1">
+
+{% highlight python %}
+from pyspark.sql.functions import expr
+
+impressions = spark.readStream. ...
+clicks = spark.readStream. ...
+
+# Apply watermarks on event-time columns
+impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
+clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
+
+# Join with event-time constraints
+impressionsWithWatermark.join(
+  clicksWithWatermark,
+  expr("""
+    clickAdId = impressionAdId AND
+    clickTime >= impressionTime AND
+    clickTime <= impressionTime + interval 1 hour
+    """)
+)
+
+{% endhighlight %}
+
+</div>
+</div>
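To make the state cleanup in this example concrete, here is a simplified plain-Python model of when buffered rows stop being useful. It makes two assumptions. First, the watermark used for cleanup is the minimum of the two per-input watermarks. Second, rows older than that watermark are no longer accepted. Spark's exact computation and boundary handling may differ, and all names are hypothetical.

```python
# Simplified model (not Spark's implementation) of state cleanup for the
# impressions/clicks join above. All names are hypothetical.
from datetime import datetime, timedelta

IMPRESSION_DELAY = timedelta(hours=2)  # impressions.withWatermark("impressionTime", "2 hours")
CLICK_DELAY = timedelta(hours=3)       # clicks.withWatermark("clickTime", "3 hours")
MATCH_WINDOW = timedelta(hours=1)      # clickTime <= impressionTime + interval 1 hour


def combined_watermark(max_impression_time, max_click_time):
    # Per-input watermark = max event time seen minus its delay;
    # the minimum of the two is assumed to be the watermark used for cleanup.
    return min(max_impression_time - IMPRESSION_DELAY, max_click_time - CLICK_DELAY)


def can_drop_impression(impression_time, watermark):
    # Future clicks are assumed to have clickTime >= watermark, and a click can
    # match only if clickTime <= impressionTime + 1 hour.
    return watermark > impression_time + MATCH_WINDOW


def can_drop_click(click_time, watermark):
    # Future impressions are assumed to have impressionTime >= watermark, and a
    # click can match only if impressionTime <= clickTime.
    return watermark > click_time


wm = combined_watermark(datetime(2018, 1, 17, 14, 0), datetime(2018, 1, 17, 14, 0))
print(wm)                                                      # 2018-01-17 11:00:00
print(can_drop_impression(datetime(2018, 1, 17, 9, 30), wm))   # True
print(can_drop_impression(datetime(2018, 1, 17, 10, 30), wm))  # False
print(can_drop_click(datetime(2018, 1, 17, 10, 45), wm))       # True
```

With the larger click delay, the click stream holds the combined watermark back. Impressions therefore stay buffered for roughly 1 hour (the range condition) after the point where the click watermark passes them.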
+
+##### Outer Joins with Watermarking
+While the watermark + event-time constraints are optional for inner joins, for left and right outer
+joins they must be specified. This is because for generating the NULL results in outer join, the
+engine must know when an input row is not going to match with anything in the future. Hence, the
+watermark + event-time constraints must be specified for generating correct results. Therefore,
+a query with outer-join will look quite like the ad-monetization example earlier, except that
+there will be an additional parameter specifying it to be an outer-join.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+
+{% highlight scala %}
+
+impressionsWithWatermark.join(
+  clicksWithWatermark,
+  expr("""
+    clickAdId = impressionAdId AND
+    clickTime >= impressionTime AND
+    clickTime <= impressionTime + interval 1 hour
+    """),
+  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter"
+)
+
+{% endhighlight %}
+
+</div>
+<div data-lang="java" markdown="1">
+
+{% highlight java %}
+impressionsWithWatermark.join(
+  clicksWithWatermark,
+  expr(
+    "clickAdId = impressionAdId AND " +
+    "clickTime >= impressionTime AND " +
+    "clickTime <= impressionTime + interval 1 hour "),
+  "leftOuter"                 // can be "inner", "leftOuter", "rightOuter"
+);
+
+{% endhighlight %}
+
+
+</div>
+<div data-lang="python" markdown="1">
+
+{% highlight python %}
+impressionsWithWatermark.join(
+  clicksWithWatermark,
+  expr("""
+    clickAdId = impressionAdId AND
+    clickTime >= impressionTime AND
+    clickTime <= impressionTime + interval 1 hour
+    """),
+  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter"
+)
+
+{% endhighlight %}
+
+</div>
+</div>
+
+However, note that the outer NULL results will be generated with a delay (which depends on the specified
+watermark delay and the time range condition) because the engine has to wait that long to ensure
+there were no matches and there will be no more matches in the future.
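The source of that delay can be illustrated with a simplified plain-Python model of `impressions LEFT OUTER JOIN clicks`. This is not Spark's implementation. The watermark is taken as given. Each buffered impression is identified by its ad id, which is a simplification, and `evict_left_outer` is a hypothetical name.

```python
# Simplified model (not Spark's implementation) of when a left outer join
# (impressions LEFT OUTER JOIN clicks) can emit its NULL rows.
# evict_left_outer is a hypothetical name; the watermark is taken as given.
from datetime import datetime, timedelta

MATCH_WINDOW = timedelta(hours=1)  # clickTime <= impressionTime + interval 1 hour


def evict_left_outer(buffered, matched_ids, watermark):
    """Return (null_rows, still_buffered) for buffered (ad_id, impression_time) rows."""
    null_rows, keep = [], []
    for ad_id, impression_time in buffered:
        if watermark > impression_time + MATCH_WINDOW:
            # No click at or after the watermark can satisfy the time range any
            # more, so the row leaves the state; if it never matched, emit NULL.
            if ad_id not in matched_ids:
                null_rows.append((ad_id, impression_time, None))
        else:
            # A matching click could still arrive: keep waiting.
            keep.append((ad_id, impression_time))
    return null_rows, keep


buffered = [
    ("ad1", datetime(2018, 1, 17, 9, 0)),
    ("ad2", datetime(2018, 1, 17, 9, 30)),
    ("ad3", datetime(2018, 1, 17, 10, 30)),
]
nulls, keep = evict_left_outer(buffered, {"ad2"}, datetime(2018, 1, 17, 11, 0))
print(nulls)  # only ad1: ad2 already matched, ad3 could still match
print(keep)   # only ad3
```

In this model, the NULL row for the 9:00 impression appears only once the watermark has moved past 10:00. It does not appear when the impression first arrives.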
+
+##### Support matrix for joins in streaming queries
+
+<table class ="table">
+ <tr>
+ <th>Left Input</th>
+ <th>Right Input</th>
+ <th>Join Type</th>
+ <th>Support status</th>
+ </tr>
+ <tr>
+ <td style="vertical-align: middle;">Static</td>
+ <td style="vertical-align: middle;">Static</td>
+ <td style="vertical-align: middle;">All types</td>
+ <td style="vertical-align: middle;">
+ Supported, since it's not on streaming data even though it
+ can be present in a streaming query
+ </td>
+ </tr>
+ <tr>
+ <td rowspan="4" style="vertical-align: middle;">Stream</td>
+ <td rowspan="4" style="vertical-align: middle;">Static</td>
+ <td style="vertical-align: middle;">Inner</td>
+ <td style="vertical-align: middle;">Supported, not stateful</td>
+ </tr>
+ <tr>
+ <td style="vertical-align: middle;">Left Outer</td>
+ <td style="vertical-align: middle;">Supported, not stateful</td>
+ </tr>
+ <tr>
+ <td style="vertical-align: middle;">Right Outer</td>
+ <td style="vertical-align: middle;">Not supported</td>
+ </tr>
+ <tr>
+ <td style="vertical-align: middle;">Full Outer</td>
+ <td style="vertical-align: middle;">Not supported</td>
+ </tr>
+ <tr>
+ <td rowspan="4" style="vertical-align: middle;">Static</td>
+ <td rowspan="4" style="vertical-align: middle;">Stream</td>
+ <td style="vertical-align: middle;">Inner</td>
+ <td style="vertical-align: middle;">Supported, not stateful</td>
+ </tr>
+ <tr>
+ <td style="vertical-align: middle;">Left Outer</td>
+ <td style="vertical-align: middle;">Not supported</td>
+ </tr>
+ <tr>
+ <td style="vertical-align: middle;">Right Outer</td>
+ <td style="vertical-align: middle;">Supported, not stateful</td>
+ </tr>
+ <tr>
+ <td style="vertical-align: middle;">Full Outer</td>
+ <td style="vertical-align: middle;">Not supported</td>
+ </tr>
+ <tr>
+ <td rowspan="4" style="vertical-align: middle;">Stream</td>
+ <td rowspan="4" style="vertical-align: middle;">Stream</td>
+ <td style="vertical-align: middle;">Inner</td>
+ <td style="vertical-align: middle;">
+ Supported, optionally specify watermark on both sides +
+ time constraints for state cleanup
+ </td>
+ </tr>
+ <tr>
+ <td style="vertical-align: middle;">Left Outer</td>
+ <td style="vertical-align: middle;">
+ Conditionally supported, must specify watermark on right + time constraints for correct
+ results, optionally specify watermark on left for all state cleanup
+ </td>
+ </tr>
+ <tr>
+ <td style="vertical-align: middle;">Right Outer</td>
+ <td style="vertical-align: middle;">
+ Conditionally supported, must specify watermark on left + time constraints for correct
+ results, optionally specify watermark on right for all state cleanup
+ </td>
+ </tr>
+ <tr>
+ <td style="vertical-align: middle;">Full Outer</td>
+ <td style="vertical-align: middle;">Not supported</td>
+ </tr>
+</table>
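The matrix can also be read as a plain lookup table, encoded below in Python exactly as listed in the table (Spark 2.3). Only the four listed join types are covered, and `SUPPORT` and `join_support` are hypothetical names.

```python
# The support matrix above (Spark 2.3) as a plain lookup table.
# SUPPORT and join_support are hypothetical names; only the four listed join types are covered.
TYPES = ("inner", "left_outer", "right_outer", "full_outer")

SUPPORT = {
    ("static", "static"): dict.fromkeys(TYPES, "supported"),
    ("stream", "static"): {"inner": "supported", "left_outer": "supported",
                           "right_outer": "not supported", "full_outer": "not supported"},
    ("static", "stream"): {"inner": "supported", "left_outer": "not supported",
                           "right_outer": "supported", "full_outer": "not supported"},
    ("stream", "stream"): {"inner": "supported",
                           "left_outer": "conditionally supported",
                           "right_outer": "conditionally supported",
                           "full_outer": "not supported"},
}


def join_support(left, right, join_type):
    # left/right are "static" or "stream"; join_type is one of TYPES.
    return SUPPORT[(left, right)][join_type]


print(join_support("static", "stream", "left_outer"))  # not supported
```

The stream-static rows follow one pattern. An outer join is supported only when the side whose unmatched rows must be kept is the streaming side.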
+
+Additional details on supported joins:
+
+- Joins can be cascaded, that is, you can do `df1.join(df2, ...).join(df3, ...).join(df4, ....)`.
+
+- As of Spark 2.3, you can use joins only when the query is in Append output mode. Other output modes are not yet supported.
+
+- As of Spark 2.3, you cannot use other non-map-like operations before joins. Here are a few examples of
+  what cannot be used.
+
+ - Cannot use streaming aggregations before joins.
+
+  - Cannot use mapGroupsWithState and flatMapGroupsWithState in Update mode cannot before joins.
--- End diff --
nit: ~~cannot~~ before joins.