interesting to know that the state scales so well!

On Tue, Dec 4, 2018 at 12:21 PM Lukasz Cwik <[email protected]> wrote:

> You're correct in saying that a StatefulDoFn is pointless if you only see
> every key+window once. The user's description wasn't exactly clear, but it
> seemed to me they were reading from a stream and wanted to store all old
> values that they had seen, implying they see keys more than once. The user
> would need to ensure that the windowing strategy triggers more than once
> for my suggestion to be useful (e.g. a global window with an after-element-count
> trigger), but without further details my suggestion is a guess.
>
> Also, the implementation of state storage is runner-dependent, but I am
> aware of users storing very large amounts (>> 1 TiB) of state on
> Dataflow, and in general it scales very well with the number of keys and
> windows.
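To illustrate the example Lukasz gives, here is an idealized, single-threaded plain-Java simulation (not Beam code) of a global window with a repeated after-element-count trigger in discarding mode. It ignores bundling and timing, which are runner-dependent, and the class and method names are hypothetical. Each time a key has buffered `count` elements, a pane for that key fires and its buffer is cleared, so the same key can appear in several panes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Idealized simulation (not Beam code) of a global window with a repeated
 * "after element count" trigger in discarding mode.
 */
public class ElementCountTriggerSim {
    private final int count;
    private final Map<String, List<Integer>> buffers = new HashMap<>();
    private final List<String> firedPanes = new ArrayList<>();

    public ElementCountTriggerSim(int count) {
        this.count = count;
    }

    /** Buffers one element and fires a pane for its key once the threshold is reached. */
    public void onElement(String key, int value) {
        List<Integer> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        buffer.add(value);
        if (buffer.size() >= count) {
            firedPanes.add(key + "=" + buffer);
            buffers.remove(key); // discarding mode: the next pane for this key starts empty
        }
    }

    public List<String> firedPanes() {
        return firedPanes;
    }

    public static void main(String[] args) {
        ElementCountTriggerSim sim = new ElementCountTriggerSim(2);
        String[] keys = {"a", "b", "a", "a", "b", "a"};
        for (int i = 0; i < keys.length; i++) {
            sim.onElement(keys[i], i);
        }
        // Key "a" fires in two separate panes, so downstream per-key state sees it twice.
        System.out.println(sim.firedPanes());
    }
}
```

Running `main` prints `[a=[0, 2], b=[1, 4], a=[3, 5]]`. In Beam the corresponding windowing would be expressed with `Window.into(new GlobalWindows())`, `Repeatedly.forever(AfterPane.elementCountAtLeast(n))` and `discardingFiredPanes()`; a real runner only guarantees that a pane fires after at least n elements.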
>
> On Tue, Dec 4, 2018 at 8:01 AM Steve Niemitz <[email protected]> wrote:
>
>> We have a similar use case, except with BigtableIO instead of HBase.
>>
>> We ended up building a custom transform that was basically
>> PCollection[ByteString] -> PCollection[BigtableRow] and fetched rows
>> from Bigtable based on the input. However, it's tricky to get right because
>> of batching, etc.
>>
>> I'm curious how a StatefulDoFn would help here; it seems like it's more
>> of a cache than an actual join (and in my use case we never read
>> a key more than once, so a cache wouldn't help anyway). Also, I'd be
>> interested to see how state storage performs with "large" amounts of
>> state. We're reading ~1 TB of data from Bigtable in a run, and it doesn't
>> seem reasonable to store all of that in a DoFn's state.
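The batched-lookup transform Steve describes can be sketched in plain Java as follows. This is a minimal sketch under stated assumptions, not his implementation: keys are buffered and fetched with one call per batch instead of one round trip per element. `batchFetch` is a hypothetical stand-in for a multi-get against the external store and is not a BigtableIO or HBase API; the class and method names are also hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
 * Buffers lookup keys and fetches them in batches. `batchFetch` is a hypothetical
 * multi-get against an external store (not a real BigtableIO/HBase call).
 */
public class BatchingLookup {
    private final int batchSize;
    private final Function<List<String>, Map<String, String>> batchFetch;
    private final List<String> pending = new ArrayList<>();
    private final Map<String, String> results = new LinkedHashMap<>();
    private int fetchCalls = 0;

    public BatchingLookup(int batchSize, Function<List<String>, Map<String, String>> batchFetch) {
        this.batchSize = batchSize;
        this.batchFetch = batchFetch;
    }

    /** Roughly the per-element step: buffer the key and flush when the batch is full. */
    public void add(String key) {
        pending.add(key);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    /** Roughly the end-of-bundle step: fetch whatever is still buffered. */
    public void flush() {
        if (pending.isEmpty()) {
            return;
        }
        fetchCalls++;
        results.putAll(batchFetch.apply(new ArrayList<>(pending)));
        pending.clear();
    }

    public Map<String, String> results() { return results; }

    public int fetchCalls() { return fetchCalls; }

    public static void main(String[] args) {
        // Fake store for illustration: every key maps to "row-<key>".
        BatchingLookup lookup = new BatchingLookup(3, keys -> {
            Map<String, String> rows = new LinkedHashMap<>();
            for (String k : keys) {
                rows.put(k, "row-" + k);
            }
            return rows;
        });
        for (String key : new String[] {"k1", "k2", "k3", "k4", "k5"}) {
            lookup.add(key);
        }
        lookup.flush();
        System.out.println(lookup.fetchCalls() + " fetches for " + lookup.results().size() + " keys");
    }
}
```

With a batch size of 3 and five keys, this prints `2 fetches for 5 keys`. In an actual Beam DoFn the final flush would live in `@FinishBundle`, and one of the things that makes this tricky is that elements emitted there need an explicit timestamp and window.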
>>
>> On Tue, Dec 4, 2018 at 1:23 AM Lukasz Cwik <[email protected]> wrote:
>>
>>> What about a StatefulDoFn where you append the value(s) to a bag state
>>> as you see them?
>>>
>>> If you need to seed the state, you could do a one-time lookup against
>>> HBase in processElement for each key that hasn't been seen yet
>>> (recording that you loaded the data in a boolean value state);
>>> afterwards you would rely on reading the value(s) from the bag state.
>>>
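>>> @StateId("hasSeenKey")
>>> private final StateSpec<ValueState<Boolean>> hasSeenKeySpec = StateSpecs.value();
>>> @StateId("values")
>>> private final StateSpec<BagState<Value>> valuesSpec = StateSpecs.bag();
>>>
>>> @ProcessElement
>>> public void processElement(
>>>     @Element KV<Key, Value> element,
>>>     @StateId("hasSeenKey") ValueState<Boolean> hasSeenKeyState,
>>>     @StateId("values") BagState<Value> valuesBagState) {
>>>   Value newValue = element.getValue();
>>>   Iterable<Value> values;
>>>   // ValueState.read() returns null until the flag has been written.
>>>   if (!Boolean.TRUE.equals(hasSeenKeyState.read())) {
>>>     // loadFromHBase is a hypothetical helper for the one-time lookup.
>>>     values = loadFromHBase(element.getKey());
>>>     for (Value v : values) {
>>>       valuesBagState.add(v); // BagState.add appends a single element
>>>     }
>>>     hasSeenKeyState.write(true);
>>>   } else {
>>>     values = valuesBagState.read();
>>>   }
>>>   // ... perform processing using values ...
>>>
>>>   valuesBagState.add(newValue);
>>> }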
>>>
>>> This blog post[1] has a good example.
>>>
>>> 1: https://beam.apache.org/blog/2017/02/13/stateful-processing.html
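The seed-once-then-use-the-bag pattern above can be simulated in plain Java to check its behavior outside a pipeline. This is a sketch, not Beam code: ordinary maps stand in for the runner-managed `ValueState`/`BagState`, windows are ignored (Beam scopes state per key and window), and `loadFromStore` is a hypothetical stand-in for the one-time HBase lookup.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
 * Plain-Java simulation of the per-key pattern: a "hasSeenKey" flag plus a bag of
 * values per key, seeded once from an external store (hypothetical loadFromStore).
 */
public class SeededBagStateSim {
    private final Map<String, Boolean> hasSeenKey = new HashMap<>();
    private final Map<String, List<String>> valuesBag = new HashMap<>();
    private final Function<String, List<String>> loadFromStore;
    private int storeLookups = 0;

    public SeededBagStateSim(Function<String, List<String>> loadFromStore) {
        this.loadFromStore = loadFromStore;
    }

    /** Returns the old values available when processing newValue, then appends newValue. */
    public List<String> processElement(String key, String newValue) {
        List<String> bag = valuesBag.computeIfAbsent(key, k -> new ArrayList<>());
        if (!hasSeenKey.getOrDefault(key, false)) {
            storeLookups++;
            bag.addAll(loadFromStore.apply(key)); // one-time seed from the external store
            hasSeenKey.put(key, true);
        }
        List<String> oldValues = new ArrayList<>(bag); // snapshot used for processing
        bag.add(newValue);
        return oldValues;
    }

    public int storeLookups() { return storeLookups; }

    public static void main(String[] args) {
        SeededBagStateSim sim =
            new SeededBagStateSim(key -> Collections.singletonList(key + "-historical"));
        System.out.println(sim.processElement("k", "v1")); // [k-historical]
        System.out.println(sim.processElement("k", "v2")); // [k-historical, v1]
        System.out.println("store lookups: " + sim.storeLookups()); // store lookups: 1
    }
}
```

Keeping a separate flag, rather than testing whether the bag is empty, means a key whose lookup legitimately returns nothing is still only looked up once.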
>>>
>>> On Mon, Dec 3, 2018 at 12:48 PM Chandan Biswas <[email protected]>
>>> wrote:
>>>
>>>> Hello All,
>>>> I have a use case where I have PCollection<KV<Key,Value>> data coming
>>>> from a Kafka source. When processing each record (KV<Key,Value>), I need all
>>>> old values for that Key, which are stored in an HBase table. The naive approach
>>>> is to do an HBase lookup in DoFn.processElement. I considered a side input, but
>>>> it's not going to work because the dataset is large. Any suggestions?
>>>>
>>>> Thanks,
>>>> Chandan
>>>>
>>>
