GitHub user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/spark/pull/21733#discussion_r208602231
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/package.scala
---
@@ -81,4 +85,221 @@ package object state {
storeCoordinator)
}
}
+
+ /**
+ * Base trait for state managers intended to be used by streaming
aggregations.
+ */
+ sealed trait StreamingAggregationStateManager extends Serializable {
+
+ /**
+ * Extract the key columns from the input row, and return a new row
containing only the key columns.
+ *
+ * @param row The input row.
+ * @return The row instance which only contains key columns.
+ */
+ def getKey(row: InternalRow): UnsafeRow
+
+ /**
+ * Calculate schema for the value of state. The schema is mainly
passed to the StateStoreRDD.
+ *
+ * @return An instance of StructType representing schema for the value
of state.
+ */
+ def getStateValueSchema: StructType
+
+ /**
+ * Get the current value of a non-null key from the target state store.
+ *
+ * @param store The target StateStore instance.
+ * @param key The key whose associated value is to be returned.
+ * @return A non-null row if the key exists in the store, otherwise
null.
+ */
+ def get(store: StateStore, key: UnsafeRow): UnsafeRow
+
+ /**
+ * Put a new value for a non-null key to the target state store. Note
that key will be
+ * extracted from the input row, and the key would be same as the
result of getKey(inputRow).
+ *
+ * @param store The target StateStore instance.
+ * @param row The input row.
+ */
+ def put(store: StateStore, row: UnsafeRow): Unit
+
+ /**
+ * Commit all the updates that have been made to the target state
store, and return the
+ * new version.
+ *
+ * @param store The target StateStore instance.
+ * @return The new state version.
+ */
+ def commit(store: StateStore): Long
+
+ /**
+ * Remove a single non-null key from the target state store.
+ *
+ * @param store The target StateStore instance.
+ * @param key The key to be removed.
+ */
+ def remove(store: StateStore, key: UnsafeRow): Unit
+
+ /**
+ * Return an iterator containing all the key-value pairs in target
state store.
--- End diff --
Will address.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]