Also, I forgot to mention that keys will not repeat frequently within a window.
Thanks,
Chandan

On Tue, Dec 4, 2018 at 1:49 PM Chandan Biswas wrote:

> Thanks, Lukasz and Steve, for replying quickly. Sorry for not being clear
> enough. My use case is similar to the one Steve described. When I read
> data from the stream, I need to determine whether each incoming record is
> a duplicate for its key. To do that, I need to check all the historical
> data for that key stored in HBase, and the table is about 1 TB. The output
> of the processing is written back to the same HBase table. Please let me
> know if you need more context.
>
> Thanks,
> Chandan
>
> On Tue, Dec 4, 2018 at 11:32 AM Steve Niemitz wrote:
>
>> Interesting to know that the state scales so well!
>>
>> On Tue, Dec 4, 2018 at 12:21 PM Lukasz Cwik wrote:
>>
>>> You're correct in saying that a StatefulDoFn is pointless if you only
>>> see every key+window once. The user's description wasn't exactly clear,
>>> but it seemed to me they were reading from a stream and wanted to store
>>> all the old values they had seen, implying they see keys more than once.
>>> The user would need to ensure that the windowing strategy triggers more
>>> than once for my suggestion to be useful (e.g. a global window with an
>>> after-element-count trigger), but without further details my suggestion
>>> is a guess.
>>>
>>> Also, the implementation of state storage is runner-dependent, but I am
>>> aware of users storing very large amounts of state (far more than
>>> 1 TiB) on Dataflow, and in general it scales very well with the number
>>> of keys and windows.
>>>
>>> On Tue, Dec 4, 2018 at 8:01 AM Steve Niemitz wrote:
>>>
>>>> We have a similar use case, except with BigtableIO instead of HBase.
>>>>
>>>> We ended up building a custom transform that was basically
>>>> PCollection[ByteString] -> PCollection[BigtableRow] and fetched rows
>>>> from Bigtable based on the input; however, it's tricky to get right
>>>> because of batching, etc.
>>>>
>>>> I'm curious how a StatefulDoFn would help here; it seems like it's more
>>>> of a cache than an actual join (and in my use case we never read a key
>>>> more than once, so a cache wouldn't help anyway). I'd also be
>>>> interested to see how state storage performs with "large" amounts of
>>>> state. We're reading ~1 TB of data from Bigtable in a run, and it
>>>> doesn't seem reasonable to store all of that in a DoFn's state.
>>>>
>>>> On Tue, Dec 4, 2018 at 1:23 AM Lukasz Cwik wrote:
>>>>
>>>>> What about a StatefulDoFn where you append the value(s) to a bag
>>>>> state as you see them?
>>>>>
>>>>> If you need to seed the state, you could do a one-time HBase lookup in
>>>>> processElement for each key that hasn't been seen yet (recording that
>>>>> you loaded the data in a boolean), but afterwards you would rely on
>>>>> reading the value(s) from the bag state.
>>>>>
>>>>> processElement(...) {
>>>>>   Value newValue = ...
>>>>>   Iterable<Value> values;
>>>>>   // ValueState.read() returns null until the first write()
>>>>>   if (!Boolean.TRUE.equals(hasSeenKeyBooleanValueState.read())) {
>>>>>     values = ... load from HBase ...
>>>>>     for (Value v : values) {
>>>>>       valuesBagState.add(v);  // BagState adds one element at a time
>>>>>     }
>>>>>     hasSeenKeyBooleanValueState.write(true);
>>>>>   } else {
>>>>>     values = valuesBagState.read();
>>>>>   }
>>>>>   ... perform processing using values ...
>>>>>
>>>>>   valuesBagState.add(newValue);
>>>>> }
>>>>>
>>>>> This blog post[1] has a good example.
>>>>>
>>>>> 1: https://beam.apache.org/blog/2017/02/13/stateful-processing.html
>>>>>
>>>>> On Mon, Dec 3, 2018 at 12:48 PM Chandan Biswas wrote:
>>>>>
>>>>>> Hello All,
>>>>>> I have a use case where I have PCollection<KV<Key,Value>> data coming
>>>>>> from a Kafka source. When processing each record (KV<Key,Value>), I
>>>>>> need all the old values for that Key stored in an HBase table. The
>>>>>> naive approach is to do an HBase lookup in DoFn.processElement.
>>>>>> I considered a side input, but it's not going to work because of the
>>>>>> large dataset. Any suggestions?
>>>>>>
>>>>>> Thanks,
>>>>>> Chandan
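A minimal plain-Java sketch of the "seed state once per key, then read from state" idea discussed in this thread, applied to the duplicate check Chandan describes. It does not use Beam: the `backingStore` map is a hypothetical stand-in for the HBase table, and the `seededKeys` set and `seenValues` map model the per-key boolean `ValueState` and bag state from Lukasz's snippet. A `HashSet` holds the seen values so the duplicate check is a membership test; with a `BagState` you would scan the bag's contents instead. The class name `SeededDedup` is illustrative only.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Plain-Java simulation of seeding per-key state from a backing store once,
 * then answering duplicate checks from that state. Not Beam code.
 */
public class SeededDedup {
  // Hypothetical stand-in for the historical HBase table: key -> stored values.
  private final Map<String, List<String>> backingStore;
  // Models the per-key boolean state: has this key been seeded yet?
  private final Set<String> seededKeys = new HashSet<>();
  // Models the per-key collection of values seen so far (historical + streamed).
  private final Map<String, Set<String>> seenValues = new HashMap<>();
  // Counts backing-store lookups, to show each key is loaded at most once.
  private int lookups = 0;

  public SeededDedup(Map<String, List<String>> backingStore) {
    this.backingStore = backingStore;
  }

  /** Returns true if the value is new for this key, false if it is a duplicate. */
  public boolean process(String key, String value) {
    Set<String> values = seenValues.computeIfAbsent(key, k -> new HashSet<>());
    if (!seededKeys.contains(key)) {
      // One-time lookup, analogous to loading from HBase on first sight of a key.
      lookups++;
      values.addAll(backingStore.getOrDefault(key, List.of()));
      seededKeys.add(key);
    }
    // Set.add returns false when the value was already present.
    return values.add(value);
  }

  public int lookupCount() {
    return lookups;
  }

  public static void main(String[] args) {
    Map<String, List<String>> table = new HashMap<>();
    table.put("k1", List.of("a", "b"));
    SeededDedup dedup = new SeededDedup(table);
    System.out.println(dedup.process("k1", "a")); // false: already in the table
    System.out.println(dedup.process("k1", "c")); // true: new value
    System.out.println(dedup.process("k1", "c")); // false: seen earlier in this run
    System.out.println(dedup.process("k2", "x")); // true: key absent from the table
    System.out.println(dedup.lookupCount());      // 2: one lookup per key
  }
}
```

The lookup count makes the trade-off raised in the thread visible: each key costs exactly one backing-store read, so the pattern only saves work when keys repeat. If keys rarely repeat within a window, almost every element still pays for its one-time lookup, which is close to the naive per-element HBase lookup. Unlike Beam state, which the runner scopes per key and window, this sketch keeps everything in a single in-memory object.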
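Steve notes that a custom lookup transform is tricky to get right because of batching. The plain-Java sketch below illustrates only the batching part and is not Steve's transform: `multiGet` is a hypothetical function standing in for a batched HBase or Bigtable read, and `BatchedLookup` is an illustrative name. Keys are buffered and fetched in groups of `batchSize`, and a final `flush()` drains any leftover keys.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
 * Buffers lookup keys and fetches them in batches instead of one at a time.
 * multiGet is a hypothetical stand-in for a batched store read.
 */
public class BatchedLookup {
  private final int batchSize;
  private final Function<List<String>, Map<String, String>> multiGet;
  private final List<String> buffer = new ArrayList<>();
  private final Map<String, String> results = new HashMap<>();
  private int batchesIssued = 0;

  public BatchedLookup(int batchSize, Function<List<String>, Map<String, String>> multiGet) {
    this.batchSize = batchSize;
    this.multiGet = multiGet;
  }

  /** Queues a key and issues a batched read once the buffer is full. */
  public void add(String key) {
    buffer.add(key);
    if (buffer.size() >= batchSize) {
      flush();
    }
  }

  /** Issues a read for any buffered keys; does nothing if the buffer is empty. */
  public void flush() {
    if (buffer.isEmpty()) {
      return;
    }
    results.putAll(multiGet.apply(new ArrayList<>(buffer)));
    batchesIssued++;
    buffer.clear();
  }

  public Map<String, String> results() {
    return results;
  }

  public int batchesIssued() {
    return batchesIssued;
  }

  public static void main(String[] args) {
    Map<String, String> table = Map.of("a", "1", "b", "2", "c", "3");
    // Simulated batched read: returns only the requested keys present in the table.
    BatchedLookup lookup = new BatchedLookup(2, keys -> {
      Map<String, String> out = new HashMap<>();
      for (String k : keys) {
        if (table.containsKey(k)) {
          out.put(k, table.get(k));
        }
      }
      return out;
    });
    for (String k : List.of("a", "b", "c", "z")) {
      lookup.add(k);
    }
    lookup.flush(); // no-op here: the second full batch already drained the buffer
    System.out.println(lookup.results().size()); // 3 ("z" is not in the table)
    System.out.println(lookup.batchesIssued());  // 2
  }
}
```

In a Beam DoFn, the buffer would typically be flushed both when it fills and in a `@FinishBundle` method so that keys left at the end of a bundle are not dropped; emitting from `@FinishBundle` requires supplying a timestamp and window explicitly, which is part of what makes this approach fiddly.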
