Hi,
I think what you might be looking for is "stateful processing"; please
have a look at [1]. Note that the input to a stateful DoFn must be of
type KV<K, V>, which ensures behavior similar to Flink's keyBy.
Best,
Jan
[1] https://beam.apache.org/blog/stateful-processing/
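For illustration, here is a minimal sketch (the String keys/values and the
BufferPerKeyFn name are just assumptions for the example) of a stateful DoFn
that buffers every value seen for a key; because state is partitioned per
key, this gives keyBy-like behavior without a non-global window or trigger:

import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

class BufferPerKeyFn extends DoFn<KV<String, String>, KV<String, Iterable<String>>> {

  // Bag state holds all values observed so far for the current key.
  @StateId("buffer")
  private final StateSpec<BagState<String>> bufferSpec = StateSpecs.bag();

  @ProcessElement
  public void processElement(
      @Element KV<String, String> element,
      @StateId("buffer") BagState<String> buffer,
      OutputReceiver<KV<String, Iterable<String>>> out) {
    buffer.add(element.getValue());
    // Emit the full history for this key on every element. This is illustrative
    // only; a real pipeline would more likely emit on a timer or emit increments.
    out.output(KV.of(element.getKey(), buffer.read()));
  }
}

A real pipeline would typically pair this state with a timer to decide when to
emit, but the point here is the KV<K, V> input requirement and the per-key state.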
On 12/13/20 6:27 AM, Tao Li wrote:
Sorry, I think I had some misunderstanding of the keyBy API from Flink.
It’s not exactly equivalent to GroupByKey from Beam, so please ignore
my question and this email thread. Thanks for the help though 😊
*From: *Tao Li <[email protected]>
*Date: *Friday, December 11, 2020 at 7:29 PM
*To: *"[email protected]" <[email protected]>, Reuven Lax
<[email protected]>
*Cc: *Mehmet Emre Sahin <[email protected]>, Ying-Chang Cheng
<[email protected]>
*Subject: *Re: Question regarding GroupByKey operator on unbounded data
Would Combine.PerKey work for my case? It seems like it does not require
a window function.
At the same time, it seems that this operator is typically used to
generate some aggregated output (e.g. a count) rather than the list of
values, so I am not sure if it’s suitable for my use case.
Please advise. Thanks!
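For reference, a purely illustrative sketch of collecting all values per key
with Combine.perKey (the ToListFn name and String types are assumptions, not
anything from the Beam SDK); note that, as far as I understand, Combine.perKey
groups by key under the hood, so on unbounded input it would likely face the
same windowing/trigger requirement as GroupByKey:

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.Combine.CombineFn;

// Combines all values for a key into a single list.
class ToListFn extends CombineFn<String, List<String>, List<String>> {
  @Override public List<String> createAccumulator() { return new ArrayList<>(); }
  @Override public List<String> addInput(List<String> acc, String input) {
    acc.add(input);
    return acc;
  }
  @Override public List<String> mergeAccumulators(Iterable<List<String>> accs) {
    List<String> merged = new ArrayList<>();
    for (List<String> a : accs) { merged.addAll(a); }
    return merged;
  }
  @Override public List<String> extractOutput(List<String> acc) { return acc; }
}

// Usage (per window/pane): input.apply(Combine.perKey(new ToListFn()))
// yields a PCollection<KV<String, List<String>>>.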
*From: *Tao Li <[email protected]>
*Reply-To: *"[email protected]" <[email protected]>
*Date: *Friday, December 11, 2020 at 10:29 AM
*To: *"[email protected]" <[email protected]>, Reuven Lax
<[email protected]>
*Cc: *Mehmet Emre Sahin <[email protected]>, Ying-Chang Cheng
<[email protected]>
*Subject: *Re: Question regarding GroupByKey operator on unbounded data
Hi @Reuven Lax, basically we have a Flink app that does stream
processing. It uses a KeyBy operation to generate a keyed stream. Since
we need to query all historical data of the input, we are not
specifying a window function or a trigger in this Flink app, which is
fine.
Now we would like to convert this Flink app to a Beam app. The problem
is that for an unbounded PCollection, Beam requires either non-global
windowing or an aggregation trigger to perform a GroupByKey operation.
I was thinking about applying a sliding window with a huge size (say 1
year) to accommodate this Beam requirement, but I’m not sure if this is
feasible or a good practice.
So what’s your recommendation to solve this problem? Thanks!
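In case it helps the discussion, a minimal illustrative sketch (the String
keys/values and the one-minute firing interval are just assumptions) of keeping
the global window but adding a repeated processing-time trigger, which is one
way to satisfy Beam’s requirement for GroupByKey on unbounded input:

import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

class GroupAllHistory {
  static PCollection<KV<String, Iterable<String>>> groupAllHistory(
      PCollection<KV<String, String>> keyed) {
    return keyed
        // Stay in the global window, but fire periodically so GroupByKey can emit panes.
        .apply(Window.<KV<String, String>>into(new GlobalWindows())
            .triggering(Repeatedly.forever(
                AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardMinutes(1))))
            // Accumulating panes re-emit the full history per key on each firing;
            // per-key state grows without bound, so this may not scale forever.
            .accumulatingFiredPanes()
            .withAllowedLateness(Duration.ZERO))
        .apply(GroupByKey.<String, String>create());
  }
}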
*From: *Reuven Lax <[email protected]>
*Reply-To: *"[email protected]" <[email protected]>
*Date: *Thursday, December 10, 2020 at 3:07 PM
*To: *user <[email protected]>
*Cc: *Mehmet Emre Sahin <[email protected]>, Ying-Chang Cheng
<[email protected]>
*Subject: *Re: Question regarding GroupByKey operator on unbounded data
Can you explain more about what exactly you are trying to do?
On Thu, Dec 10, 2020 at 2:51 PM Tao Li <[email protected]> wrote:
Hi Beam community,
I got a quick question about the GroupByKey operator. According to this
doc
<https://beam.apache.org/documentation/programming-guide/#groupbykey>,
if we are using an unbounded PCollection, it’s required to specify
either non-global windowing
<https://beam.apache.org/documentation/programming-guide/#setting-your-pcollections-windowing-function>
or an aggregation trigger
<https://beam.apache.org/documentation/programming-guide/#triggers>
in order to perform a GroupByKey operation.
In comparison, the KeyBy
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/>
operator from Flink does not have such a hard requirement for
streamed data.
In our use case, we do need to query all historical streamed data
and group it by key. KeyBy from Flink satisfies this need, but Beam’s
GroupByKey does not. I thought about applying a sliding window with a
very large size (say 1 year), so that we could query the past 1 year’s
data, but I’m not sure if this is feasible or a good practice.
So what would the Beam solution be to implement this business
logic? Does Beam support processing a relatively long history of an
unbounded PCollection?
Thanks so much!