interesting to know that the state scales so well!

On Tue, Dec 4, 2018 at 12:21 PM Lukasz Cwik wrote:
> You're correct in saying that a StatefulDoFn is pointless if you only see every key+window once. The user's description wasn't exactly clear, but it seemed to me they were reading from a stream and wanted to store all old values that they had seen, implying they see keys more than once. The user would need to ensure that the windowing strategy triggers more than once for my suggestion to be useful (e.g. a global window with an after-element-count trigger), but without further details my suggestion is a guess.
>
> Also, the implementation for state storage is runner-dependent, but I am aware of users storing very large amounts (much more than 1 TiB) within state on Dataflow, and in general it scales very well with the number of keys and windows.
>
> On Tue, Dec 4, 2018 at 8:01 AM Steve Niemitz wrote:
>
>> We have a similar use case, except with BigtableIO instead of HBase.
>>
>> We ended up building a custom transform that was basically PCollection[ByteString] -> PCollection[BigtableRow] and would fetch rows from Bigtable based on the input; however, it's tricky to get right because of batching, etc.
>>
>> I'm curious how a StatefulDoFn would help here. It seems like it's more of a cache than an actual join (and in my use case we never read a key more than once, so a cache wouldn't help here anyway). Also, I'd be interested to see how the state storage performs with "large" amounts of state. We're reading ~1 TB of data from Bigtable in a run, and it doesn't seem reasonable to store all of that in a DoFn's state.
>>
>> On Tue, Dec 4, 2018 at 1:23 AM Lukasz Cwik wrote:
>>
>>> What about a StatefulDoFn where you append the value(s) to a bag state as you see them?
>>>
>>> If you need to seed the state information, you could do a one-time lookup to HBase in processElement for each key that hasn't yet been seen (storing the fact that you loaded the data in a boolean ValueState), but afterwards you would rely on reading the value(s) from the bag state.
>>>
>>> processElement(...) {
>>>   Value newValue = ...
>>>   Iterable<Value> values;
>>>   Boolean seen = hasSeenKeyBooleanValueState.read(); // null until the first write
>>>   if (seen == null || !seen) {
>>>     values = ... load from HBase ...
>>>     for (Value v : values) {
>>>       valuesBagState.add(v);
>>>     }
>>>     hasSeenKeyBooleanValueState.write(true);
>>>   } else {
>>>     values = valuesBagState.read();
>>>   }
>>>   ... perform processing using values ...
>>>
>>>   valuesBagState.add(newValue);
>>> }
>>>
>>> This blog post[1] has a good example.
>>>
>>> 1: https://beam.apache.org/blog/2017/02/13/stateful-processing.html
>>>
>>> On Mon, Dec 3, 2018 at 12:48 PM Chandan Biswas wrote:
>>>
>>>> Hello All,
>>>> I have a use case where I have PCollection<KV<Key,Value>> data coming from a Kafka source. When processing each record (KV<Key,Value>), I need all old values for that Key stored in an HBase table. The naive approach is to do an HBase lookup in DoFn.processElement. I considered a side input, but it's not going to work because of the large dataset. Any suggestions?
>>>>
>>>> Thanks,
>>>> Chandan
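The seed-once-then-append logic from Lukasz's pseudocode can be checked without a runner using the framework-free Java sketch below. It is only a model, assuming in-memory maps as stand-ins for the per-key `ValueState<Boolean>` and `BagState<Value>`; the class `SeededHistoryModel`, the `hbaseLookup` function, and the `String` key/value types are hypothetical, and the lookup function stands in for a real HBase `Get`. In an actual Beam `DoFn`, the state would instead be declared with `@StateId` and `StateSpecs.value(...)`/`StateSpecs.bag(...)`. Beam's `ValueState.read()` returns null before the first write, which is why the model checks the "seen" flag for null.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
 * Framework-free model of "seed once from HBase, then append to bag state".
 * The maps stand in for Beam's per-key ValueState<Boolean> and BagState<String>;
 * they are NOT Beam APIs. hbaseLookup is a hypothetical stand-in for an HBase Get.
 */
public class SeededHistoryModel {
  // Stand-in for ValueState<Boolean>: absent (null) until first written.
  private final Map<String, Boolean> seeded = new HashMap<>();
  // Stand-in for BagState<String>: all values known so far for each key.
  private final Map<String, List<String>> history = new HashMap<>();
  private final Function<String, List<String>> hbaseLookup;
  private int lookups = 0;

  public SeededHistoryModel(Function<String, List<String>> hbaseLookup) {
    this.hbaseLookup = hbaseLookup;
  }

  /** Processes one (key, newValue) element and returns the old values visible to it. */
  public List<String> processElement(String key, String newValue) {
    List<String> bag = history.computeIfAbsent(key, k -> new ArrayList<>());
    Boolean isSeeded = seeded.get(key); // null for a key never seen before
    List<String> oldValues;
    if (isSeeded == null || !isSeeded) {
      // First element for this key: one external lookup, then keep the result in "state".
      oldValues = new ArrayList<>(hbaseLookup.apply(key));
      lookups++;
      bag.addAll(oldValues);
      seeded.put(key, true);
    } else {
      // Later elements: old values come from "state" only, with no external lookup.
      oldValues = new ArrayList<>(bag);
    }
    // ... processing using oldValues would go here ...
    bag.add(newValue);
    return oldValues;
  }

  public int lookupCount() {
    return lookups;
  }

  public static void main(String[] args) {
    // Hypothetical HBase contents: key "a" has two old values, other keys have none.
    Map<String, List<String>> fakeHBase = Map.of("a", List.of("a0", "a1"));
    SeededHistoryModel model =
        new SeededHistoryModel(k -> fakeHBase.getOrDefault(k, List.of()));

    System.out.println(model.processElement("a", "a2")); // [a0, a1]
    System.out.println(model.processElement("a", "a3")); // [a0, a1, a2]
    System.out.println(model.processElement("b", "b0")); // []
    System.out.println(model.lookupCount());             // 2
  }
}
```

The model makes exactly one lookup per distinct key (two in the run above, for "a" and "b"); after that, old values are served from state. As Steve points out, this only saves work when keys repeat: if every key is seen once, each element still costs one lookup.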
