Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/18107#discussion_r119013976
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala ---
@@ -102,28 +103,100 @@ trait StateStore {
}
-/** Trait representing a provider of a specific version of a [[StateStore]]. */
+/**
+ * Trait representing a provider that provides [[StateStore]] instances representing
+ * versions of state data.
+ *
+ * The life cycle of a provider and the stores it provides is as follows:
+ *
+ * - A StateStoreProvider is created on an executor for each unique [[StateStoreId]] when
+ *   the first batch of a streaming query is executed on the executor. All subsequent batches
+ *   reuse this provider instance until the query is stopped.
+ *
+ * - Every batch of streaming data requests a specific version of the state data by invoking
+ *   `getStore(version)`, which returns an instance of [[StateStore]] through which the required
+ *   version of the data can be accessed. It is the responsibility of the provider to populate
+ *   this store with context information such as the schema of keys and values.
+ *
+ * - After the streaming query is stopped, the created provider instances are lazily disposed of.
+ */
trait StateStoreProvider {
- /** Get the store with the existing version. */
+ /**
+ * Initialize the provider with more contextual information from the SQL operator.
+ * This method will be called first after creating an instance of the StateStoreProvider by
+ * reflection.
+ *
+ * @param stateStoreId Id of the versioned StateStores that this provider will generate
+ * @param keySchema Schema of keys to be stored
+ * @param valueSchema Schema of values to be stored
+ * @param keyIndexOrdinal Optional column (represented as the ordinal of the field in keySchema)
+ *                        by which the StateStore implementation could index the data.
+ * @param storeConfs Configurations used by the StateStores
+ * @param hadoopConf Hadoop configuration that could be used by StateStore to save state data
+ */
+ def init(
+ stateStoreId: StateStoreId,
+ keySchema: StructType,
+ valueSchema: StructType,
+ keyIndexOrdinal: Option[Int], // for sorting the data by their keys
+ storeConfs: StateStoreConf,
+ hadoopConf: Configuration): Unit
+
+ /**
+ * Return the id of the StateStores this provider will generate.
+ * Should be the same as the one passed in init().
+ */
+ def id: StateStoreId
+
+ /** Called when the provider instance is unloaded from the executor */
+ def close(): Unit
+
+ /** Return an instance of [[StateStore]] representing state data of the given version */
def getStore(version: Long): StateStore
- /** Optional method for providers to allow for background maintenance */
+ /** Optional method for providers to allow for background maintenance (e.g. compactions) */
def doMaintenance(): Unit = { }
}
-
-/** Trait representing updates made to a [[StateStore]]. */
-sealed trait StoreUpdate {
- def key: UnsafeRow
- def value: UnsafeRow
+object StateStoreProvider {
+ /**
+ * Return a provider instance of the given provider class.
+ * The instance will already be initialized.
+ */
+ def instantiate(
+ providerClass: String,
+ stateStoreId: StateStoreId,
+ keySchema: StructType,
+ valueSchema: StructType,
+ indexOrdinal: Option[Int], // for sorting the data
+ storeConf: StateStoreConf,
+ hadoopConf: Configuration): StateStoreProvider = {
+ val provider = Utils.getContextOrSparkClassLoader
--- End diff ---
nit: Use `Utils.classForName(providerClass)`.
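
For context, a minimal sketch of how the suggestion could land in `instantiate()`, assuming `Utils.classForName` resolves the class via the appropriate class loader as it does elsewhere in Spark. The body below is illustrative, not the actual patch:

```scala
// Illustrative only: instantiate() using the suggested Utils.classForName
// instead of fetching a class loader by hand.
def instantiate(
    providerClass: String,
    stateStoreId: StateStoreId,
    keySchema: StructType,
    valueSchema: StructType,
    indexOrdinal: Option[Int], // for sorting the data
    storeConf: StateStoreConf,
    hadoopConf: Configuration): StateStoreProvider = {
  // Utils.classForName loads the class with the context or Spark class loader,
  // replacing the manual Utils.getContextOrSparkClassLoader lookup.
  val provider = Utils.classForName(providerClass)
    .newInstance()
    .asInstanceOf[StateStoreProvider]
  // Per the trait's contract, init() is called before the provider is handed
  // out, so callers always receive an already-initialized instance.
  provider.init(stateStoreId, keySchema, valueSchema, indexOrdinal, storeConf, hadoopConf)
  provider
}
```

This keeps the reflection and error handling consistent with the rest of the codebase, which is the usual reason to prefer the shared helper over an ad-hoc class loader lookup.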