Thanks, Lukasz and Steve, for replying so quickly, and sorry for not being
clear enough. My use case is similar to the one Steve mentioned. When I read
data from the stream, I need to determine whether each incoming record is a
duplicate for its key. To do that, I need to check all the historical data
for that key stored in HBase, and the table is about 1 TB. The output of the
processing is stored in the same HBase table. Please let me know if you need
more context.
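
To make the duplicate check concrete, here is a minimal plain-Java sketch of the logic, following the seed-once-per-key idea from Lukasz's suggestion. It uses no Beam or HBase APIs. The class name KeyedDedup, the historyLookup function, and the use of strings for keys and values are all illustrative assumptions: in a real pipeline the map would be per-key Beam state and the lookup would be an HBase read.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Plain-Java model of per-key state: seed once from the store, then rely on local state. */
final class KeyedDedup {
    // Stand-in for the per-key bag state; a key's presence doubles as the "seeded" flag.
    private final Map<String, List<String>> seenByKey = new HashMap<>();
    // Stand-in for the HBase lookup (hypothetical; not a real HBase client call).
    private final Function<String, List<String>> historyLookup;
    int lookups = 0; // number of times the external store was consulted

    KeyedDedup(Function<String, List<String>> historyLookup) {
        this.historyLookup = historyLookup;
    }

    /** Returns true if the value is new for the key, false if it is a duplicate. */
    boolean process(String key, String value) {
        List<String> seen = seenByKey.get(key);
        if (seen == null) {
            // First time this key is seen: load its history exactly once.
            lookups++;
            seen = new ArrayList<>(historyLookup.apply(key));
            seenByKey.put(key, seen);
        }
        if (seen.contains(value)) {
            return false; // duplicate: in the stored history or seen earlier on the stream
        }
        seen.add(value); // remember the new value for later elements of this key
        return true;
    }

    public static void main(String[] args) {
        Map<String, List<String>> table = Map.of("k1", List.of("a", "b"));
        KeyedDedup dedup = new KeyedDedup(k -> table.getOrDefault(k, List.of()));
        System.out.println(dedup.process("k1", "a")); // value already in stored history
        System.out.println(dedup.process("k1", "c")); // value not seen before
        System.out.println(dedup.process("k1", "c")); // same value arriving again
        System.out.println(dedup.lookups);            // store lookups performed so far
    }
}
```

The list scan in contains is linear in the history size for a key, so for keys with long histories a set (or a hash of each value) would likely be a better fit. This sketch only shows the shape of the check.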

Thanks,
Chandan

On Tue, Dec 4, 2018 at 11:32 AM Steve Niemitz <[email protected]> wrote:

> interesting to know that the state scales so well!
>
> On Tue, Dec 4, 2018 at 12:21 PM Lukasz Cwik <[email protected]> wrote:
>
>> You're correct in saying that a StatefulDoFn is pointless if you only see
>> every key+window once. The user's description wasn't exactly clear, but it
>> seemed to me they were reading from a stream and wanted to store all old
>> values that they had seen, implying they see keys more than once. The user
>> would need to ensure that the windowing strategy triggers more than once
>> for my suggestion to be useful (e.g. global window with an after-element-count
>> trigger), but without further details my suggestion is a guess.
>>
>> Also, the implementation of state storage is runner dependent, but I am
>> aware of users storing very large amounts (>> 1 TiB) within state on
>> Dataflow, and in general it scales very well with the number of keys and
>> windows.
>>
>> On Tue, Dec 4, 2018 at 8:01 AM Steve Niemitz <[email protected]> wrote:
>>
>>> We have a similar use case, except with BigtableIO instead of HBase.
>>>
>>> We ended up building a custom transform that was basically
>>> PCollection[ByteString] -> PCollection[BigtableRow], and would fetch rows
>>> from Bigtable based on the input, however it's tricky to get right because
>>> of batching, etc.
>>>
>>> I'm curious how a StatefulDoFn would help here, it seems like it's more
>>> of just a cache than an actual join (and in my use-case we're never reading
>>> a key more than once so a cache wouldn't help here anyways).  Also I'd be
>>> interested to see how the state storage performs with "large" amounts of
>>> state.  We're reading ~1 TB of data from Bigtable in a run, and it doesn't
>>> seem reasonable to store that all in a DoFn's state.
>>>
>>>
>>>
>>> On Tue, Dec 4, 2018 at 1:23 AM Lukasz Cwik <[email protected]> wrote:
>>>
>>>> What about a StatefulDoFn where you append the value(s) in a bag state
>>>> as you see them?
>>>>
>>>> If you need to seed the state information, you could do a one time
>>>> lookup in processElement for each key to HBase if the key hasn't yet been
>>>> seen (storing the fact that you loaded the data in a boolean) but
>>>> afterwards you would rely on reading the value(s) from the bag state.
>>>>
>>>> processElement(...) {
>>>>   Value newValue = ...
>>>>   Iterable<Value> values;
>>>>   // ValueState.read() returns null until the first write for this key.
>>>>   if (!Boolean.TRUE.equals(hasSeenKeyBooleanValueState.read())) {
>>>>     values = ... load from HBase ...
>>>>     // BagState.add takes one element at a time.
>>>>     for (Value v : values) {
>>>>       valuesBagState.add(v);
>>>>     }
>>>>     hasSeenKeyBooleanValueState.write(true);
>>>>   } else {
>>>>     values = valuesBagState.read();
>>>>   }
>>>>   ... perform processing using values ...
>>>>
>>>>   valuesBagState.add(newValue);
>>>> }
>>>>
>>>> This blog post[1] has a good example.
>>>>
>>>> 1: https://beam.apache.org/blog/2017/02/13/stateful-processing.html
>>>>
>>>> On Mon, Dec 3, 2018 at 12:48 PM Chandan Biswas <[email protected]>
>>>> wrote:
>>>>
>>>>> Hello All,
>>>>> I have a use case where I have PCollection<KV<Key,Value>> data coming
>>>>> from a Kafka source. When processing each record (KV<Key,Value>), I need
>>>>> all the old values for that Key stored in an HBase table. The naive approach
>>>>> is to do an HBase lookup in DoFn.processElement. I considered a side input,
>>>>> but it's not going to work because of the large dataset. Any suggestions?
>>>>>
>>>>> Thanks,
>>>>> Chandan
>>>>>
>>>>

-- 
Thanks,
*Chandan Biswas*
