Re: Support of per-key state after windowing

2020-08-23 Thread Dongwon Kim
Reuven and Kenneth,

Thanks for the tip!

Now I can get window information without having to modify the type of my
aggregator :-)

Best,

Dongwon

On Mon, Aug 24, 2020 at 3:16 AM Reuven Lax  wrote:

> Kenn - shouldn't the Reify happen before the rewindow?


Re: Support of per-key state after windowing

2020-08-23 Thread Kenneth Knowles
Yes :-)

On Sun, Aug 23, 2020 at 2:16 PM Reuven Lax  wrote:

> Kenn - shouldn't the Reify happen before the rewindow?


Re: Support of per-key state after windowing

2020-08-23 Thread Reuven Lax
Kenn - shouldn't the Reify happen before the rewindow?
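Why the order matters: Reify.windowsInValue() (G) records the window an element belongs to at that point in the pipeline. Applied after the re-window (E), it can only record the global window. Applied before (E), it records the fixed window from (B). The intended order is therefore (A), (B), (C), (G), (E), (D). The plain-Java model below is a hypothetical sketch with no Beam dependency. The class and method names are illustrative, and strings stand in for windows.

```java
// Hypothetical plain-Java model of the Reify / re-window ordering question.
// Strings stand in for windows; this is not Beam's API.
final class ReifyOrderModel {
  static final String GLOBAL = "GlobalWindow";

  /** An element with its current window assignment and any window copied into its value. */
  static final class Element {
    final String assignedWindow; // window the element currently belongs to
    final String reifiedWindow;  // window recorded in the value, or null if not reified yet

    Element(String assignedWindow, String reifiedWindow) {
      this.assignedWindow = assignedWindow;
      this.reifiedWindow = reifiedWindow;
    }
  }

  /** Like Reify.windowsInValue(): records the element's *current* window in its value. */
  static Element reify(Element e) {
    return new Element(e.assignedWindow, e.assignedWindow);
  }

  /** Like Window.into(new GlobalWindows()): reassigns the window, leaves the value untouched. */
  static Element rewindowToGlobal(Element e) {
    return new Element(GLOBAL, e.reifiedWindow);
  }
}
```

In both orders the element ends up in the global window for (D). Only the reify-first order keeps the fixed-window information in the value.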

On Sun, Aug 23, 2020 at 11:08 AM Kenneth Knowles  wrote:

> That is correct. However, I think you may want:
>
> p.apply(WithKeys.of(...).withKeyType(...))   // (A)
>   .apply(Window.into(FixedWindows.of(...)))  // (B)
>   .apply(Combine.perKey(new MyCombinFn()))   // (C)
>   .apply(Window.into(new GlobalWindows()))   // (E)
>   .apply(Reify.windowsInValue())             // (G)
>   .apply(ParDo.of(new MyDoFn()))             // (D)
>
> This will make the window information from (B) & (C) available to MyDoFn
> in (D)
>
> Kenn


Re: Support of per-key state after windowing

2020-08-23 Thread Kenneth Knowles
On Sun, Aug 23, 2020 at 1:12 PM Dongwon Kim  wrote:

> Hi Reuven,
>
> You and Kenneth are right; I thought GlobalWindows in unbounded streams
> need triggers.
>
> p.apply(WithKeys.of(...).withKeyType(...))   // (A)
>   .apply(Window.into(FixedWindows.of(...)))  // (B)
>   .apply(Combine.perKey(new MyCombinFn()))   // (C)
>   .apply(Window.into(new GlobalWindows()))   // (E)
>   .apply(ParDo.of(new MyDoFn()))             // (D)
>
> So just adding (E) blurs windows and makes the state defined in MyDoFn (D)
> a per-key state.
> Hope I understand you and Kenneth correctly this time.
>

That is correct. However, I think you may want:

p.apply(WithKeys.of(...).withKeyType(...))   // (A)
  .apply(Window.into(FixedWindows.of(...)))  // (B)
  .apply(Combine.perKey(new MyCombinFn()))   // (C)
  .apply(Window.into(new GlobalWindows()))   // (E)
  .apply(Reify.windowsInValue())             // (G)
  .apply(ParDo.of(new MyDoFn()))             // (D)


This will make the window information from (B) & (C) available to MyDoFn in
(D)

Kenn


Re: Support of per-key state after windowing

2020-08-23 Thread Dongwon Kim
Hi Reuven,

You and Kenneth are right; I thought GlobalWindows in unbounded streams
need triggers.

p.apply(WithKeys.of(...).withKeyType(...))   // (A)
  .apply(Window.into(FixedWindows.of(...)))  // (B)
  .apply(Combine.perKey(new MyCombinFn()))   // (C)
  .apply(Window.into(new GlobalWindows()))   // (E)
  .apply(ParDo.of(new MyDoFn()))             // (D)


So just adding (E) puts every element into the single global window, which
makes the state defined in MyDoFn (D) per-key state.
I hope I understand you and Kenneth correctly this time.

Best,

Dongwon

On Mon, Aug 24, 2020 at 1:51 AM Reuven Lax  wrote:

> You could simply window into GlobalWindows and add a stateful DoFn
> afterwards. No need for the triggering and GroupByKey.


Re: Support of per-key state after windowing

2020-08-23 Thread Reuven Lax
You could simply window into GlobalWindows and add a stateful DoFn
afterwards. No need for the triggering and GroupByKey.
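Reuven's alternative can be modeled in plain Java. The names are hypothetical, and a HashMap stands in for a ValueState<Long>. After Window.into(new GlobalWindows()), each aggregate from (C) is handled once, as it arrives, and the state persists across the original fixed windows. No trigger is involved because triggers only control when aggregations such as GroupByKey emit. A ParDo processes elements as they come. State in the global window is never cleared automatically, so a real DoFn may need to clear it explicitly, for example from a timer.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a stateful DoFn placed after Window.into(new GlobalWindows()).
// Every element shares the global window, so the state cell is effectively per key.
// Not Beam's API; the names are illustrative.
final class GlobalWindowStatefulModel {
  // Plays the role of a per-key ValueState<Long>.
  private final Map<String, Long> runningTotal = new HashMap<>();

  /** Handles one aggregate from (C) on arrival and returns the updated per-key total. */
  long process(String key, long windowResult) {
    Long previous = runningTotal.get(key); // null only for the first element of this key
    long updated = (previous == null ? 0L : previous) + windowResult;
    runningTotal.put(key, updated);
    return updated;
  }
}
```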

On Sun, Aug 23, 2020 at 9:45 AM Dongwon Kim  wrote:

> Hi Kenneth,
>
> According to your suggestion, I modified my pipeline as follows:
>
> p.apply(WithKeys.of(...).withKeyType(...))                        // (A)
>   .apply(Window.into(FixedWindows.of(...)))                       // (B)
>   .apply(Combine.perKey(new MyCombinFn()))                        // (C)
>   .apply(
>       Window
>           .into(new GlobalWindows())                               // (E1)
>           .triggering(
>               Repeatedly.forever(AfterPane.elementCountAtLeast(1)))  // (E2)
>           .accumulatingFiredPanes())                               // (E3)
>   .apply(GroupByKey.create())                                      // (F)
>   .apply(ParDo.of(new MyDoFn()))                                   // (D)


Re: Support of per-key state after windowing

2020-08-23 Thread Dongwon Kim
Hi Kenneth,

According to your suggestion, I modified my pipeline as follows:

p.apply(WithKeys.of(...).withKeyType(...))                        // (A)
  .apply(Window.into(FixedWindows.of(...)))                       // (B)
  .apply(Combine.perKey(new MyCombinFn()))                        // (C)
  .apply(
      Window
          .into(new GlobalWindows())                               // (E1)
          .triggering(
              Repeatedly.forever(AfterPane.elementCountAtLeast(1)))  // (E2)
          .accumulatingFiredPanes())                               // (E3)
  .apply(GroupByKey.create())                                      // (F)
  .apply(ParDo.of(new MyDoFn()))                                   // (D)


I had to include (E1), (E2), (E3), and (F) so that MyDoFn (D) can iterate
over a list of output records from (C) sharing the same key.
This way I can achieve the same effect without having a per-key state at
(D).
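The cost of this GroupByKey approach shows up in a small plain-Java model. The model is hypothetical, covers one key, and assumes each firing of AfterPane.elementCountAtLeast(1) sees exactly one new element; a runner may batch several. With accumulatingFiredPanes() in the global window, every pane repeats all values seen so far. The global window never closes, so the buffered values for a key keep growing.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of accumulating panes for ONE key in the global window,
// assuming the trigger fires once per arriving element. Not Beam's API.
final class AccumulatingPanesModel {
  private AccumulatingPanesModel() {}

  /** Contents of each pane delivered downstream, in firing order. */
  static List<List<Long>> panes(List<Long> arrivals) {
    List<List<Long>> fired = new ArrayList<>();
    List<Long> accumulated = new ArrayList<>();
    for (Long value : arrivals) {
      accumulated.add(value);                  // accumulating mode never discards fired values
      fired.add(new ArrayList<>(accumulated)); // each pane repeats everything seen so far
    }
    return fired;
  }

  /** Total number of values the downstream DoFn iterates over across all panes. */
  static long totalValuesDelivered(List<Long> arrivals) {
    long total = 0;
    for (List<Long> pane : panes(arrivals)) {
      total += pane.size();
    }
    return total;
  }
}
```

With n results per key, MyDoFn iterates over n(n+1)/2 values in total, for example 6 values for 3 results. A stateful DoFn in the global window touches each result once.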

Do I understand your intention correctly?
If not, please advise me with some hints on it.

Thanks,

Dongwon


On Sun, Aug 23, 2020 at 5:10 AM Kenneth Knowles  wrote:

> Hi Dongwon,
>
> You understand correctly - Beam does not include per-key state that
> crosses window boundaries. If I understand your goal correctly, you can
> achieve the same effect by copying the window metadata into the element and
> then re-windowing into the global window before (D).
>
> Kenn


Re: Support of per-key state after windowing

2020-08-22 Thread Kenneth Knowles
Hi Dongwon,

On Sat, Aug 22, 2020 at 2:46 PM Dongwon Kim  wrote:

> Hi all,
>
> I'm using Beam 2.23.0 with FlinkRunner and a part of my unbounded pipeline
> looks like below:
>
> p.apply(WithKeys.of(...).withKeyType(...))   // (A)
>   .apply(Window.into(FixedWindows.of(...)))  // (B)
>   .apply(Combine.perKey(new MyCombinFn()))   // (C)
>   .apply(ParDo.of(new MyDoFn()))             // (D)
>
>
> What I want to do is
> (1) to group data by key (A) and window (B),
> (2) to do some aggregation (C)
> (3) to perform the final computation on each group (D)
>
> I've noticed that a ValueState for a particular key is NULL whenever a new
> window for the key is arriving, which gives me a feeling that Beam seems to
> support only per-key+window state, not per-key state, after windowing.
>
> I usually work with Flink DataStream API and Flink supports both per-key
> state and per-key+window state [1].
>
> Does Beam support per-key states, not per-key+window states, after
> windowing (D)? If I miss something, please correct me.
>

You understand correctly - Beam does not include per-key state that crosses
window boundaries. If I understand your goal correctly, you can achieve the
same effect by copying the window metadata into the element and then
re-windowing into the global window before (D).
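Kenn's suggestion can be sketched as a plain-Java model. The names are hypothetical and there is no Beam dependency. Each aggregate from (C) carries the start of its fixed window in the element. The stream is then re-windowed into the global window, and per-key state remembers the most recent window seen. The sketch computes the change from the previous window. Carrying the window start also lets it ignore a result that arrives after a newer window's result, since a PCollection carries no ordering guarantee.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: window metadata copied into each element, then per-key state
// in the global window. Not Beam's API; the names are illustrative.
final class CarriedWindowModel {
  /** An aggregate from (C) plus the start of the fixed window it was computed for. */
  static final class WindowedResult {
    final long windowStartMillis;
    final long value;

    WindowedResult(long windowStartMillis, long value) {
      this.windowStartMillis = windowStartMillis;
      this.value = value;
    }
  }

  // Per-key state: the newest window's result seen so far.
  private final Map<String, WindowedResult> latest = new HashMap<>();

  /**
   * Returns the change versus the previous window for this key (or the value itself
   * for the first window), or null if the element belongs to a window that is not
   * newer than one already seen.
   */
  Long process(String key, WindowedResult current) {
    WindowedResult previous = latest.get(key);
    if (previous != null && current.windowStartMillis <= previous.windowStartMillis) {
      return null; // stale or duplicate window: keep the newer state untouched
    }
    latest.put(key, current);
    return previous == null ? current.value : current.value - previous.value;
  }
}
```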

Kenn


>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#using-per-window-state-in-processwindowfunction
>
> Best,
>
> Dongwon
>
>


Support of per-key state after windowing

2020-08-22 Thread Dongwon Kim
Hi all,

I'm using Beam 2.23.0 with FlinkRunner, and part of my unbounded pipeline
looks like this:

p.apply(WithKeys.of(...).withKeyType(...))   // (A)
  .apply(Window.into(FixedWindows.of(...)))  // (B)
  .apply(Combine.perKey(new MyCombinFn()))   // (C)
  .apply(ParDo.of(new MyDoFn()))             // (D)


What I want to do is:
(1) group data by key (A) and window (B),
(2) do some aggregation (C), and
(3) perform the final computation on each group (D).
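Steps (1) and (2) can be sketched in plain Java to show what FixedWindows plus Combine.perKey compute. This is a hypothetical model, not Beam code, and it makes three simplifying assumptions. Timestamps are non-negative milliseconds. There is no window offset, so an element with timestamp ts falls into the window starting at ts - (ts mod size). A plain sum stands in for MyCombinFn.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model of FixedWindows + Combine.perKey (sum).
// It only illustrates the grouping semantics; it is not Beam's API.
final class FixedWindowCombineModel {
  private FixedWindowCombineModel() {}

  /** One input element: a key, an event timestamp in millis, and a value. */
  static final class Event {
    final String key;
    final long timestampMillis;
    final long value;

    Event(String key, long timestampMillis, long value) {
      this.key = key;
      this.timestampMillis = timestampMillis;
      this.value = value;
    }
  }

  /** Start of the fixed window containing ts (assumes ts >= 0 and zero offset). */
  static long windowStart(long ts, long sizeMillis) {
    return ts - (ts % sizeMillis);
  }

  /** Sums values per (key, window), using "key@windowStart" as a readable group id. */
  static Map<String, Long> sumPerKeyAndWindow(List<Event> events, long sizeMillis) {
    Map<String, Long> sums = new TreeMap<>();
    for (Event e : events) {
      String group = e.key + "@" + windowStart(e.timestampMillis, sizeMillis);
      sums.merge(group, e.value, Long::sum);
    }
    return sums;
  }
}
```

With one-minute windows, elements for key "a" at 5s and 59.999s land in the same group, while one at 60s starts a new group.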

I've noticed that a ValueState for a particular key is null whenever a new
window for that key arrives, which suggests that Beam supports only
per-key+window state, not per-key state, after windowing.
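This observation matches how Beam scopes user state in a stateful DoFn: a state cell is addressed by both the element's key and its window, so the first element of a new fixed window reads an empty ValueState. A minimal plain-Java model of that addressing looks like this. The class is hypothetical, and strings stand in for windows.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of state scoping in a stateful DoFn: each cell is addressed
// by (key, window). Not Beam's API; strings stand in for windows.
final class WindowScopedStateModel {
  static final String GLOBAL = "GlobalWindow";

  private final Map<String, Long> cells = new HashMap<>();

  private static String address(String key, String window) {
    return key + "|" + window;
  }

  /** Like ValueState.read(): null if nothing was written for this (key, window). */
  Long read(String key, String window) {
    return cells.get(address(key, window));
  }

  void write(String key, String window, long value) {
    cells.put(address(key, window), value);
  }
}
```

If every element is re-windowed into the global window, all elements share one window. The (key, window) address then collapses to a per-key one, so state written for a key remains visible across the original fixed windows.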

I usually work with Flink DataStream API and Flink supports both per-key
state and per-key+window state [1].

Does Beam support per-key state, rather than per-key+window state, after
windowing (D)? If I'm missing something, please correct me.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#using-per-window-state-in-processwindowfunction

Best,

Dongwon