viirya commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r491241918
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
##########
@@ -109,6 +109,41 @@ trait StateStore {
def hasCommitted: Boolean
}
+/** A versioned key-value store which is same as [[StateStore]], but write-protected. */
Review comment:
For the write-protected case, we already have `WrappedReadOnlyStateStore`.
"Write-protected" sounds like a store that can be written to but is guarded
against writes, whereas `ReadOnlyStateStore` is actually a read-only state
store. Can we update this comment?
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
##########
@@ -79,6 +78,23 @@ private[state] class HDFSBackedStateStoreProvider extends
StateStoreProvider wit
// java.util.ConcurrentModificationException
type MapType = java.util.concurrent.ConcurrentHashMap[UnsafeRow, UnsafeRow]
+ class HDFSBackedReadOnlyStateStore(val version: Long, map: MapType)
+ extends ReadOnlyStateStore {
+
+ override def id: StateStoreId = HDFSBackedStateStoreProvider.this.stateStoreId
+
+ override def get(key: UnsafeRow): UnsafeRow = map.get(key)
+
+ override def abort(): Unit = {}
Review comment:
Isn't this a `ReadOnlyStateStore`? Why does it need to override `abort`?
Can't `ReadOnlyStateStore` also use `throwNotAllowed` for `abort`?
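A minimal sketch of what this suggestion could look like, using simplified stand-in types rather than Spark's real `StateStore` API. `SimpleReadOnlyStore`, `MapBackedReadOnlyStore`, and the `String` key/value types are hypothetical; only the `throwNotAllowed` name comes from the comment above. Whether `abort` should throw or stay a no-op is the open question here, since callers may invoke `abort` during cleanup:

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical, simplified stand-in for a read-only state store trait.
// It is NOT Spark's actual ReadOnlyStateStore definition.
trait SimpleReadOnlyStore {
  def version: Long
  def get(key: String): String

  // Mutating operations are rejected in one place, so concrete
  // read-only stores do not need to override any of them.
  def put(key: String, value: String): Unit = throwNotAllowed("put")
  def remove(key: String): Unit = throwNotAllowed("remove")
  def commit(): Long = throwNotAllowed("commit")
  def abort(): Unit = throwNotAllowed("abort")

  private def throwNotAllowed(op: String): Nothing =
    throw new UnsupportedOperationException(
      s"$op is not allowed on a read-only store")
}

// Hypothetical concrete store backed by an in-memory map.
// It only has to supply the read path.
final class MapBackedReadOnlyStore(
    val version: Long,
    map: ConcurrentHashMap[String, String]) extends SimpleReadOnlyStore {
  override def get(key: String): String = map.get(key)
}
```

With this shape, the concrete class carries no `abort` override at all. The trade-off is that any caller that aborts unconditionally would need to skip that call for read-only stores.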
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
##########
@@ -76,7 +76,7 @@ class StateStoreRDD[T: ClassTag, U: ClassTag](
store = StateStore.get(
storeProviderId, keySchema, valueSchema, indexOrdinal, storeVersion,
- storeConf, hadoopConfBroadcast.value.value)
+ storeConf, hadoopConfBroadcast.value.value, readOnly)
val inputIter = dataRDD.iterator(partition, ctxt)
storeUpdateFunction(store, inputIter)
Review comment:
For a read-only store, do we still need to call `storeUpdateFunction`?