vanzin commented on a change in pull request #26108: [SPARK-26154][SS]
Streaming left/right outer join should not return outer nulls for already
matched rows
URL: https://github.com/apache/spark/pull/26108#discussion_r339737885
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
##########
@@ -435,9 +541,10 @@ class SymmetricHashJoinStateManager(
}
/** Put new value for key at the given index */
- def put(key: UnsafeRow, valueIndex: Long, value: UnsafeRow): Unit = {
+ def put(key: UnsafeRow, valueIndex: Long, value: UnsafeRow, matched: Boolean): Unit = {
val keyWithIndex = keyWithIndexRow(key, valueIndex)
- stateStore.put(keyWithIndex, value)
+ val valueWithMatched = valueRowConverter.convertToValueRow(value, matched)
Review comment:
Ok, I think I got it. Even if we add code that writes new data in the v2
format, the state store may still hold data in v1 format that can trigger the
correctness issue. So it's a small gain in the end, although that old state
would be limited to state generated before the "fixed" Spark was used.
Given that, I think failing the query when the bad combination is detected
is better (I see you already made that change).
One thing: a "rewrite" of the state is not a good option, at least in this
case. The information v2 needs is missing from the v1 data, so there's no way
to generate correct v2 data from it. The only thing a rewrite would achieve is
to mask errors (since the exception you're adding wouldn't be triggered
anymore).
So unless it's possible to have a "mixed" solution here, where the same
store can hold both v1 and v2 data, erroring out is the best outcome I can
think of. I can think of ways to hack it (e.g. you could compare the column
count of the `UnsafeRow` read from the store against the expected schema, to
see whether it matches or has the extra fields expected by the v2 data), but I
haven't thought it through.
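To make the column-count idea concrete, here is a rough sketch, not code from this PR. `StoredRow`, `StoredFormat` and `StateFormatProbe` are hypothetical names, and `StoredRow` stands in for `UnsafeRow` (which exposes its column count via `numFields()`) so the snippet runs without Spark. It also assumes v2 differs from v1 by exactly one trailing boolean "matched" column, which would need to be checked against the real v2 layout.

```scala
// Hypothetical stand-in for a row read back from the state store. The real
// code would inspect Spark's UnsafeRow and its numFields() instead.
final case class StoredRow(fields: Vector[Any]) {
  def numFields: Int = fields.length
}

sealed trait StoredFormat
case object V1 extends StoredFormat // value columns only
case object V2 extends StoredFormat // assumed: value columns + trailing "matched" flag

object StateFormatProbe {
  /** Guess the format of a stored row from its column count alone. */
  def detect(row: StoredRow, valueFieldCount: Int): StoredFormat =
    row.numFields match {
      case n if n == valueFieldCount     => V1
      case n if n == valueFieldCount + 1 => V2
      case n =>
        throw new IllegalStateException(
          s"Unexpected column count $n; expected $valueFieldCount or ${valueFieldCount + 1}")
    }

  /** The matched flag if the row carries one; None for v1 rows, which never recorded it. */
  def matchedFlag(row: StoredRow, valueFieldCount: Int): Option[Boolean] =
    detect(row, valueFieldCount) match {
      case V1 => None
      case V2 => Some(row.fields.last.asInstanceOf[Boolean])
    }
}
```

Returning `None` for v1 rows keeps the missing information visible to the caller rather than inventing a matched value, which is the masking problem a rewrite would have.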