Re: Question on late data handling in Beam streaming mode
Thanks folks. This is really informative!

In reply to Kenneth Knowles (Friday, April 23, 2021 at 9:34 AM).
Re: Question on late data handling in Beam streaming mode
Reuven's answer will result in a group by key (but not by window) where no data is dropped and you get deltas for each key. Downstream consumers can recombine the deltas to get a per-key aggregation. So instead of putting the time interval into the window, you put it into the key, and you get the same grouped aggregation.

There are (at least) two other ways to do this:

1. You can set allowed lateness to a high value.
2. You can use a ParDo and outputWithTimestamp [1] to set the timestamps to arrival time.

I illustrated this in some older talks [2].

Kenn

[1] https://github.com/apache/beam/blob/dc636be57900c8ad9b6b9e50b08dad64be8aee40/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L184
[2] https://docs.google.com/presentation/d/1smGXb-0GGX_Fid1z3WWzZJWtyBjBA3Mo3t4oeRjJoZI/present?slide=id.g142c2fd96f_0_134

In reply to Reuven Lax (Fri, Apr 23, 2021 at 8:32 AM).
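Kenn's first suggestion, putting the time interval into the key rather than the window, can be sketched in plain Java (this is a toy model, not Beam code). It assumes each trigger firing emits per-key sums (deltas) for the elements that arrived since the previous firing, and that a downstream consumer adds those deltas up. The "user-key@interval-start" key format, the class and method names, and the one-minute interval are hypothetical, illustrative choices. Here the interval comes from the event timestamp; deriving it from the arrival time instead would give the processing-time bucketing from the original question, similar in spirit to re-timestamping elements with outputWithTimestamp.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Toy model (not Beam code): the time interval lives in the key, not the window.
 * Each trigger firing emits per-key deltas; a downstream consumer recombines them.
 */
class IntervalKeyModel {
    /** An element with a user key, an event timestamp in millis, and a value. */
    static final class Event {
        final String key;
        final long eventTime;
        final long value;

        Event(String key, long eventTime, long value) {
            this.key = key;
            this.eventTime = eventTime;
            this.value = value;
        }
    }

    /** Hypothetical composite key "userKey@intervalStart", bucketed by event time. */
    static String compositeKey(Event e, long intervalMillis) {
        long start = Math.floorDiv(e.eventTime, intervalMillis) * intervalMillis;
        return e.key + "@" + start;
    }

    /** Deltas emitted by one firing: sums per composite key over that firing's elements. */
    static Map<String, Long> deltas(List<Event> batch, long intervalMillis) {
        Map<String, Long> out = new TreeMap<>();
        for (Event e : batch) {
            out.merge(compositeKey(e, intervalMillis), e.value, Long::sum);
        }
        return out;
    }

    /** Downstream consumer: fold every firing's deltas into running per-key totals. */
    static Map<String, Long> recombine(List<Map<String, Long>> firings) {
        Map<String, Long> totals = new TreeMap<>();
        for (Map<String, Long> d : firings) {
            d.forEach((k, v) -> totals.merge(k, v, Long::sum));
        }
        return totals;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // First firing: two on-time events for key "a" in minute [0, 60s).
        Map<String, Long> first = deltas(Arrays.asList(
                new Event("a", 5_000L, 1L), new Event("a", 40_000L, 2L)), minute);
        // Second, later firing: an event for minute 0 that arrives late, plus one for minute 1.
        Map<String, Long> second = deltas(Arrays.asList(
                new Event("a", 50_000L, 4L), new Event("a", 70_000L, 8L)), minute);
        System.out.println(recombine(Arrays.asList(first, second))); // prints {a@0=7, a@60000=8}
    }
}
```

The event stamped at 50 s arrives in a later firing, yet it still adds to the a@0 total instead of being dropped; the cost is that consumers see several partial results per key and must merge them.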
Re: Question on late data handling in Beam streaming mode
You can definitely group by processing time. The way to do this in Beam is as follows:

    Window.into(new GlobalWindows())
        .triggering(
            AfterWatermark.pastEndOfWindow()
                .withEarlyFirings(
                    AfterProcessingTime.pastFirstElementInPane()
                        .plusDelayOf(Duration.standardSeconds(30))))
        .discardingFiredPanes();

The syntax is unfortunately a bit wordy, but the idea is that you are creating a single event-time window that encompasses all time, and "triggering" an aggregation roughly every 30 seconds based on processing time (30 seconds after the first element of each new pane arrives).

In reply to Tao Li (Fri, Apr 23, 2021 at 8:14 AM).
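To illustrate why this global-window approach drops nothing, here is a plain-Java model (a toy, not Beam code) of the trigger above. It assumes one key, values that are summed, sorted arrival times, and a pane that fires exactly delayMillis of processing time after its first element arrives; the class and method names are hypothetical. Because the global window ends only at the end of time, the AfterWatermark.pastEndOfWindow() part matters only if the input finishes, so the early firings produce the periodic output. With discardingFiredPanes(), each firing emits only the delta since the previous one.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model (not Beam code): a global window whose trigger fires delayMillis of
 * processing time after the first element of each pane, in discarding mode.
 * Event timestamps play no role, so no element is ever late or dropped.
 */
class ProcessingTimeTriggerModel {
    /**
     * Returns the sum emitted by each pane, in firing order.
     *
     * @param arrivalMillis processing (arrival) times, sorted ascending
     * @param values        value carried by each element
     * @param delayMillis   delay after a pane's first element before the pane fires
     */
    static List<Long> paneSums(long[] arrivalMillis, long[] values, long delayMillis) {
        List<Long> panes = new ArrayList<>();
        boolean paneOpen = false;
        long fireAt = 0L;
        long current = 0L;
        for (int i = 0; i < arrivalMillis.length; i++) {
            long t = arrivalMillis[i];
            if (paneOpen && t >= fireAt) {
                // The processing-time timer fired before this element arrived.
                panes.add(current);
                current = 0L; // discarding mode: the next pane starts empty
                paneOpen = false;
            }
            if (!paneOpen) {
                // The first element of a new pane arms the delay timer.
                fireAt = t + delayMillis;
                paneOpen = true;
            }
            current += values[i];
        }
        if (paneOpen) {
            panes.add(current); // the last pane fires once its delay has elapsed
        }
        return panes;
    }

    public static void main(String[] args) {
        long[] arrivals = {0L, 10_000L, 29_000L, 31_000L, 45_000L, 100_000L};
        long[] values = {1L, 2L, 3L, 4L, 5L, 6L};
        List<Long> panes = paneSums(arrivals, values, 30_000L);
        long total = 0L;
        for (long p : panes) {
            total += p;
        }
        System.out.println(panes + " total=" + total); // prints [6, 9, 6] total=21
    }
}
```

Every element lands in exactly one pane, so summing the panes downstream recovers the full total (21 here) regardless of the elements' event timestamps.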
Re: Question on late data handling in Beam streaming mode
Thanks @Kenneth Knowles. I understand we need to specify a window for a group-by so that the app knows when processing is “done” and can output the result.

Is it possible to specify a window based on event arrival (processing) time for a group-by? The purpose is to avoid dropping late events. With a processing-time-based window, the app would periodically output a result based on all events that arrived in that window, and a late-arriving event would fall into whichever window covers its arrival time, so that late data would not be lost.

Does Beam support this kind of mechanism? Thanks.

In reply to Kenneth Knowles (Thursday, April 22, 2021 at 1:49 PM).
Re: Question on late data handling in Beam streaming mode
Hello!

In a streaming app, you have two choices: wait forever and never produce any output, OR use some method to decide that an aggregation is "done".

In Beam, the way you decide that an aggregation is "done" is the watermark. When the watermark predicts no more data for an aggregation, the aggregation is done. For example, a GROUP BY over one-minute windows is "done" when no more data will arrive for that minute. At this point, your result is produced. More data may still arrive, and it is ignored. The IO connector determines the watermark, using the best heuristic available to it. You can configure "allowed lateness" for an aggregation to accept out-of-order data.

Kenn

In reply to Tao Li (Thu, Apr 22, 2021 at 1:26 PM).
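A toy model of the rule described above, in plain Java rather than Beam API code: it assumes fixed one-minute event-time windows and tags each arrival with the input watermark observed when it is processed (a simplification; a real runner tracks the watermark itself, based on the IO connector). An element counts as dropped once the watermark has passed its window's last timestamp plus the allowed lateness, which approximates Beam's window-expiry check. The class and method names are hypothetical; in a real pipeline the corresponding knobs would be Window.into(FixedWindows.of(...)) and .withAllowedLateness(...).

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model (not Beam code) of how a watermark plus "allowed lateness" decides
 * whether an element is dropped from a fixed event-time window.
 * Assumption: each arrival carries the input watermark seen when it is processed.
 */
class LatenessModel {
    /** One arriving element: its event timestamp and the watermark at arrival, in millis. */
    static final class Arrival {
        final long eventTime;
        final long watermarkAtArrival;

        Arrival(long eventTime, long watermarkAtArrival) {
            this.eventTime = eventTime;
            this.watermarkAtArrival = watermarkAtArrival;
        }
    }

    /** True if the element's window has already expired when the element shows up. */
    static boolean isDropped(Arrival a, long windowSizeMillis, long allowedLatenessMillis) {
        long windowStart = Math.floorDiv(a.eventTime, windowSizeMillis) * windowSizeMillis;
        long maxTimestamp = windowStart + windowSizeMillis - 1; // last instant inside the window
        // The window expires once the watermark passes maxTimestamp + allowed lateness.
        return maxTimestamp + allowedLatenessMillis < a.watermarkAtArrival;
    }

    static int countDropped(List<Arrival> arrivals, long windowSizeMillis, long allowedLatenessMillis) {
        int dropped = 0;
        for (Arrival a : arrivals) {
            if (isDropped(a, windowSizeMillis, allowedLatenessMillis)) {
                dropped++;
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        List<Arrival> arrivals = new ArrayList<>();
        arrivals.add(new Arrival(10_000L, 5_000L));   // on time: watermark still inside [0, 60s)
        arrivals.add(new Arrival(20_000L, 90_000L));  // watermark 30s past the window end
        arrivals.add(new Arrival(30_000L, 200_000L)); // watermark 140s past the window end
        System.out.println(countDropped(arrivals, minute, 0L));          // prints 2
        System.out.println(countDropped(arrivals, minute, minute));      // prints 1
        System.out.println(countDropped(arrivals, minute, 10 * minute)); // prints 0
    }
}
```

Raising the allowed lateness keeps more late elements, at the cost of holding each window's state for longer.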
Question on late data handling in Beam streaming mode
Hi Beam community,

I am wondering whether there is a risk of losing late data in a Beam streaming app due to watermarking.

I just went through this design doc and noticed the “droppable” definition there: https://docs.google.com/document/d/12r7frmxNickxB5tbpuEh_n35_IJeVZn1peOrBrhhP6Y/edit#

Can you please confirm whether it's possible for us to lose some data in a streaming app in practice? If so, what would be the best practice to avoid data loss? Thanks!