GitHub user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/18503#discussion_r128122780
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
@@ -413,21 +417,25 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
throw new IOException(
s"Error reading snapshot file $fileToRead of $this: key size cannot be $keySize")
} else {
- val keyRowBuffer = new Array[Byte](keySize)
+ // If key size in an existing file is not a multiple of 8, round it to multiple of 8
+ val keyAllocationSize = ((keySize + 7) / 8) * 8
+ val keyRowBuffer = new Array[Byte](keyAllocationSize)
ByteStreams.readFully(input, keyRowBuffer, 0, keySize)
val keyRow = new UnsafeRow(keySchema.fields.length)
- keyRow.pointTo(keyRowBuffer, keySize)
+ keyRow.pointTo(keyRowBuffer, keyAllocationSize)
val valueSize = input.readInt()
if (valueSize < 0) {
throw new IOException(
s"Error reading snapshot file $fileToRead of $this: value size cannot be $valueSize")
} else {
- val valueRowBuffer = new Array[Byte](valueSize)
+ // If value size in an existing file is not a multiple of 8, round it to multiple of 8
+ val valueAllocationSize = ((valueSize + 7) / 8) * 8
--- End diff --
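The diff pads each key and value buffer up to the next multiple of 8 while reading only the bytes actually stored in the snapshot file. A minimal standalone sketch of that pattern follows, assuming the goal is a word-aligned buffer whose trailing padding stays zero; the object and method names (`AlignedRowBuffer`, `roundUpTo8`, `readAligned`) are hypothetical, and `java.io.DataInputStream.readFully` stands in for Guava's `ByteStreams.readFully` so the example has no dependencies.

```scala
import java.io.{ByteArrayInputStream, DataInputStream}

// Hypothetical helper illustrating the rounding used in the diff; not Spark API.
object AlignedRowBuffer {
  // Round a byte count up to the next multiple of 8, the same arithmetic as
  // ((keySize + 7) / 8) * 8 in the diff. Reject sizes that would overflow Int.
  def roundUpTo8(size: Int): Int = {
    require(size >= 0 && size <= Int.MaxValue - 7, s"invalid size: $size")
    ((size + 7) / 8) * 8
  }

  // Allocate a word-aligned buffer but read only `size` bytes into it;
  // the padding bytes at the end stay zero.
  def readAligned(input: DataInputStream, size: Int): Array[Byte] = {
    val buffer = new Array[Byte](roundUpTo8(size))
    input.readFully(buffer, 0, size)
    buffer
  }

  def main(args: Array[String]): Unit = {
    // Simulate a snapshot entry whose key occupies 13 bytes (not a multiple of 8).
    val payload = Array.tabulate[Byte](13)(i => i.toByte)
    val input = new DataInputStream(new ByteArrayInputStream(payload))
    val buffer = readAligned(input, payload.length)
    // prints "read 13 bytes into a buffer of 16 bytes"
    println(s"read ${payload.length} bytes into a buffer of ${buffer.length} bytes")
  }
}
```

The `require` guard is an addition not present in the diff: without it, `size + 7` silently overflows for sizes within 7 of `Int.MaxValue`.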
For example, how about something like this:
`class UnsafeRow { def readFromStream(bytes: Int): this.type }`
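The suggestion moves the allocate-pad-read logic into the row class so the key and value branches no longer duplicate it. A rough sketch of that shape follows. It uses a hypothetical stand-in class `AlignedRow` rather than Spark's real `UnsafeRow`, and it assumes the method also needs the input stream as a parameter, which the one-line suggestion leaves implicit. As in the diff, the row's recorded size is the rounded allocation size.

```scala
import java.io.{ByteArrayInputStream, DataInputStream}

// Hypothetical stand-in for UnsafeRow: only models the backing buffer and its size.
final class AlignedRow {
  private var buffer: Array[Byte] = Array.emptyByteArray
  private var sizeInBytes: Int = 0

  def getBuffer: Array[Byte] = buffer
  def getSizeInBytes: Int = sizeInBytes

  // Reads `bytes` bytes from `input` into a buffer rounded up to a multiple of 8,
  // mirroring pointTo(buffer, allocationSize) in the diff. Returns this row.
  def readFromStream(input: DataInputStream, bytes: Int): this.type = {
    require(bytes >= 0 && bytes <= Int.MaxValue - 7, s"size cannot be $bytes")
    val allocationSize = ((bytes + 7) / 8) * 8
    buffer = new Array[Byte](allocationSize)
    input.readFully(buffer, 0, bytes)
    sizeInBytes = allocationSize
    this
  }
}

object AlignedRowDemo {
  def main(args: Array[String]): Unit = {
    val input = new DataInputStream(new ByteArrayInputStream(Array[Byte](1, 2, 3, 4, 5)))
    val row = new AlignedRow().readFromStream(input, 5)
    // prints "sizeInBytes = 8"
    println(s"sizeInBytes = ${row.getSizeInBytes}")
  }
}
```

Returning `this.type` lets a caller allocate and fill a row in one expression, e.g. `val keyRow = new AlignedRow().readFromStream(input, keySize)`.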