[ https://issues.apache.org/jira/browse/SPARK-27433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jungtaek Lim updated SPARK-27433:
---------------------------------
    Issue Type: Bug  (was: Question)

> Spark Structured Streaming left outer joins returns outer nulls for already matched rows
> ----------------------------------------------------------------------------------------
>
>                 Key: SPARK-27433
>                 URL: https://issues.apache.org/jira/browse/SPARK-27433
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.0
>            Reporter: Binit
>            Priority: Blocker
>
> I'm basically using the example given in Spark's documentation here:
> [https://spark.apache.org/docs/2.3.0/structured-streaming-programming-guide.html#outer-joins-with-watermarking]
> with the built-in test stream (the "rate" source), in which one stream is ahead of the other by 3 seconds
> (I was originally using Kafka but ran into the same issue). The results returned the
> matched columns correctly; however, after a while the same key is returned again with
> an outer null.
> Is this the expected behavior? Is there a way to exclude the duplicate outer-null
> results when there was already a match?
> Code:
> {code:scala}
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions.{col, expr}
> val session = SparkSession.builder().appName("StreamStreamLeftOuterJoin").getOrCreate()
> // Built-in rate source: 5 rows per second on a single partition
> val testStream = session.readStream.format("rate")
>   .option("rowsPerSecond", "5").option("numPartitions", "1").load()
> // Impression ids are value + 15, so each impression arrives 15 / 5 = 3 seconds before its click
> val impressions = testStream.select(
>   (col("value") + 15).as("impressionAdId"), col("timestamp").as("impressionTime"))
> val clicks = testStream.select(
>   col("value").as("clickAdId"), col("timestamp").as("clickTime"))
> 
> // Apply watermarks on event-time columns
> val impressionsWithWatermark = impressions.withWatermark("impressionTime", "20 seconds")
> val clicksWithWatermark = clicks.withWatermark("clickTime", "30 seconds")
> 
> // Join with event-time constraints
> val result = impressionsWithWatermark.join(
>   clicksWithWatermark,
>   expr("""
>     clickAdId = impressionAdId AND
>     clickTime >= impressionTime AND
>     clickTime <= impressionTime + interval 10 seconds
>   """),
>   joinType = "leftOuter" // can be "inner", "leftOuter", "rightOuter"
> )
> val query = result.writeStream.outputMode("update").format("console")
>   .option("truncate", false).start()
> query.awaitTermination()
> {code}
> Result:
> {noformat}
> -------------------------------------------
> Batch: 19
> -------------------------------------------
> +--------------+-----------------------+---------+-----------------------+
> |impressionAdId|impressionTime         |clickAdId|clickTime              |
> +--------------+-----------------------+---------+-----------------------+
> |100           |2018-05-23 22:18:38.362|100      |2018-05-23 22:18:41.362|
> |101           |2018-05-23 22:18:38.562|101      |2018-05-23 22:18:41.562|
> |102           |2018-05-23 22:18:38.762|102      |2018-05-23 22:18:41.762|
> |103           |2018-05-23 22:18:38.962|103      |2018-05-23 22:18:41.962|
> |104           |2018-05-23 22:18:39.162|104      |2018-05-23 22:18:42.162|
> +--------------+-----------------------+---------+-----------------------+
> 
> -------------------------------------------
> Batch: 57
> -------------------------------------------
> +--------------+-----------------------+---------+-----------------------+
> |impressionAdId|impressionTime         |clickAdId|clickTime              |
> +--------------+-----------------------+---------+-----------------------+
> |290           |2018-05-23 22:19:16.362|290      |2018-05-23 22:19:19.362|
> |291           |2018-05-23 22:19:16.562|291      |2018-05-23 22:19:19.562|
> |292           |2018-05-23 22:19:16.762|292      |2018-05-23 22:19:19.762|
> |293           |2018-05-23 22:19:16.962|293      |2018-05-23 22:19:19.962|
> |294           |2018-05-23 22:19:17.162|294      |2018-05-23 22:19:20.162|
> |100           |2018-05-23 22:18:38.362|null     |null                   |
> |99            |2018-05-23 22:18:38.162|null     |null                   |
> |103           |2018-05-23 22:18:38.962|null     |null                   |
> |101           |2018-05-23 22:18:38.562|null     |null                   |
> |102           |2018-05-23 22:18:38.762|null     |null                   |
> +--------------+-----------------------+---------+-----------------------+
> {noformat}
> This question was also asked on Stack Overflow:
> [https://stackoverflow.com/questions/50500111/spark-structured-streaming-left-outer-joins-returns-outer-nulls-for-already-matc/55616902#55616902]
> 
> Ids 101 and 103 (and likewise 100 and 102) were already emitted as matched rows in Batch 19,
> but the left outer join emits them again with outer nulls in Batch 57.
>  

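The 3-second lead described in the report follows from the rate-source settings: an id offset of 15 at 5 rows per second. A minimal Scala sketch of that arithmetic and of the time-range part of the join condition, assuming the rate source spaces rows evenly (the object `JoinConditionModel` and its methods are hypothetical helpers, not Spark API):

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

// Hypothetical helper, not part of Spark: models the report's join predicate
// (clickTime >= impressionTime AND clickTime <= impressionTime + 10 seconds)
// and the rate-source timing, using plain epoch milliseconds.
object JoinConditionModel {
  val RowsPerSecond = 5L    // .option("rowsPerSecond", "5")
  val IdOffset = 15L        // impressionAdId = value + 15
  val MaxDelayMs = 10000L   // interval 10 seconds

  // Assumes the rate source emits value v about v / RowsPerSecond seconds after startMs.
  def emitTimeMs(value: Long, startMs: Long): Long = startMs + value * 1000L / RowsPerSecond

  // The impression for ad id N comes from value N - 15; the click comes from value N.
  def impressionTimeMs(adId: Long, startMs: Long): Long = emitTimeMs(adId - IdOffset, startMs)
  def clickTimeMs(adId: Long, startMs: Long): Long = emitTimeMs(adId, startMs)

  // The time-range part of the join condition.
  def satisfiesTimeRange(impressionMs: Long, clickMs: Long): Boolean =
    clickMs >= impressionMs && clickMs <= impressionMs + MaxDelayMs

  // Parses the console timestamps; the zone is irrelevant because only differences are used.
  private val Fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
  def toMs(ts: String): Long = LocalDateTime.parse(ts, Fmt).toInstant(ZoneOffset.UTC).toEpochMilli
}
```

For the first matched pair in Batch 19, `toMs("2018-05-23 22:18:41.362") - toMs("2018-05-23 22:18:38.362")` is 3000 ms, which agrees with the model and lies inside the 10-second range, so the match itself is expected.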

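To pin down which keys were emitted twice, the rows from the two console batches in the report can be checked mechanically. A small sketch, assuming each row is reduced to its `impressionAdId` and an optional `clickAdId` (`JoinRow` and `DuplicateOuterCheck` are hypothetical names):

```scala
// Hypothetical check, not part of Spark: given join output rows, find the
// impression ids emitted both as a match and as a null-padded outer row.
final case class JoinRow(impressionAdId: Long, clickAdId: Option[Long])

object DuplicateOuterCheck {
  def matchedAndNullPadded(rows: Seq[JoinRow]): Set[Long] = {
    val matched = rows.collect { case JoinRow(id, Some(_)) => id }.toSet
    val nullPadded = rows.collect { case JoinRow(id, None) => id }.toSet
    matched intersect nullPadded
  }

  // Rows transcribed from the Batch 19 and Batch 57 console output in the report.
  val reported: Seq[JoinRow] = (
    (100L to 104L).map(id => JoinRow(id, Some(id))) ++
    (290L to 294L).map(id => JoinRow(id, Some(id))) ++
    Seq(100L, 99L, 103L, 101L, 102L).map(id => JoinRow(id, None))
  )
}
```

On the transcribed rows this returns 100, 101, 102 and 103. Id 99 is not included because no matched row for it appears in the excerpt shown.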

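For comparison, the behaviour the reporter expected from a left outer join (a null-padded row only for impressions that never matched, emitted after the watermark shows they can no longer match) can be sketched as a toy model. This is a hypothetical illustration that assumes a single watermark and a per-row matched flag; it is not how Spark implements stream-stream joins, and `LeftOuterModel`, `Impression` and `Click` are made-up names:

```scala
import scala.collection.mutable

// Hypothetical toy model of left-outer stream-stream join state; it is NOT Spark's
// implementation. It uses a single watermark and ignores the other side's state.
final case class Impression(adId: Long, timeMs: Long)
final case class Click(adId: Long, timeMs: Long)

final class LeftOuterModel(maxDelayMs: Long) {
  // Buffered impressions, each paired with a flag recording whether it has matched.
  private val buffered = mutable.ArrayBuffer.empty[(Impression, Boolean)]

  def addImpression(i: Impression): Unit = buffered += ((i, false))

  // Emits a joined row for every buffered impression the click satisfies,
  // and marks those impressions as matched.
  def addClick(c: Click): Seq[(Impression, Option[Click])] = {
    val out = mutable.ArrayBuffer.empty[(Impression, Option[Click])]
    for (idx <- buffered.indices) {
      val (i, _) = buffered(idx)
      if (i.adId == c.adId && c.timeMs >= i.timeMs && c.timeMs <= i.timeMs + maxDelayMs) {
        out += ((i, Some(c)))
        buffered(idx) = (i, true)
      }
    }
    out.toSeq
  }

  // Evicts impressions that can no longer match; only never-matched ones
  // produce a null-padded (outer) row.
  def advanceWatermark(watermarkMs: Long): Seq[(Impression, Option[Click])] = {
    val (expired, kept) = buffered.partition { case (i, _) => i.timeMs + maxDelayMs < watermarkMs }
    buffered.clear()
    buffered ++= kept
    expired.collect { case (i, false) => (i, None) }.toSeq
  }
}
```

Under this model, impression 100 would appear once with its click and would not reappear with nulls when it is evicted, which is exactly where the Batch 57 output differs.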
--
This message was sent by Atlassian Jira
(v8.3.2#803003)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to