HeartSaVioR commented on a change in pull request #26108: [SPARK-26154][SS] 
Streaming left/right outer join should not return outer nulls for already 
matched rows
URL: https://github.com/apache/spark/pull/26108#discussion_r339282692
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
 ##########
 @@ -435,9 +541,10 @@ class SymmetricHashJoinStateManager(
     }
 
     /** Put new value for key at the given index */
-    def put(key: UnsafeRow, valueIndex: Long, value: UnsafeRow): Unit = {
+    def put(key: UnsafeRow, valueIndex: Long, value: UnsafeRow, matched: Boolean): Unit = {
       val keyWithIndex = keyWithIndexRow(key, valueIndex)
-      stateStore.put(keyWithIndex, value)
+      val valueWithMatched = valueRowConverter.convertToValueRow(value, matched)
 
 Review comment:
   Even if we write v2 data here, we only update part of the state instead of 
rewriting it in full. That leaves v1 and v2 rows coexisting in the state store, 
which makes it impossible to read back all of the rows.
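
   To make the problem concrete, here is a toy sketch. It is not Spark code: 
`MixedFormatSketch`, the byte layouts, and the helper names are all 
hypothetical stand-ins for the real `UnsafeRow` formats. It assumes v2 simply 
appends one "matched" flag byte to the v1 payload.

```scala
// Toy model only: real state rows are UnsafeRows, not raw byte arrays.
object MixedFormatSketch {
  // Hypothetical v1 layout: just the payload bytes.
  def encodeV1(payload: Array[Byte]): Array[Byte] = payload.clone()

  // Hypothetical v2 layout: payload bytes followed by one "matched" flag byte.
  def encodeV2(payload: Array[Byte], matched: Boolean): Array[Byte] =
    payload :+ (if (matched) 1.toByte else 0.toByte)

  // A v2 reader cannot tell which layout a stored row uses,
  // so it always treats the last byte as the flag.
  def decodeAsV2(stored: Array[Byte]): (Seq[Byte], Boolean) =
    (stored.init.toSeq, stored.last != 0)

  def main(args: Array[String]): Unit = {
    val payload = Array[Byte](7, 1)
    val store = Map(
      "k0" -> encodeV1(payload),                  // written before the upgrade
      "k1" -> encodeV2(payload, matched = false)  // written after the upgrade
    )
    store.toSeq.sortBy(_._1).foreach { case (key, bytes) =>
      val (decoded, matched) = decodeAsV2(bytes)
      println(s"$key -> payload=${decoded.mkString("[", ",", "]")} matched=$matched")
    }
  }
}
```

   In this model the v1 row under `k0` is silently misread: its last payload 
byte is taken as the flag, so both the payload and `matched` come back wrong, 
and nothing in the row itself signals the mistake.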
   
   An individual row doesn't carry schema information. In fact, we don't have 
schema information for the state store either, so mixing formats like this 
would lead to undefined behavior. SPARK-27237 (#24173) proposes introducing 
schema information per state (not per row) and failing the query if the schema 
is not compatible.
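
   As a rough illustration of a per-state schema check, consider the sketch 
below. It is not the implementation proposed in SPARK-27237: 
`StateSchemaCheckSketch`, `Field`, and `checkCompatible` are hypothetical 
names, and strict equality is an assumed compatibility rule that the real 
proposal may relax.

```scala
// Hypothetical sketch: one schema is recorded per state, not per row.
object StateSchemaCheckSketch {
  // Simplified stand-in for a column definition.
  final case class Field(name: String, dataType: String)

  // Compare the schema recorded with the state against the schema the
  // restarted query expects. Assumed rule: the two must be identical.
  def checkCompatible(
      stored: Seq[Field],
      expected: Seq[Field]): Either[String, Unit] =
    if (stored == expected) Right(())
    else Left(s"State schema mismatch: stored=$stored, expected=$expected")

  def main(args: Array[String]): Unit = {
    val v1 = Seq(Field("value", "struct"))
    val v2 = Seq(Field("value", "struct"), Field("matched", "boolean"))
    // A query expecting v2 must not silently run over state recorded as v1.
    checkCompatible(stored = v1, expected = v2) match {
      case Left(error) => println(s"fail the query: $error")
      case Right(_)    => println("schemas are compatible")
    }
  }
}
```

   The check runs once per state when the query starts, so an incompatible 
change fails fast instead of surfacing later as corrupted rows.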
   
   We can still rewrite the state entirely in a batch query (it cannot be an 
online modification) - please refer to the README of 
https://github.com/HeartSaVioR/spark-state-tools. I've already implemented the 
v1-to-v2 migration for streaming aggregation / flatMapGroupsWithState, and I 
may try to implement this migration as well. I'm also proposing the project as 
a part of Apache Spark, but not as it is: I'm proposing a batch data source for 
"state" first (SPARK-28190), and once we adopt it, we could also consider 
adding tools to help with migration. Flink recently added a state API as well.
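
   The shape of such an offline rewrite can be sketched with plain Scala 
collections. This does not use the spark-state-tools API or any Spark API: 
`OfflineMigrationSketch`, `V1Row`, `V2Row`, and `migrate` are hypothetical, 
and the `defaultMatched` value written for existing rows is an assumption that 
a real migration would have to choose deliberately.

```scala
// Hypothetical sketch of a full (batch) rewrite from v1 to v2.
object OfflineMigrationSketch {
  final case class V1Row(value: String)
  final case class V2Row(value: String, matched: Boolean)

  // Rewrite every row in one pass, so the result never mixes formats.
  // Keys model the (join key, value index) pair used by the state store.
  def migrate(
      v1State: Map[(String, Long), V1Row],
      defaultMatched: Boolean): Map[(String, Long), V2Row] =
    v1State.map { case (keyWithIndex, row) =>
      keyWithIndex -> V2Row(row.value, defaultMatched)
    }

  def main(args: Array[String]): Unit = {
    val v1State = Map(
      ("a", 0L) -> V1Row("left-1"),
      ("a", 1L) -> V1Row("left-2")
    )
    // The streaming query is stopped while this runs; it resumes only
    // against the fully rewritten state.
    val v2State = migrate(v1State, defaultMatched = false)
    v2State.foreach(println)
  }
}
```

   The point of the sketch is only that the conversion covers the whole state 
at once, which is what an online `put` of individual v2 rows cannot guarantee.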
