Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20255#discussion_r162199330
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -1089,6 +1101,300 @@ streamingDf.join(staticDf, "type", "right_join")  # 
right outer join with a stat
     </div>
     </div>
     
    +Note that stream-static joins are not stateful, so no state management is 
necessary.
    +However, a few types of stream-static outer joins are not yet supported.
    +These are listed at the [end of this Join 
section](#support-matrix-for-joins-in-streaming-queries).
    +
    +#### Stream-stream Joins
    +In Spark 2.3, we have added support for stream-stream joins, that is, you 
can join two streaming
    +Datasets/DataFrames. The challenge of generating join results between two 
data streams is that,
    +at any point of time, the view of the dataset is incomplete for both sides 
of the join making
    +it much harder to find matches between inputs. Any row received from one 
input stream can match
    +with any future, yet-to-be-received row from the other input stream. 
Hence, for both the input
    +streams, we buffer past input as streaming state, so that we can match 
every future input with
    +past input and accordingly generate joined results. Furthermore, similar 
to streaming aggregations,
    +we automatically handle late, out-of-order data and can limit the state 
using watermarks.
    +Let’s discuss the different types of supported stream-stream joins and 
how to use them.
    +
    +##### Inner Joins with optional Watermarking
    +Inner joins on any kind of columns along with any kind of join conditions 
are supported.
    +However, as the stream runs, the size of streaming state will keep growing 
indefinitely as
    +*all* past input must be saved as the any new input can match with any 
input from the past.
    +To avoid unbounded state, you have to define additional join conditions 
such that indefinitely
    +old inputs cannot match with future inputs and therefore can be cleared 
from the state.
    +In other words, you will have to do the following additional steps in the 
join.
    +
    +1. Define watermark delays on both inputs such that the engine knows how 
delayed the input can be
    +(similar to streaming aggregations)
    +
    +1. Define a constraint on event-time across the two inputs such that the 
engine can figure out when
    +old rows of one input is not going to be required (i.e. will not satisfy 
the time constraint) for
    +matches with the other input. This constraint can be defined in one of the 
two ways.
    +
    +    1. Time range join conditions (e.g. `...JOIN ON leftTime BETWEN 
rightTime AND rightTime + INTERVAL 1 HOUR`),
    +
    +    1. Join on event-time windows (e.g. `...JOIN ON leftTimeWindow = 
rightTimeWindow`).
    +
    +Let’s understand this with an example.
    +
    +Let’s say we want to join a stream of advertisement impressions (when an 
ad was shown) with
    +another stream of user clicks on advertisements to correlate when 
impressions led to
    +monetizable clicks. To allow the state cleanup in this stream-stream join, 
you will have to
    +specify the watermarking delays and the time constraints as follows.
    +
    +1. Watermark delays: Say, the impressions and the corresponding clicks can 
be late/out-of-order
    +in event-time by at most 2 and 3 hours, respectively.
    +
    +1. Event-time range condition: Say, a click can occur within a time range 
of 0 seconds to 1 hour
    +after the corresponding impression.
    +
    +The code would look like this.
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +{% highlight scala %}
    +import org.apache.spark.sql.functions.expr
    +
    +val impressions = spark.readStream. ...
    +val clicks = spark.readStream. ...
    +
    +// Apply watermarks on event-time columns
    +val impressionsWithWatermark = impressions.withWatermark("impressionTime", 
"2 hours")
    +val clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
    +
    +// Join with event-time constraints
    +impressionsWithWatermark.join(
    +  clicksWithWatermark,
    +  expr("""
    +    clickAdId = impressionAdId AND
    +    clickTime >= impressionTime AND
    +    clickTime <= impressionTime + interval 1 hour
    +    """)
    +)
    +
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +{% highlight java %}
    +import static org.apache.spark.sql.functions.expr
    +
    +Dataset<Row> impressions = spark.readStream(). ...
    +Dataset<Row> clicks = spark.readStream(). ...
    +
    +// Apply watermarks on event-time columns
    +Dataset<Row> impressionsWithWatermark = 
impressions.withWatermark("impressionTime", "2 hours");
    +Dataset<Row> clicksWithWatermark = clicks.withWatermark("clickTime", "3 
hours");
    +
    +// Join with event-time constraints
    +impressionsWithWatermark.join(
    +  clicksWithWatermark,
    +  expr(
    +    "clickAdId = impressionAdId AND " +
    +    "clickTime >= impressionTime AND " +
    +    "clickTime <= impressionTime + interval 1 hour ")
    +);
    +
    +{% endhighlight %}
    +
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +{% highlight python %}
    +from pyspark.sql.functions import expr
    +
    +impressions = spark.readStream. ...
    +clicks = spark.readStream. ...
    +
    +# Apply watermarks on event-time columns
    +impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 
hours")
    +clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
    +
    +# Join with event-time constraints
    +impressionsWithWatermark.join(
    +  clicksWithWatermark,
    +  expr("""
    +    clickAdId = impressionAdId AND
    +    clickTime >= impressionTime AND
    +    clickTime <= impressionTime + interval 1 hour
    +    """)
    +)
    +
    +{% endhighlight %}
    +
    +</div>
    +</div>
    +
    +##### Outer Joins with Watermarking
    +While the watermark + event-time constraints is optional for inner joins, 
for left and right outer
    +joins they must be specified. This is because for generating the NULL 
results in outer join, the
    +engine must know when an input row is not going to match with anything in 
future. Hence, the
    +watermark + event-time constraints must be specified for generating 
correct results. Therefore,
    +a query with outer-join will look quite like the ad-monetization example 
earlier, except that
    +there will be an additional parameter specifying it to be an outer-join.
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +{% highlight scala %}
    +
    +impressionsWithWatermark.join(
    +  clicksWithWatermark,
    +  expr("""
    +    clickAdId = impressionAdId AND
    +    clickTime >= impressionTime AND
    +    clickTime <= impressionTime + interval 1 hour
    +    """),
    +  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter"
    + )
    +
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +{% highlight java %}
    +impressionsWithWatermark.join(
    +  clicksWithWatermark,
    +  expr(
    +    "clickAdId = impressionAdId AND " +
    +    "clickTime >= impressionTime AND " +
    +    "clickTime <= impressionTime + interval 1 hour "),
    +  "leftOuter"                 // can be "inner", "leftOuter", "rightOuter"
    +);
    +
    +{% endhighlight %}
    +
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +{% highlight python %}
    +impressionsWithWatermark.join(
    +  clicksWithWatermark,
    +  expr("""
    +    clickAdId = impressionAdId AND
    +    clickTime >= impressionTime AND
    +    clickTime <= impressionTime + interval 1 hour
    +    """),
    +  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter"
    +)
    +
    +{% endhighlight %}
    +
    +</div>
    +</div>
    +
    +However, note that the outer NULL results will be generated with a delay 
(depends on the specified
    +watermark delay and the time range condition) because the engine has to 
wait for that long to ensure
    +there were no matches and there will be no more matches in future.
    +
    +##### Support matrix for joins in streaming queries
    +
    +<table class ="table">
    +  <tr>
    +    <th>Left Input</th>
    +    <th>Right Input</th>
    +    <th>Join Type</th>
    +    <th></th>
    +  </tr>
    +  <tr>
    +      <td style="vertical-align: middle;">Static</td>
    +      <td style="vertical-align: middle;">Static</td>
    +      <td style="vertical-align: middle;">All types</td>
    +      <td style="vertical-align: middle;">
    +        Supported, since its not on streaming data even though it
    +        can be present in a streaming query
    +      </td>
    +  </tr>
    +  <tr>
    +    <td rowspan="4" style="vertical-align: middle;">Stream</td>
    +    <td rowspan="4" style="vertical-align: middle;">Static</td>
    +    <td style="vertical-align: middle;">Inner</td>
    +    <td style="vertical-align: middle;">Supported, not stateful</td>
    +  </tr>
    +  <tr>
    +    <td style="vertical-align: middle;">Left Outer</td>
    +    <td style="vertical-align: middle;">Supported, not stateful</td>
    +  </tr>
    +  <tr>
    +    <td style="vertical-align: middle;">Right Outer</td>
    +    <td style="vertical-align: middle;">Not supported</td>
    +  </tr>
    +  <tr>
    +    <td style="vertical-align: middle;">Full Outer</td>
    +    <td style="vertical-align: middle;">Not supported</td>
    +  </tr>
    +  <tr>
    +    <td rowspan="4" style="vertical-align: middle;">Static</td>
    +    <td rowspan="4" style="vertical-align: middle;">Stream</td>
    +    <td style="vertical-align: middle;">Inner</td>
    +    <td style="vertical-align: middle;">Supported, not stateful</td>
    +  </tr>
    +  <tr>
    +    <td style="vertical-align: middle;">Left Outer</td>
    +    <td style="vertical-align: middle;">Not supported</td>
    +  </tr>
    +  <tr>
    +    <td style="vertical-align: middle;">Right Outer</td>
    +    <td style="vertical-align: middle;">Supported, not stateful</td>
    +  </tr>
    +  <tr>
    +    <td style="vertical-align: middle;">Full Outer</td>
    +    <td style="vertical-align: middle;">Not supported</td>
    +  </tr>
    +  <tr>
    +    <td rowspan="4" style="vertical-align: middle;">Stream</td>
    +    <td rowspan="4" style="vertical-align: middle;">Stream</td>
    +    <td style="vertical-align: middle;">Inner</td>
    +    <td style="vertical-align: middle;">
    +      Supported, optionally specify watermark on both sides +
    +      time constraints for state cleanup<
    +    </td>
    +  </tr>
    +  <tr>
    +    <td style="vertical-align: middle;">Left Outer</td>
    +    <td style="vertical-align: middle;">
    +      Conditionally supported, must specify watermark on right + time 
constraints for correct
    +      results, optionally specify watermark on left for all state cleanup
    +    </td>
    +  </tr>
    +  <tr>
    +    <td style="vertical-align: middle;">Right Outer</td>
    +    <td style="vertical-align: middle;">
    +      Conditionally supported, must specify watermark on left + time 
constraints for correct
    +      results, optionally specify watermark on right for all state cleanup
    +    </td>
    +  </tr>
    +  <tr>
    +    <td style="vertical-align: middle;">Full Outer</td>
    +    <td style="vertical-align: middle;">Not supported</td>
    +  </tr>
    + <tr>
    +    <td></td>
    +    <td></td>
    +    <td></td>
    +    <td></td>
    +  </tr>
    +</table>
    +
    +Additional details on supported joins:
    +
    +- Joins can be cascaded, that is, you can do `df1.join(df2, ...).join(df3, 
...).join(df4, ....)`.
    +
    +- As of Spark 2.3, you can use joins only when the query is in Append 
output mode. Other output modes are not yet supported.
    +
    +- As of Spark 2.3, you cannot use other non-map-like operations before 
joins. Here are a few examples of
    +  what cannot be used.
    +
    +  - Cannot use streaming aggregations before joins.
    +
    +  - Cannot use mapGroupsWithState and flatMapGroupsWithState in Update 
mode cannot before joins.
    --- End diff --
    
    nit: ~~cannot~~ before joins.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to