Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18107#discussion_r119013976
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
 ---
    @@ -102,28 +103,100 @@ trait StateStore {
     }
     
     
    -/** Trait representing a provider of a specific version of a 
[[StateStore]]. */
    +/**
    + * Trait representing a provider that provide [[StateStore]] instances 
representing
    + * versions of state data.
    + *
    + * The life cycle of a provider and its provide stores are as follows.
    + *
    + * - A StateStoreProvider is created in a executor for each unique 
[[StateStoreId]] when
    + *   the first batch of a streaming query is executed on the executor. All 
subsequent batches reuse
    + *   this provider instance until the query is stopped.
    + *
    + * - Every batch of streaming data request a specific version of the state 
data by invoking
    + *   `getStore(version)` which returns an instance of [[StateStore]] 
through which the required
    + *   version of the data can be accessed. It is the responsible of the 
provider to populate
    + *   this store with context information like the schema of keys and 
values, etc.
    + *
    + * - After the streaming query is stopped, the created provider instances 
are lazily disposed off.
    + */
     trait StateStoreProvider {
     
    -  /** Get the store with the existing version. */
    +  /**
    +   * Initialize the provide with more contextual information from the SQL 
operator.
    +   * This method will be called first after creating an instance of the 
StateStoreProvider by
    +   * reflection.
    +   *
    +   * @param stateStoreId Id of the versioned StateStores that this 
provider will generate
    +   * @param keySchema Schema of keys to be stored
    +   * @param valueSchema Schema of value to be stored
    +   * @param keyIndexOrdinal Optional column (represent as the ordinal of 
the field in keySchema) by
    +   *                        which the StateStore implementation could 
index the data.
    +   * @param storeConfs Configurations used by the StateStores
    +   * @param hadoopConf Hadoop configuration that could be used by 
StateStore to save state data
    +   */
    +  def init(
    +      stateStoreId: StateStoreId,
    +      keySchema: StructType,
    +      valueSchema: StructType,
    +      keyIndexOrdinal: Option[Int], // for sorting the data by their keys
    +      storeConfs: StateStoreConf,
    +      hadoopConf: Configuration): Unit
    +
    +  /**
    +   * Return the id of the StateStores this provider will generate.
    +   * Should be the same as the one passed in init().
    +   */
    +  def id: StateStoreId
    +
    +  /** Called when the provider instance is unloaded from the executor */
    +  def close(): Unit
    +
    +  /** Return an instance of [[StateStore]] representing state data of the 
given version */
       def getStore(version: Long): StateStore
     
    -  /** Optional method for providers to allow for background maintenance */
    +  /** Optional method for providers to allow for background maintenance 
(e.g. compactions) */
       def doMaintenance(): Unit = { }
     }
     
    -
    -/** Trait representing updates made to a [[StateStore]]. */
    -sealed trait StoreUpdate {
    -  def key: UnsafeRow
    -  def value: UnsafeRow
    +object StateStoreProvider {
    +  /**
    +   * Return a provider instance of the given provider class.
    +   * The instance will be already initialized.
    +   */
    +  def instantiate(
    +      providerClass: String,
    +      stateStoreId: StateStoreId,
    +      keySchema: StructType,
    +      valueSchema: StructType,
    +      indexOrdinal: Option[Int], // for sorting the data
    +      storeConf: StateStoreConf,
    +      hadoopConf: Configuration): StateStoreProvider = {
    +    val provider = Utils.getContextOrSparkClassLoader
    --- End diff --
    
    nit: Use `Utils.classForName(providerClass)`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to