HeartSaVioR commented on code in PR #36073:
URL: https://github.com/apache/spark/pull/36073#discussion_r843414788
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala:
##########
@@ -272,12 +289,36 @@ class SymmetricHashJoinStateManager(
       if (index != numValues - 1) {
         val valuePairAtMaxIndex = keyWithIndexToValue.get(currentKey, numValues - 1)
         if (valuePairAtMaxIndex != null) {
+          // likely case where last element is non-null and we can simply swap with index
           keyWithIndexToValue.put(currentKey, index, valuePairAtMaxIndex.value,
             valuePairAtMaxIndex.matched)
         } else {
-          val projectedKey = getInternalRowOfKeyWithIndex(currentKey)
-          logWarning(s"`keyWithIndexToValue` returns a null value for index ${numValues - 1} " +
-            s"at current key $projectedKey.")
+          // Find the rightmost non null index and swap values with that index,
+          // if index returned is not the same as the passed one
+          val nonNullIndex = getRightMostNonNullIndex(index + 1).getOrElse(index)
+          if (nonNullIndex != index) {
+            val valuePair = keyWithIndexToValue.get(currentKey, nonNullIndex)
+            keyWithIndexToValue.put(currentKey, index, valuePair.value, valuePair.matched)
+          }
+
+          // If nulls were found at the end, get the projected key and log a warning
+          // for the range of null indices.
+          if (nonNullIndex != numValues - 1) {
+            val projectedKey = getInternalRowOfKeyWithIndex(currentKey)
+            logWarning(s"`keyWithIndexToValue` returns a null value for indices " +
+              s"with range from startIndex=${nonNullIndex + 1} " +
+              s"and endIndex=${numValues - 1} " +
+              s"at currentKey=$projectedKey.")
Review Comment:
Yeah, there are two options: log or don't log. I don't think obfuscation (which would be decodable by us) would work in any way. We are assuming attackers here, who are experts at figuring out how to decode arbitrarily encoded data, and we expose the code that does the encoding. Whatever we can do, they can eventually do.

I generally agree that logging data could be considered a security issue. If we were required to follow a clear and stable security policy, the decision would be straightforward, but we don't yet have such a policy, so it is up to the reviewers/committers.

I'm OK with just logging the symptom without any context on the data for now, and we probably need to propose hiding it in other places as well. This also seems to be a strong argument for having a state data source, at least with read functionality: you stop the query once you figure out something is broken (Spark will/should notice it), and then you can run a query against the checkpoint to determine whether the state is already broken and, if so, which rows are affected. Even better if we have basic write functionality to fix the broken rows as well.
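As a rough illustration of "logging the symptom without context on the data", the warning could report only the affected index range and omit the projected key entirely, so no row contents reach the logs. This is a sketch, not the PR's actual change; the standalone `nullRangeWarning` helper below is hypothetical, while the message shape follows the diff above:

```scala
object StateWarningSketch {
  // Hypothetical helper: describe the corrupted range by indices only,
  // deliberately leaving out the projected key (i.e. the user data).
  def nullRangeWarning(firstNullIndex: Int, lastIndex: Int): String =
    s"`keyWithIndexToValue` returns a null value for indices " +
      s"with range from startIndex=$firstNullIndex to endIndex=$lastIndex. " +
      "Inspect the checkpoint offline to identify the affected rows."

  def main(args: Array[String]): Unit = {
    // Example: nulls found from index 3 through the last index 5.
    println(nullRangeWarning(3, 5))
  }
}
```

The trade-off is that an operator then needs a separate tool, such as the state data source reader discussed above, to map the logged index range back to concrete rows in the checkpoint.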
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]