Kenn - shouldn't the Reify happen before the rewindow?
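
To make the ordering question concrete, here is a minimal plain-Java model. It is a sketch, not Beam code: the class `ReifyOrderSketch`, its methods, and the window labels `w1`/`w2` are hypothetical stand-ins for `Reify.windowsInValue()`, `Window.into(new GlobalWindows())`, and a stateful DoFn. It assumes that reifying copies whatever window the element is in at that point, and that state is scoped to the key plus the current window.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the ordering question; none of these names are Beam API.
final class ReifyOrderSketch {
    static final String GLOBAL = "GlobalWindow";

    /** An element together with the window it is currently assigned to. */
    static final class Element {
        final String key;
        final String value;
        final String window;

        Element(String key, String value, String window) {
            this.key = key;
            this.value = value;
            this.window = window;
        }
    }

    /** Stands in for Reify.windowsInValue(): copy the current window into the value. */
    static List<Element> reify(List<Element> in) {
        List<Element> out = new ArrayList<>();
        for (Element e : in) {
            out.add(new Element(e.key, e.value + "@" + e.window, e.window));
        }
        return out;
    }

    /** Stands in for Window.into(new GlobalWindows()): the old window assignment is dropped. */
    static List<Element> rewindowGlobal(List<Element> in) {
        List<Element> out = new ArrayList<>();
        for (Element e : in) {
            out.add(new Element(e.key, e.value, GLOBAL));
        }
        return out;
    }

    /** Stands in for the stateful DoFn (D): a counter scoped to (key, current window). */
    static List<String> statefulCount(List<Element> in) {
        Map<String, Integer> state = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (Element e : in) {
            int seen = state.merge(e.key + "|" + e.window, 1, Integer::sum);
            out.add(e.value + " #" + seen);
        }
        return out;
    }

    /** Two Combine.perKey outputs for key "a", one per fixed window. */
    static List<Element> combined() {
        return Arrays.asList(new Element("a", "sum1", "w1"), new Element("a", "sum2", "w2"));
    }

    static List<String> noRewindow() {
        return statefulCount(combined());
    }

    static List<String> reifyThenRewindow() {
        return statefulCount(rewindowGlobal(reify(combined())));
    }

    static List<String> rewindowThenReify() {
        return statefulCount(reify(rewindowGlobal(combined())));
    }

    public static void main(String[] args) {
        System.out.println(noRewindow());        // [sum1 #1, sum2 #1]
        System.out.println(reifyThenRewindow()); // [sum1@w1 #1, sum2@w2 #2]
        System.out.println(rewindowThenReify()); // [sum1@GlobalWindow #1, sum2@GlobalWindow #2]
    }
}
```

In this model, only the reify-then-rewindow order gives the downstream step both a counter that survives across windows and the original fixed-window label. Reifying after the rewindow records only the global window.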

On Sun, Aug 23, 2020 at 11:08 AM Kenneth Knowles <[email protected]> wrote:

>
>
> On Sun, Aug 23, 2020 at 1:12 PM Dongwon Kim <[email protected]> wrote:
>
>> Hi Reuven,
>>
>> You and Kenneth are right; I thought GlobalWindows in unbounded streams
>> need triggers.
>>
>> p.apply(WithKeys.of(...).withKeyType(...))           // (A)
>>   .apply(Window.into(FixedWindows.of(...)))          // (B)
>>   .apply(Combine.perKey(new MyCombinFn()))           // (C)
>>   .apply(Window.into(new GlobalWindows()))           // (E)
>>   .apply(ParDo.of(new MyDoFn()))                     // (D)
>>
>>
>> So just adding (E) blurs windows and makes the state defined in MyDoFn
>> (D) a per-key state.
>> Hope I understand you and Kenneth correctly this time.
>>
>
> That is correct. However, I think you may want:
>
> p.apply(WithKeys.of(...).withKeyType(...))           // (A)
>   .apply(Window.into(FixedWindows.of(...)))          // (B)
>   .apply(Combine.perKey(new MyCombinFn()))           // (C)
>   .apply(Window.into(new GlobalWindows()))           // (E)
>   .apply(Reify.windowsInValue())                     // (G)
>   .apply(ParDo.of(new MyDoFn()))                     // (D)
>
> (Reify: <https://beam.apache.org/releases/javadoc/2.23.0/index.html?org/apache/beam/sdk/transforms/Reify.html>)
>
>
> This will make the window information from (B) & (C) available to MyDoFn
> in (D)
>
> Kenn
>
>
>>
>> Best,
>>
>> Dongwon
>>
>> On Mon, Aug 24, 2020 at 1:51 AM Reuven Lax <[email protected]> wrote:
>>
>>> You could simply window into GlobalWindows and add a stateful DoFn
>>> afterwards. No need for the triggering and GroupByKey.
>>>
>>> On Sun, Aug 23, 2020 at 9:45 AM Dongwon Kim <[email protected]>
>>> wrote:
>>>
>>>> Hi Kenneth,
>>>>
>>>> According to your suggestion, I modified my pipeline as follows:
>>>>
>>>> p.apply(WithKeys.of(...).withKeyType(...))                      // (A)
>>>>   .apply(Window.into(FixedWindows.of(...)))                     // (B)
>>>>   .apply(Combine.perKey(new MyCombinFn()))                      // (C)
>>>>   .apply(
>>>>     Window
>>>>       .into(new GlobalWindows())                                // (E1)
>>>>       .triggering(
>>>>         Repeatedly.forever(AfterPane.elementCountAtLeast(1)))   // (E2)
>>>>       .accumulatingFiredPanes()                                 // (E3)
>>>>   )
>>>>   .apply(GroupByKey.create())                                   // (F)
>>>>   .apply(ParDo.of(new MyDoFn()))                                // (D)
>>>>
>>>>
>>>> I had to include (E1), (E2), (E3), and (F) so that MyDoFn (D) can
>>>> iterate over a list of output records from (C) sharing the same key.
>>>> This way I can achieve the same effect without having a per-key state
>>>> at (D).
>>>>
>>>> Do I understand your intention correctly?
>>>> If not, please advise me with some hints on it.
>>>>
>>>> Thanks,
>>>>
>>>> Dongwon
>>>>
>>>>
>>>> On Sun, Aug 23, 2020 at 5:10 AM Kenneth Knowles <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi Dongwon,
>>>>>
>>>>> On Sat, Aug 22, 2020 at 2:46 PM Dongwon Kim <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I'm using Beam 2.23.0 with FlinkRunner and a part of my unbounded
>>>>>> pipeline looks like below:
>>>>>>
>>>>>> p.apply(WithKeys.of(...).withKeyType(...))           // (A)
>>>>>>   .apply(Window.into(FixedWindows.of(...)))          // (B)
>>>>>>   .apply(Combine.perKey(new MyCombinFn()))           // (C)
>>>>>>   .apply(ParDo.of(new MyDoFn()))                     // (D)
>>>>>>
>>>>>>
>>>>>> What I want to do is
>>>>>> (1) to group data by key (A) and window (B),
>>>>>> (2) to do some aggregation (C)
>>>>>> (3) to perform the final computation on each group (D)
>>>>>>
>>>>>> I've noticed that the ValueState for a particular key is null whenever
>>>>>> a new window for that key arrives, which suggests that Beam supports
>>>>>> only per-key+window state, not per-key state, after windowing.
>>>>>>
>>>>>> I usually work with Flink DataStream API and Flink supports both
>>>>>> per-key state and per-key+window state [1].
>>>>>>
>>>>>> Does Beam support per-key state, as opposed to per-key+window state,
>>>>>> after windowing (D)? If I am missing something, please correct me.
>>>>>>
>>>>>
>>>>> You understand correctly - Beam does not include per-key state that
>>>>> crosses window boundaries. If I understand your goal correctly, you can
>>>>> achieve the same effect by copying the window metadata into the element
>>>>> and then re-windowing into the global window before (D).
>>>>>
>>>>> Kenn
>>>>>
>>>>>
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#using-per-window-state-in-processwindowfunction
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Dongwon
>>>>>>
>>>>>>
