GitHub user tdas opened a pull request:
https://github.com/apache/spark/pull/16850
[SPARK-19413][SS] MapGroupsWithState for arbitrary stateful operations
## What changes were proposed in this pull request?
`mapGroupsWithState` is a new API for arbitrary stateful operations in
Structured Streaming, similar to `DStream.mapWithState`.
*Requirements*
- Users should be able to specify a function that can do the following:
  - Access the input row corresponding to a key
  - Access the previous state corresponding to a key
  - Optionally, update or remove the state
  - Output any number of new rows (or none at all)
*Proposed API*
```
// ------------ New methods on KeyValueGroupedDataset ------------
class KeyValueGroupedDataset[K, V] {
  // Scala friendly
  def mapGroupsWithState[S: Encoder, U: Encoder](
      func: (K, Iterator[V], KeyedState[S]) => U): Dataset[U]
  def flatMapGroupsWithState[S: Encoder, U: Encoder](
      func: (K, Iterator[V], KeyedState[S]) => Iterator[U]): Dataset[U]
  // Java friendly
  def mapGroupsWithState[S, U](
      func: MapGroupsWithStateFunction[K, V, S, U],
      stateEncoder: Encoder[S],
      resultEncoder: Encoder[U]): Dataset[U]
  def flatMapGroupsWithState[S, U](
      func: FlatMapGroupsWithStateFunction[K, V, S, U],
      stateEncoder: Encoder[S],
      resultEncoder: Encoder[U]): Dataset[U]
}
// ------------------- New Java-friendly function classes -------------------
public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
  R call(K key, Iterator<V> values, KeyedState<S> state) throws Exception;
}
public interface FlatMapGroupsWithStateFunction<K, V, S, R> extends Serializable {
  Iterator<R> call(K key, Iterator<V> values, KeyedState<S> state) throws Exception;
}
// ---------------------- Wrapper class for state data ----------------------
trait KeyedState[S] {
  def exists(): Boolean
  def get(): S                    // throws Exception if state does not exist
  def getOption(): Option[S]
  def update(newState: S): Unit
  def remove(): Unit              // exists() will be false after this
}
```
Key semantics of the `KeyedState` class
- The state can be null.
- If `state.remove()` is called, then `state.exists()` will return false
and `getOption` will return None.
- If `state.update(newState)` is called after that, then `state.exists()` will
return true and `getOption` will return Some(...).
- None of the operations are thread-safe. This is to avoid memory barriers.
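The semantics listed above can be illustrated with a small in-memory sketch. It is not the implementation in this patch: `InMemoryKeyedState` and `KeyedStateDemo` are hypothetical names, the trait is redeclared locally so the snippet compiles without Spark, and throwing `NoSuchElementException` from `get()` is an assumption (the proposal only says it throws an exception). Null handling is not modeled here.

```scala
// Local copy of the proposed trait so the sketch is self-contained.
trait KeyedState[S] {
  def exists(): Boolean
  def get(): S
  def getOption(): Option[S]
  def update(newState: S): Unit
  def remove(): Unit
}

// Hypothetical, deliberately non-thread-safe implementation (illustration only).
class InMemoryKeyedState[S](initial: Option[S]) extends KeyedState[S] {
  private var value: Option[S] = initial

  def exists(): Boolean = value.isDefined
  // Assumption: the concrete exception type is not specified in the proposal.
  def get(): S = value.getOrElse(throw new NoSuchElementException("State does not exist"))
  def getOption(): Option[S] = value
  def update(newState: S): Unit = { value = Some(newState) }
  def remove(): Unit = { value = None }
}

object KeyedStateDemo {
  def main(args: Array[String]): Unit = {
    val state = new InMemoryKeyedState[Long](None)
    assert(!state.exists())                                   // no state yet

    state.update(5L)
    assert(state.exists() && state.getOption() == Some(5L))   // update makes the state visible

    state.remove()
    assert(!state.exists() && state.getOption().isEmpty)      // remove clears it

    state.update(7L)
    assert(state.get() == 7L)                                 // update after remove restores it
  }
}
```

Because the value is a plain `var` with no synchronization, the sketch shares the "not thread-safe" property stated in the list.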
*Usage*
```
val stateFunc = (word: String, words: Iterator[String], runningCount: KeyedState[Long]) => {
  val newCount = words.size + runningCount.getOption.getOrElse(0L)
  runningCount.update(newCount)
  (word, newCount)
}

dataset                                                 // type is Dataset[String]
  .groupByKey[String](w => w)                           // generates KeyValueGroupedDataset[String, String]
  .mapGroupsWithState[Long, (String, Long)](stateFunc)  // returns Dataset[(String, Long)]
```
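To see how the running count carries over between batches, here is a rough simulation that needs no Spark at all. Everything except the body of `stateFunc` is an assumption made for illustration: `SimpleState` is a minimal stand-in for `KeyedState`, and `runBatch` is a hypothetical driver that calls the function once per key and keeps state in a mutable map. The real engine's state management is not described in this pull request and may differ.

```scala
import scala.collection.mutable

// Minimal stand-in for the proposed KeyedState (only the methods this example uses).
final class SimpleState[S](private var value: Option[S]) {
  def getOption(): Option[S] = value
  def update(newState: S): Unit = { value = Some(newState) }
  def remove(): Unit = { value = None }
}

object RunningCountDemo {
  // Same logic as stateFunc above, written against the stand-in state class.
  val stateFunc = (word: String, words: Iterator[String], runningCount: SimpleState[Long]) => {
    val newCount = words.size + runningCount.getOption().getOrElse(0L)
    runningCount.update(newCount)
    (word, newCount)
  }

  // Hypothetical driver: groups one batch by key, calls stateFunc once per key,
  // and persists each key's state in `store` so the next batch can read it.
  def runBatch(batch: Seq[String], store: mutable.Map[String, Long]): Map[String, Long] =
    batch.groupBy(identity).map { case (word, occurrences) =>
      val state = new SimpleState[Long](store.get(word))
      val result = stateFunc(word, occurrences.iterator, state)
      state.getOption() match {
        case Some(s) => store(word) = s     // state was updated: keep it
        case None    => store.remove(word)  // state was removed: drop it
      }
      result
    }

  def main(args: Array[String]): Unit = {
    val store = mutable.Map.empty[String, Long]
    println(runBatch(Seq("a", "b", "a"), store)) // "a" counted twice, "b" once
    println(runBatch(Seq("a", "c"), store))      // "a" now at 3, "c" at 1
  }
}
```

The second batch reports 3 for `a` because the count of 2 from the first batch was read back through `getOption()` before the new occurrence was added.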
## How was this patch tested?
New unit tests.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tdas/spark mapWithState-branch-2.1
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/16850.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #16850
----
commit 5025cb7511a43e24cb3a181eb7b06c69b024479f
Author: Tathagata Das <[email protected]>
Date: 2017-02-08T04:21:00Z
[SPARK-19413][SS] MapGroupsWithState for arbitrary stateful operations
Author: Tathagata Das <[email protected]>
Closes #16758 from tdas/mapWithState.
----