Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21733#discussion_r208602231
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala
 ---
    @@ -81,4 +85,221 @@ package object state {
             storeCoordinator)
         }
       }
    +
    +  /**
    +   * Base trait for state managers intended for use by streaming 
aggregations.
    +   */
    +  sealed trait StreamingAggregationStateManager extends Serializable {
    +
    +    /**
    +     * Extract the key columns from the input row, and return a new 
row containing only those columns.
    +     *
    +     * @param row The input row.
    +     * @return The row instance which only contains key columns.
    +     */
    +    def getKey(row: InternalRow): UnsafeRow
    +
    +    /**
    +     * Calculate schema for the value of state. The schema is mainly 
passed to the StateStoreRDD.
    +     *
    +     * @return An instance of StructType representing schema for the value 
of state.
    +     */
    +    def getStateValueSchema: StructType
    +
    +    /**
    +     * Get the current value of a non-null key from the target state store.
    +     *
    +     * @param store The target StateStore instance.
    +     * @param key The key whose associated value is to be returned.
    +     * @return A non-null row if the key exists in the store, otherwise 
null.
    +     */
    +    def get(store: StateStore, key: UnsafeRow): UnsafeRow
    +
    +    /**
    +     * Put a new value for a non-null key into the target state store. Note 
that the key is
    +     * extracted from the input row, and is the same as the 
result of getKey(inputRow).
    +     *
    +     * @param store The target StateStore instance.
    +     * @param row The input row.
    +     */
    +    def put(store: StateStore, row: UnsafeRow): Unit
    +
    +    /**
    +     * Commit all the updates that have been made to the target state 
store, and return the
    +     * new version.
    +     *
    +     * @param store The target StateStore instance.
    +     * @return The new state version.
    +     */
    +    def commit(store: StateStore): Long
    +
    +    /**
    +     * Remove a single non-null key from the target state store.
    +     *
    +     * @param store The target StateStore instance.
    +     * @param key The key whose associated value is to be removed.
    +     */
    +    def remove(store: StateStore, key: UnsafeRow): Unit
    +
    +    /**
    +     * Return an iterator containing all the key-value pairs in target 
state store.
    --- End diff --
    
    Will address.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to