Thanks, Lukasz, for the quick reply.

On Tue, Dec 4, 2018 at 4:20 PM Lukasz Cwik wrote:
> Is HBase only updated by the output within your pipeline, or can an
> external system also update the HBase data? If no, then querying HBase
> within processElement is your best bet, since you're effectively trying
> to do a sparse lookup with slowly changing data.
>
> On Tue, Dec 4, 2018 at 11:59 AM Chandan Biswas wrote:
>
>> Also, I forgot to mention that keys will not repeat frequently in a
>> window.
>>
>> Thanks,
>> Chandan
>>
>> On Tue, Dec 4, 2018 at 1:49 PM Chandan Biswas wrote:
>>
>>> Thanks, Lukasz and Steve, for replying quickly. Sorry for not being
>>> clear enough. My use case is similar to what Steve mentioned: when I
>>> read data from the stream, I need to determine whether the incoming
>>> data is a duplicate for its key. So I need to check all the historical
>>> data for that key stored in HBase, and the table is about 1 TB. The
>>> output of the processing is stored in the same HBase table. Please let
>>> me know if you need more context.
>>>
>>> Thanks,
>>> Chandan
>>>
>>> On Tue, Dec 4, 2018 at 11:32 AM Steve Niemitz wrote:
>>>
>>>> Interesting to know that the state scales so well!
>>>>
>>>> On Tue, Dec 4, 2018 at 12:21 PM Lukasz Cwik wrote:
>>>>
>>>>> You're correct in saying that a StatefulDoFn is pointless if you only
>>>>> see every key+window once. The user's description wasn't exactly
>>>>> clear, but it seemed to me they were reading from a stream and wanted
>>>>> to store all old values that they had seen, implying they see keys
>>>>> more than once. The user would need to ensure that the windowing
>>>>> strategy triggers more than once for my suggestion to be useful (e.g.
>>>>> the global window with an element-count trigger), but without further
>>>>> details my suggestion is a guess.
>>>>>
>>>>> Also, the implementation of state storage is runner-dependent, but I
>>>>> am aware of users storing very large amounts (>> 1 TiB) within state
>>>>> on Dataflow, and in general it scales very well with the number of
>>>>> keys and windows.
>>>>>
>>>>> On Tue, Dec 4, 2018 at 8:01 AM Steve Niemitz wrote:
>>>>>
>>>>>> We have a similar use case, except with BigtableIO instead of HBase.
>>>>>>
>>>>>> We ended up building a custom transform that was basically
>>>>>> PCollection[ByteString] -> PCollection[BigtableRow] and fetched rows
>>>>>> from Bigtable based on the input; however, it's tricky to get right
>>>>>> because of batching, etc.
>>>>>>
>>>>>> I'm curious how a StatefulDoFn would help here. It seems like it's
>>>>>> more of a cache than an actual join (and in my use case we never
>>>>>> read a key more than once, so a cache wouldn't help anyway). I'd
>>>>>> also be interested to see how the state storage performs with
>>>>>> "large" amounts of state. We're reading ~1 TB of data from Bigtable
>>>>>> in a run, and it doesn't seem reasonable to store all of that in a
>>>>>> DoFn's state.
>>>>>>
>>>>>> On Tue, Dec 4, 2018 at 1:23 AM Lukasz Cwik wrote:
>>>>>>
>>>>>>> What about a StatefulDoFn where you append the value(s) to a bag
>>>>>>> state as you see them?
>>>>>>>
>>>>>>> If you need to seed the state, you could do a one-time lookup to
>>>>>>> HBase in processElement for each key that hasn't been seen yet
>>>>>>> (recording in a boolean that you loaded the data), and afterwards
>>>>>>> rely on reading the value(s) from the bag state.
>>>>>>>
>>>>>>> processElement(...) {
>>>>>>>   Value newValue = ...
>>>>>>>   Iterable<Value> values;
>>>>>>>   // ValueState.read() returns null until the state is written.
>>>>>>>   if (!Boolean.TRUE.equals(hasSeenKeyBooleanValueState.read())) {
>>>>>>>     values = ... load from HBase ...
>>>>>>>     for (Value v : values) {
>>>>>>>       valuesBagState.add(v);
>>>>>>>     }
>>>>>>>     hasSeenKeyBooleanValueState.write(true);
>>>>>>>   } else {
>>>>>>>     values = valuesBagState.read();
>>>>>>>   }
>>>>>>>   ... perform processing using values ...
>>>>>>>
>>>>>>>   valuesBagState.add(newValue);
>>>>>>> }
>>>>>>>
>>>>>>> This blog post [1] has a good example.
>>>>>>>
>>>>>>> 1: https://beam.apache.org/blog/2017/02/13/stateful-processing.html
>>>>>>>
>>>>>>> On Mon, Dec 3, 2018 at 12:48 PM Chandan Biswas wrote:
>>>>>>>
>>>>>>>> Hello All,
>>>>>>>> I have a use case where I have PCollection<KV<Key,Value>> data
>>>>>>>> coming from a Kafka source. When processing each record
>>>>>>>> (KV<Key,Value>), I need all old values for that Key stored in an
>>>>>>>> HBase table. The naive approach is to do an HBase lookup in
>>>>>>>> DoFn.processElement. I considered a side input, but it's not going
>>>>>>>> to work because of the large dataset. Any suggestions?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Chandan
>>>>>>>>
>>>>>>>
>>>
>>> --
>>> Thanks,
>>> *Chandan Biswas*
>>>
>>
>>
>> --
>> Thanks,
>> *Chandan Biswas*
>>
>
--
Thanks,
*Chandan Biswas*
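This is a minimal plain-Java sketch of the seed-once idea from Lukasz's first reply, applied to the duplicate check Chandan describes. It is not Beam code and rests on several assumptions:

- The `HashMap` stands in for per-key state. Presence of a key plays the role of `hasSeenKeyBooleanValueState`, and the list plays the role of `valuesBagState`.
- `HistoryStore` is a hypothetical interface standing in for the HBase read.

In Beam, state is scoped per key and window. As noted in the thread, this approach therefore only saves lookups if keys repeat within a window.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java sketch (no Beam or HBase dependency) of "seed per-key state once
 * from an external store, then rely on the local copy", used for duplicate
 * detection. The HashMap is an assumed stand-in for Beam per-key state.
 */
final class SeededDedupSketch {

  /** Hypothetical stand-in for "load all stored values for a key from HBase". */
  interface HistoryStore {
    List<String> loadValues(String key);
  }

  private final HistoryStore store;
  // Stand-in for the bag state: every value seen so far, per key.
  // A key being present here plays the role of the "has seen key" boolean.
  private final Map<String, List<String>> valuesByKey = new HashMap<>();
  // Counts calls to the external store, to show each key is seeded only once.
  private int storeLookups = 0;

  SeededDedupSketch(HistoryStore store) {
    this.store = store;
  }

  /** Returns true if newValue is new for this key, false if it is a duplicate. */
  boolean process(String key, String newValue) {
    List<String> values = valuesByKey.get(key);
    if (values == null) {
      // First time this key is seen: one-time lookup to seed the local state.
      storeLookups++;
      values = new ArrayList<>(store.loadValues(key)); // copy: result may be immutable
      valuesByKey.put(key, values);
    }
    if (values.contains(newValue)) {
      return false; // already present in this key's history
    }
    values.add(newValue); // append the new value, as the pseudocode does with the bag
    return true;
  }

  int storeLookups() {
    return storeLookups;
  }

  public static void main(String[] args) {
    // In-memory fake of the HBase table, for demonstration only.
    Map<String, List<String>> fakeHBase = new HashMap<>();
    fakeHBase.put("k1", List.of("a", "b"));
    SeededDedupSketch dedup =
        new SeededDedupSketch(key -> fakeHBase.getOrDefault(key, List.of()));

    System.out.println(dedup.process("k1", "a")); // false: "a" already stored for k1
    System.out.println(dedup.process("k1", "c")); // true: new value
    System.out.println(dedup.process("k1", "c")); // false: seen earlier in the stream
    System.out.println(dedup.process("k2", "a")); // true: k2 has no history
    System.out.println(dedup.storeLookups());     // 2: one lookup each for k1 and k2
  }
}
```

A `List` keeps the sketch close to the bag-state pseudocode. A set would make the `contains` check cheaper when a key's history is long.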
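Steve's point that fetching rows per element is tricky because of batching can be illustrated with a second plain-Java sketch. It is one possible shape under stated assumptions, not the transform he built. `MultiGet` is a hypothetical interface standing in for a real HBase or Bigtable multi-row read. Keys are buffered and looked up in groups of `batchSize`, and `flush()` models draining the buffer at the end of a bundle.

In an actual Beam `DoFn`, output emitted from `@FinishBundle` must be given an explicit timestamp and window. The buffer therefore also has to remember each element's timestamp and window, which is part of what makes this pattern fiddly.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java sketch (no Beam or HBase dependency) of batching point lookups
 * instead of issuing one request per element. MultiGet is a hypothetical
 * stand-in for a multi-row read.
 */
final class BatchedLookupSketch {

  /** Hypothetical client: returns the rows that exist for the requested keys. */
  interface MultiGet {
    Map<String, String> getAll(List<String> keys);
  }

  private final MultiGet client;
  private final int batchSize;
  private final List<String> buffer = new ArrayList<>();  // keys waiting for a lookup
  private final List<String> results = new ArrayList<>(); // "key=value" outputs
  private int requests = 0;                                // calls made to the client

  BatchedLookupSketch(MultiGet client, int batchSize) {
    this.client = client;
    this.batchSize = batchSize;
  }

  /** Buffers a key and issues a lookup once the buffer holds batchSize keys. */
  void process(String key) {
    buffer.add(key);
    if (buffer.size() >= batchSize) {
      flush();
    }
  }

  /** Looks up all buffered keys in one request; also called at "end of bundle". */
  void flush() {
    if (buffer.isEmpty()) {
      return;
    }
    requests++;
    Map<String, String> rows = client.getAll(new ArrayList<>(buffer));
    for (String key : buffer) {
      // Keys without a stored row are marked rather than dropped.
      results.add(key + "=" + rows.getOrDefault(key, "<missing>"));
    }
    buffer.clear();
  }

  List<String> results() {
    return List.copyOf(results);
  }

  int requests() {
    return requests;
  }

  /** In-memory fake client backed by a map, for demonstration only. */
  static MultiGet fakeMultiGet(Map<String, String> table) {
    return keys -> {
      Map<String, String> out = new HashMap<>();
      for (String key : keys) {
        if (table.containsKey(key)) {
          out.put(key, table.get(key));
        }
      }
      return out;
    };
  }

  public static void main(String[] args) {
    Map<String, String> table = Map.of("a", "1", "b", "2", "c", "3", "d", "4");
    BatchedLookupSketch sketch = new BatchedLookupSketch(fakeMultiGet(table), 2);
    for (String key : List.of("a", "b", "c", "x", "d")) {
      sketch.process(key);
    }
    sketch.flush(); // drain the partial batch, as at the end of a bundle
    System.out.println(sketch.results());  // [a=1, b=2, c=3, x=<missing>, d=4]
    System.out.println(sketch.requests()); // 3
  }
}
```

With five keys and a batch size of 2, the example issues three requests instead of five.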
