HeartSaVioR commented on a change in pull request #26108: [SPARK-26154][SS] 
Streaming left/right outer join should not return outer nulls for already 
matched rows
URL: https://github.com/apache/spark/pull/26108#discussion_r339282692
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
 ##########
 @@ -435,9 +541,10 @@ class SymmetricHashJoinStateManager(
     }
 
     /** Put new value for key at the given index */
-    def put(key: UnsafeRow, valueIndex: Long, value: UnsafeRow): Unit = {
+    def put(key: UnsafeRow, valueIndex: Long, value: UnsafeRow, matched: Boolean): Unit = {
       val keyWithIndex = keyWithIndexRow(key, valueIndex)
-      stateStore.put(keyWithIndex, value)
+      val valueWithMatched = valueRowConverter.convertToValueRow(value, matched)
 
 Review comment:
   Even if we write v2 data here, we update only part of the state rather than 
rewriting it in full. That leaves v1 and v2 rows coexisting in the same state 
store, and the store can no longer be read reliably, because an individual row 
carries no schema information. That's why we separate the versions and isolate 
them. Yes, that's bad, but that's the current state.
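
   To make the problem concrete, here is a minimal sketch in plain Scala (no 
Spark dependency). `RawRow`, `encodeV1`, `encodeV2` and `decodeAsV2` are 
hypothetical names for illustration only, and the "trailing flag" layout is an 
assumption of the sketch, not a description of the actual row format:

```scala
object MixedVersionSketch {
  // Hypothetical stand-in for a schemaless state row: positional fields only.
  type RawRow = Vector[Any]

  // Assumed v1 layout: just the value fields.
  def encodeV1(value: Vector[Any]): RawRow = value

  // Assumed v2 layout: the value fields followed by a trailing `matched` flag.
  def encodeV2(value: Vector[Any], matched: Boolean): RawRow = value :+ matched

  // A v2 reader has no schema to consult, so it trusts the last field is the flag.
  def decodeAsV2(row: RawRow): (Vector[Any], Boolean) =
    (row.init, row.last.asInstanceOf[Boolean])

  def main(args: Array[String]): Unit = {
    // A v1 row whose own last column happens to be a Boolean.
    val oldRow = encodeV1(Vector("user-1", true))
    val newRow = encodeV2(Vector("user-1", true), matched = false)

    println(decodeAsV2(newRow)) // decoded as intended
    println(decodeAsV2(oldRow)) // no error, but a real column is taken for the flag
  }
}
```

   Nothing in the row itself tells the reader which layout it is looking at, so 
the v1 row is decoded without any error and silently loses a column.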
   
   Actually, we don't have schema information for the state store as a whole 
either, so doing something this unsafe would lead to undefined behavior. 
SPARK-27237 (#24173) proposes introducing schema information per state (not per 
row) and failing the query if the schema is not compatible.
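
   A per-state schema check could look roughly like the sketch below. This is 
not the code from #24173: `Field`, `StateSchema` and `validate` are hypothetical 
names, and treating "compatible" as exact equality is an assumption made to keep 
the example short.

```scala
object StateSchemaCheckSketch {
  // Hypothetical schema description stored once per state, not per row.
  final case class Field(name: String, dataType: String)
  final case class StateSchema(keyFields: Seq[Field], valueFields: Seq[Field])

  // Assumption: compatible means identical. A real rule could be looser.
  def validate(stored: StateSchema, current: StateSchema): Either[String, Unit] =
    if (stored == current) Right(())
    else Left(s"Incompatible state schema: stored=$stored, current=$current")

  def main(args: Array[String]): Unit = {
    val key = Seq(Field("id", "long"))
    val v1 = StateSchema(key, Seq(Field("value", "string")))
    val v2 = StateSchema(key, Seq(Field("value", "string"), Field("matched", "boolean")))

    // The caller would fail the query on Left instead of reading the rows.
    println(validate(v1, v1))
    println(validate(v1, v2))
  }
}
```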
   
   We can still rewrite the state entirely in a batch query (it cannot be an 
online modification); please refer to the README of 
https://github.com/HeartSaVioR/spark-state-tools. I've already implemented the 
v1-to-v2 migration for streaming aggregation and flatMapGroupsWithState, and I 
may try to implement this migration as well. I'm also proposing the project as 
part of Apache Spark, though not as it is: I'm proposing a batch data source for 
"state" first (SPARK-28190), and once we adopt it, we could also consider 
adding tools to help with migration. Flink recently added a state API, which 
points to the need for a data source for "state".
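
   Conceptually, a full rewrite differs from the partial update in `put` in 
that every row is converted in one pass. The sketch below uses an in-memory 
`Map` as a stand-in for the state; it does not use the spark-state-tools API, 
and `KeyWithIndex`, `ValueV1`, `ValueV2`, `migrate` and the `defaultMatched` 
parameter are all hypothetical. Which flag value a migrated row should get is 
left open here.

```scala
object FullRewriteSketch {
  // Hypothetical key: the join key plus the value index.
  final case class KeyWithIndex(key: String, index: Long)
  final case class ValueV1(row: Vector[Any])
  final case class ValueV2(row: Vector[Any], matched: Boolean)

  // Batch-style migration: every row is converted, so no v1 row survives.
  // `defaultMatched` is an assumption; v1 rows carry no matched information.
  def migrate(
      v1State: Map[KeyWithIndex, ValueV1],
      defaultMatched: Boolean): Map[KeyWithIndex, ValueV2] =
    v1State.map { case (k, v) => k -> ValueV2(v.row, defaultMatched) }

  def main(args: Array[String]): Unit = {
    val v1State = Map(
      KeyWithIndex("a", 0L) -> ValueV1(Vector("x")),
      KeyWithIndex("a", 1L) -> ValueV1(Vector("y")))
    migrate(v1State, defaultMatched = false).foreach(println)
  }
}
```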
