Re: PipelineOptions at execution time from DirectRunner
Thanks Ahmet! These are illustrative explanations. I still wonder about one question:

>> Getting it as pcoll.pipeline.options in the expand(self, pcoll) call is a
>> possibility, but it seems like that's not ideal. Any other suggestions?

Is this an appropriate way of obtaining an option that is not explicitly passed by the user? It prints a warning.

"What's the context?" - I'm working on a transform that writes to BigQuery, and table destinations can come in the form "dataset.table" or "project:dataset.table". Because these are parsed at runtime (destinations are dynamic), the PTransform checks whether a project was provided in PipelineOptions via value providers.

Thanks!
-P.

>> Should we simply support RuntimeValueProvider in direct runner?
>
> This is a bit tricky for Python, because it is possible to run multiple
> pipelines with DirectRunner in the same process (e.g. call run and do not
> block on results). RuntimeValueProvider works by setting a global variable,
> and since with the direct runner multiple pipelines could share the same
> process, it gets tricky to support this.
>
>> Best
>> -P.
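For context, the kind of table-spec parsing involved can be sketched in Python. `parse_table_reference` below is a hypothetical helper, not Beam's actual implementation (Beam's real BigQuery parsing handles more cases, e.g. project ids containing dots); it only illustrates falling back to a default project from the pipeline options when the spec carries none:

```python
import re

def parse_table_reference(table_spec, default_project=None):
    """Split "project:dataset.table" or "dataset.table" into its parts.

    When the spec has no project, fall back to default_project, which
    would come from PipelineOptions in the scenario described above.
    """
    match = re.match(
        r"^(?:(?P<project>[^:.]+):)?(?P<dataset>[^:.]+)\.(?P<table>[^:.]+)$",
        table_spec)
    if match is None:
        raise ValueError("Invalid table specification: %r" % table_spec)
    project = match.group("project") or default_project
    if project is None:
        raise ValueError(
            "No project in %r and no default project available" % table_spec)
    return project, match.group("dataset"), match.group("table")
```

With dynamic destinations, the project option is only consulted per element at execution time, which is why construction-time access to the options is not enough.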
Re: PipelineOptions at execution time from DirectRunner
On Thu, Mar 21, 2019 at 4:20 PM Pablo Estrada wrote:
> Hi all,
> The DirectRunner does not seem to support RuntimeValueProvider. Is there a
> suggestion for DirectRunner pipelines to access arguments passed in as
> pipeline options (but not necessarily passed explicitly by users) at
> pipeline execution time?

RuntimeValueProviders are useful when some pipeline options are not set at the time of pipeline construction but are available at execution time. In the case of DirectRunner, what is available at execution time is the same as what is available at construction time. Am I missing something? Why do we need RuntimeValueProvider support in DirectRunner?

> Getting it as pcoll.pipeline.options in the expand(self, pcoll) call is a
> possibility, but it seems like that's not ideal. Any other suggestions?
>
> Should we simply support RuntimeValueProvider in direct runner?

This is a bit tricky for Python, because it is possible to run multiple pipelines with DirectRunner in the same process (e.g. call run and do not block on results). RuntimeValueProvider works by setting a global variable, and since with the direct runner multiple pipelines could share the same process, it gets tricky to support this.

> Best
> -P.
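The failure mode described (a process-global variable shared by concurrently running pipelines) can be illustrated with a toy stand-in. This is not Beam's code, just a minimal sketch of why a single global slot breaks when two pipelines run in one process:

```python
class RuntimeValueRegistry:
    """Toy stand-in for a process-global runtime-options store.

    Loosely mimics a RuntimeValueProvider-style mechanism that resolves
    values through shared global state; not actual Beam code.
    """
    _runtime_options = None  # one slot for the whole process

    @classmethod
    def set_runtime_options(cls, options):
        cls._runtime_options = options

    @classmethod
    def get(cls, name):
        return cls._runtime_options[name]

# Pipeline A starts running and installs its options...
RuntimeValueRegistry.set_runtime_options({"project": "project-a"})
# ...but before A finishes, pipeline B (same process, non-blocking run())
# installs *its* options, clobbering A's:
RuntimeValueRegistry.set_runtime_options({"project": "project-b"})

# Code from pipeline A that resolves the option now sees B's value:
print(RuntimeValueRegistry.get("project"))  # -> "project-b", not "project-a"
```

This is exactly the collision that makes supporting RuntimeValueProvider in DirectRunner tricky when run() does not block.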
PipelineOptions at execution time from DirectRunner
Hi all,

The DirectRunner does not seem to support RuntimeValueProvider. Is there a suggestion for DirectRunner pipelines to access arguments passed in as pipeline options (but not necessarily passed explicitly by users) at pipeline execution time?

Getting it as pcoll.pipeline.options in the expand(self, pcoll) call is a possibility, but it seems like that's not ideal. Any other suggestions?

Should we simply support RuntimeValueProvider in direct runner?

Best
-P.
Re: Stateful transforms and dropped late data
OK, I've now read your pipeline. In the minimal pipeline, are you saying that you see logs with "STATE " but then never a corresponding "STATE TIMER"?

There are a couple of other things to mention:
- Reading data from Pubsub and then using WithTimestamps is not as good as using the timestamp attribute control on PubsubIO, so you may create late / dropped data.
- You have a processing time timer, but when a window expires it will not fire, so you also need to set an event time timer for the end of the window + allowed lateness.

Kenn

On Sun, Mar 17, 2019 at 6:53 AM Amit Ziv-Kenet wrote:
> Hello,
>
> I've been able to produce a (relatively) minimal pipeline which reproduces
> the behavior I've previously noticed in one of our production pipelines.
>
> The minimal pipeline repo can be found here:
> https://github.com/azk/late-samples-state
> The actual pipeline is implemented in one big class here:
> https://github.com/azk/late-samples-state/blob/master/src/main/java/late/samples/state/LateSamplesState.java
>
> Basically, this pipeline consumes events from a pubsub topic, which are
> very simple json objects. The events are:
> * deserialized
> * each element's timestamp is extracted from the json payload
> * the elements are keyed and then windowed in a one-minute fixed window
> with an allowed lateness of 7 days, with processing time triggers and
> accumulating panes.
>
> The pipeline then branches into 2 branches:
> 1. A simple GroupByKey, followed by a logging step which records which
> panes were fired.
> 2. A stateful ParDo, which collects elements in a BagState, logs what is
> currently in the Bag and sets a one minute timer, which also logs the
> contents of the BagState.
>
> When running this pipeline in Dataflow, and publishing a few "very late"
> events through the script which is part of the repo:
> https://github.com/azk/late-samples-state/blob/master/scripts/publish_events.py
> the following behavior is observed:
> 1. The very late elements never appear after the GroupBy step, and are
> dropped as expected.
> 2. The very late elements are recorded in the stateful transform and added
> to the BagState.
> 3. When the timer expires, the BagState is empty and the elements that
> were previously in the Bag seem to disappear.
>
> Is this expected behavior, or is there some subtle issue going on?
> I'd be more than happy to deep dive into this with anyone interested in
> looking into this behavior.
>
> Thank you,
> Amit.
>
> On Wed, Mar 6, 2019 at 11:14 PM Amit Ziv-Kenet wrote:
>
>> Hi Kenn,
>> Thank you for the explanation!
>>
>> These points make total sense; that's why I was surprised by the
>> observed behavior, which breaks at least points 3 and 4.
>> I'll try to extract and share a minimal working example which
>> demonstrates this behavior.
>>
>> Thank you,
>> Amit.
>>
>> On Wed, Mar 6, 2019 at 8:11 PM Kenneth Knowles wrote:
>>
>>> What you describe is not expected. Here are the relevant points, I think:
>>>
>>> - A window is expired when the watermark is past the end of the
>>> window + allowed lateness.
>>> - An element is droppable when it is associated to an expired window.
>>> - All droppable elements should be dropped before reaching the stateful
>>> ParDo step.
>>> - The state and processing time timers for a particular key+window pair
>>> are garbage collected when the window is expired, because it is known that
>>> nothing can cause that state to be read.
>>> - A key+window state is not cleared until all event time timers have
>>> been dispatched.
>>>
>>> Do these make sense? I'd love to see more detail of your pipeline code.
>>> One thing to note is that an element being behind the watermark doesn't
>>> really matter. What matters is how the watermark relates to its window.
>>>
>>> Kenn
>>>
>>> On Wed, Mar 6, 2019 at 4:28 AM Amit Ziv-Kenet wrote:
>>>
>>>> Hello, I'm encountering unexpected behavior in one of our pipelines; I
>>>> hope you might help me make sense of it. This is a streaming pipeline
>>>> implemented with the Java SDK (2.10) and running on Dataflow.
>>>> * In the pipeline a FixedWindow is applied to the data with an allowed
>>>> lateness of a week (doesn't really matter).
>>>> * The windowed PCollection is then used in 2 separate branches: one
>>>> regular GroupByKey, and a second one which is a transform that makes
>>>> use of State and Timers.
>>>> * In the stateful transform, incoming elements are added to a BagState
>>>> container until some condition on the elements is reached, and the
>>>> elements in the BagState are dispatched downstream. A timer makes sure
>>>> that the BagState is dispatched even if the condition is not met after
>>>> some timeout has expired.
>>>> Occasionally very late data enters the pipeline, with timestamps older
>>>> than the allowed lateness. In these cases, the GroupByKey transform
>>>> behaves as expected, and there aren't any panes with the late data
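Kenn's bullet points on expiry and droppability can be restated schematically. This is only an illustration of the semantics, not Beam's implementation, and the exact boundary comparison (`>` vs `>=`) is a simplification:

```python
def is_window_expired(window_end, allowed_lateness, watermark):
    """A window is expired once the watermark passes
    window end + allowed lateness (boundary simplified here)."""
    return watermark > window_end + allowed_lateness

def is_droppable(element_window_end, allowed_lateness, watermark):
    """An element is droppable when its window is already expired.
    Note: the element's own timestamp vs. the watermark is irrelevant;
    only how the watermark relates to the element's window matters."""
    return is_window_expired(element_window_end, allowed_lateness, watermark)

# With a one-minute window ending at t=60s and 7 days allowed lateness
# (units are seconds, purely for illustration):
WEEK = 7 * 24 * 3600
assert not is_droppable(60, WEEK, 60 + WEEK)   # watermark at the boundary: not yet expired
assert is_droppable(60, WEEK, 60 + WEEK + 1)   # past window end + lateness: dropped upstream
```

Under these rules, an element that reaches a stateful ParDo after its window expired has slipped past a drop that should have happened earlier, and its state will be garbage-collected with the window, which matches the "disappearing BagState" symptom in the thread.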
Re: joda-time dependency version
+dev@ I don't know of any special reason we are using an old version.

Kenn

On Thu, Mar 21, 2019, 09:38 Ismaël Mejía wrote:
> Does anyone have any context on why we have such an old version of
> Joda time (2.4, released in 2014!) and if there is any possible issue
> upgrading it? If not, maybe we can try to upgrade it.
>
> On Thu, Mar 21, 2019 at 5:35 PM Ismaël Mejía wrote:
> >
> > Mmmm, interesting issue. There is also a plan to use a vendored version
> > of joda-time; not sure of the progress on that one.
> > https://issues.apache.org/jira/browse/BEAM-5827
> >
> > For Beam 3 that's the idea, but so far there is no ETA for Beam 3.
> > https://issues.apache.org/jira/browse/BEAM-5530
> >
> > On Thu, Mar 21, 2019 at 4:15 PM rahul patwari wrote:
> > >
> > > Hi David,
> > >
> > > The only incompatibility we have come across is this:
> > > We have some timestamp format conversions in our project, where we are
> > > converting from one timestamp format to another.
> > >
> > > With joda-time 2.4:
> > > If we convert "2019-03-15 13:56:12", which is in "yyyy-MM-dd HH:mm:ss"
> > > format, to "hh:mm:ss yy-MMM-dd z" format, the converted value is
> > > "01:56:12 19-Mar-15 -07:00".
> > >
> > > Whereas with joda-time 2.9.3:
> > > If we convert "2019-03-15 13:56:12", which is in "yyyy-MM-dd HH:mm:ss"
> > > format, to "hh:mm:ss yy-MMM-dd z" format, the converted value is
> > > "01:56:12 19-Mar-15 PDT".
> > >
> > > The javadoc for both versions doesn't seem different, though, for the
> > > 'z' DateTimeFormat.
> > >
> > > Even though the javadoc for both versions says "Zone names: Time zone
> > > names ('z') cannot be parsed", we are able to parse it in joda-time 2.9.3.
> > >
> > > Also, will joda-time be replaced with java.time in Beam 3?
> > >
> > > Thanks,
> > > Rahul
> > >
> > > On Thu, Mar 21, 2019 at 5:37 PM David Morávek wrote:
> > >>
> > >> Hello Rahul, are there any incompatibilities you are running into
> > >> with Spark's version? These versions should be backward compatible.
> > >>
> > >> From the joda-time doc:
> > >> The main public API will remain backwards compatible for both source
> > >> and binary in the 2.x stream.
> > >>
> > >> This means you should be able to safely use Spark's version.
> > >>
> > >> D.
> > >>
> > >> On Thu, Mar 21, 2019 at 5:45 AM rahul patwari <rahulpatwari8...@gmail.com> wrote:
> > >>>
> > >>> Hi Ismael,
> > >>>
> > >>> We are using Beam with the Spark Runner, and Spark 2.4 has joda-time
> > >>> 2.9.3 as a dependency. So we have used joda-time 2.9.3 in our shaded
> > >>> artifact set. As Beam has joda-time 2.4 as a dependency, I was wondering
> > >>> whether it would break anything in Beam.
> > >>>
> > >>> Will joda-time be replaced with java.time in Beam 3? What is the
> > >>> expected release date of Beam 3?
> > >>>
> > >>> Thanks,
> > >>> Rahul
> > >>>
> > >>> On Wed, Mar 20, 2019 at 7:23 PM Ismaël Mejía wrote:
> > >>>>
> > >>>> Hello,
> > >>>>
> > >>>> The long-term goal would be to get rid of joda-time, but that won't
> > >>>> happen until Beam 3.
> > >>>> Any 'particular' reason or motivation to push the upgrade?
> > >>>>
> > >>>> Regards,
> > >>>> Ismaël
> > >>>>
> > >>>> On Wed, Mar 20, 2019 at 11:53 AM rahul patwari wrote:
> > >>>> >
> > >>>> > Hi,
> > >>>> >
> > >>>> > Is there a plan to upgrade the dependency version of joda-time to
> > >>>> > 2.9.3 or the latest version?
> > >>>> >
> > >>>> > Thanks,
> > >>>> > Rahul
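To make the two renderings concrete, here is a rough Python analogue of the conversion Rahul describes, using the standard `zoneinfo` module and assuming the US Pacific zone (which is what the "-07:00" / "PDT" outputs in the thread correspond to). This is not Joda-Time; it only illustrates offset-style vs. name-style zone formatting:

```python
from datetime import datetime
from zoneinfo import ZoneInfo  # Python 3.9+

# Parse the naive timestamp and pin it to a zone:
naive = datetime.strptime("2019-03-15 13:56:12", "%Y-%m-%d %H:%M:%S")
local = naive.replace(tzinfo=ZoneInfo("America/Los_Angeles"))

# Zone rendered as a UTC offset (analogous to what joda-time 2.4 produced
# for 'z'; Python's %z omits the colon):
print(local.strftime("%I:%M:%S %y-%b-%d %z"))  # 01:56:12 19-Mar-15 -0700
# Zone rendered as an abbreviation (analogous to joda-time 2.9.3's output):
print(local.strftime("%I:%M:%S %y-%b-%d %Z"))  # 01:56:12 19-Mar-15 PDT
```

The underlying data is the same instant either way; only the formatter's choice between an offset and a zone name differs, which is exactly the 2.4 vs. 2.9.3 behavior change being discussed.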
Re: joda-time dependency version
Does anyone have any context on why we have such an old version of Joda time (2.4, released in 2014!) and if there is any possible issue upgrading it? If not, maybe we can try to upgrade it.

On Thu, Mar 21, 2019 at 5:35 PM Ismaël Mejía wrote:
>
> Mmmm, interesting issue. There is also a plan to use a vendored version
> of joda-time; not sure of the progress on that one.
> https://issues.apache.org/jira/browse/BEAM-5827
>
> For Beam 3 that's the idea, but so far there is no ETA for Beam 3.
> https://issues.apache.org/jira/browse/BEAM-5530
>
> On Thu, Mar 21, 2019 at 4:15 PM rahul patwari wrote:
> >
> > Hi David,
> >
> > The only incompatibility we have come across is this:
> > We have some timestamp format conversions in our project, where we are
> > converting from one timestamp format to another.
> >
> > With joda-time 2.4:
> > If we convert "2019-03-15 13:56:12", which is in "yyyy-MM-dd HH:mm:ss"
> > format, to "hh:mm:ss yy-MMM-dd z" format, the converted value is
> > "01:56:12 19-Mar-15 -07:00".
> >
> > Whereas with joda-time 2.9.3:
> > If we convert "2019-03-15 13:56:12", which is in "yyyy-MM-dd HH:mm:ss"
> > format, to "hh:mm:ss yy-MMM-dd z" format, the converted value is
> > "01:56:12 19-Mar-15 PDT".
> >
> > The javadoc for both versions doesn't seem different, though, for the
> > 'z' DateTimeFormat.
> >
> > Even though the javadoc for both versions says "Zone names: Time zone
> > names ('z') cannot be parsed", we are able to parse it in joda-time 2.9.3.
> >
> > Also, will joda-time be replaced with java.time in Beam 3?
> >
> > Thanks,
> > Rahul
> >
> > On Thu, Mar 21, 2019 at 5:37 PM David Morávek wrote:
> >>
> >> Hello Rahul, are there any incompatibilities you are running into with
> >> Spark's version? These versions should be backward compatible.
> >>
> >> From the joda-time doc:
> >> The main public API will remain backwards compatible for both source and
> >> binary in the 2.x stream.
> >>
> >> This means you should be able to safely use Spark's version.
> >>
> >> D.
> >>
> >> On Thu, Mar 21, 2019 at 5:45 AM rahul patwari wrote:
> >>>
> >>> Hi Ismael,
> >>>
> >>> We are using Beam with the Spark Runner, and Spark 2.4 has joda-time
> >>> 2.9.3 as a dependency. So we have used joda-time 2.9.3 in our shaded
> >>> artifact set. As Beam has joda-time 2.4 as a dependency, I was wondering
> >>> whether it would break anything in Beam.
> >>>
> >>> Will joda-time be replaced with java.time in Beam 3? What is the
> >>> expected release date of Beam 3?
> >>>
> >>> Thanks,
> >>> Rahul
> >>>
> >>> On Wed, Mar 20, 2019 at 7:23 PM Ismaël Mejía wrote:
> >>>>
> >>>> Hello,
> >>>>
> >>>> The long-term goal would be to get rid of joda-time, but that won't
> >>>> happen until Beam 3.
> >>>> Any 'particular' reason or motivation to push the upgrade?
> >>>>
> >>>> Regards,
> >>>> Ismaël
> >>>>
> >>>> On Wed, Mar 20, 2019 at 11:53 AM rahul patwari wrote:
> >>>> >
> >>>> > Hi,
> >>>> >
> >>>> > Is there a plan to upgrade the dependency version of joda-time to
> >>>> > 2.9.3 or the latest version?
> >>>> >
> >>>> > Thanks,
> >>>> > Rahul
Re: joda-time dependency version
Mmmm, interesting issue. There is also a plan to use a vendored version of joda-time; not sure of the progress on that one.
https://issues.apache.org/jira/browse/BEAM-5827

For Beam 3 that's the idea, but so far there is no ETA for Beam 3.
https://issues.apache.org/jira/browse/BEAM-5530

On Thu, Mar 21, 2019 at 4:15 PM rahul patwari wrote:
>
> Hi David,
>
> The only incompatibility we have come across is this:
> We have some timestamp format conversions in our project, where we are
> converting from one timestamp format to another.
>
> With joda-time 2.4:
> If we convert "2019-03-15 13:56:12", which is in "yyyy-MM-dd HH:mm:ss"
> format, to "hh:mm:ss yy-MMM-dd z" format, the converted value is
> "01:56:12 19-Mar-15 -07:00".
>
> Whereas with joda-time 2.9.3:
> If we convert "2019-03-15 13:56:12", which is in "yyyy-MM-dd HH:mm:ss"
> format, to "hh:mm:ss yy-MMM-dd z" format, the converted value is
> "01:56:12 19-Mar-15 PDT".
>
> The javadoc for both versions doesn't seem different, though, for the 'z'
> DateTimeFormat.
>
> Even though the javadoc for both versions says "Zone names: Time zone
> names ('z') cannot be parsed", we are able to parse it in joda-time 2.9.3.
>
> Also, will joda-time be replaced with java.time in Beam 3?
>
> Thanks,
> Rahul
>
> On Thu, Mar 21, 2019 at 5:37 PM David Morávek wrote:
>>
>> Hello Rahul, are there any incompatibilities you are running into with
>> Spark's version? These versions should be backward compatible.
>>
>> From the joda-time doc:
>> The main public API will remain backwards compatible for both source and
>> binary in the 2.x stream.
>>
>> This means you should be able to safely use Spark's version.
>>
>> D.
>>
>> On Thu, Mar 21, 2019 at 5:45 AM rahul patwari wrote:
>>>
>>> Hi Ismael,
>>>
>>> We are using Beam with the Spark Runner, and Spark 2.4 has joda-time
>>> 2.9.3 as a dependency. So we have used joda-time 2.9.3 in our shaded
>>> artifact set. As Beam has joda-time 2.4 as a dependency, I was wondering
>>> whether it would break anything in Beam.
>>>
>>> Will joda-time be replaced with java.time in Beam 3? What is the expected
>>> release date of Beam 3?
>>>
>>> Thanks,
>>> Rahul
>>>
>>> On Wed, Mar 20, 2019 at 7:23 PM Ismaël Mejía wrote:
>>>>
>>>> Hello,
>>>>
>>>> The long-term goal would be to get rid of joda-time, but that won't
>>>> happen until Beam 3.
>>>> Any 'particular' reason or motivation to push the upgrade?
>>>>
>>>> Regards,
>>>> Ismaël
>>>>
>>>> On Wed, Mar 20, 2019 at 11:53 AM rahul patwari wrote:
>>>> >
>>>> > Hi,
>>>> >
>>>> > Is there a plan to upgrade the dependency version of joda-time to
>>>> > 2.9.3 or the latest version?
>>>> >
>>>> > Thanks,
>>>> > Rahul
Re: joda-time dependency version
Hi David,

The only incompatibility we have come across is this:
We have some timestamp format conversions in our project, where we are converting from one timestamp format to another.

With joda-time 2.4:
If we convert "2019-03-15 13:56:12", which is in "yyyy-MM-dd HH:mm:ss" format, to "hh:mm:ss yy-MMM-dd z" format, the converted value is "01:56:12 19-Mar-15 -07:00".

Whereas with joda-time 2.9.3:
If we convert "2019-03-15 13:56:12", which is in "yyyy-MM-dd HH:mm:ss" format, to "hh:mm:ss yy-MMM-dd z" format, the converted value is "01:56:12 19-Mar-15 PDT".

The javadoc for both versions doesn't seem different, though, for the 'z' DateTimeFormat.

Even though the javadoc for both versions says "*Zone names*: Time zone names ('z') cannot be parsed", we are able to parse it in joda-time 2.9.3.

Also, will joda-time be replaced with java.time in Beam 3?

Thanks,
Rahul

On Thu, Mar 21, 2019 at 5:37 PM David Morávek wrote:

> Hello Rahul, are there any incompatibilities you are running into with
> Spark's version? These versions should be backward compatible.
>
> For jodatime doc:
> The main public API will remain *backwards compatible* for both source
> and binary in the 2.x stream.
>
> This means you should be able to safely use Spark's version.
>
> D.
>
> On Thu, Mar 21, 2019 at 5:45 AM rahul patwari wrote:
>
>> Hi Ismael,
>>
>> We are using Beam with the Spark Runner, and Spark 2.4 has joda-time
>> 2.9.3 as a dependency. So we have used joda-time 2.9.3 in our shaded
>> artifact set. As Beam has joda-time 2.4 as a dependency, I was wondering
>> whether it would break anything in Beam.
>>
>> Will joda-time be replaced with java.time in Beam 3? What is the expected
>> release date of Beam 3?
>>
>> Thanks,
>> Rahul
>>
>> On Wed, Mar 20, 2019 at 7:23 PM Ismaël Mejía wrote:
>>
>>> Hello,
>>>
>>> The long-term goal would be to get rid of joda-time, but that won't
>>> happen until Beam 3.
>>> Any 'particular' reason or motivation to push the upgrade?
>>>
>>> Regards,
>>> Ismaël
>>>
>>> On Wed, Mar 20, 2019 at 11:53 AM rahul patwari wrote:
>>> >
>>> > Hi,
>>> >
>>> > Is there a plan to upgrade the dependency version of joda-time to
>>> > 2.9.3 or the latest version?
>>> >
>>> > Thanks,
>>> > Rahul
Re: joda-time dependency version
Hello Rahul, are there any incompatibilities you are running into with Spark's version? These versions should be backward compatible.

From the joda-time doc:
The main public API will remain *backwards compatible* for both source and binary in the 2.x stream.

This means you should be able to safely use Spark's version.

D.

On Thu, Mar 21, 2019 at 5:45 AM rahul patwari wrote:
> Hi Ismael,
>
> We are using Beam with the Spark Runner, and Spark 2.4 has joda-time
> 2.9.3 as a dependency. So we have used joda-time 2.9.3 in our shaded
> artifact set. As Beam has joda-time 2.4 as a dependency, I was wondering
> whether it would break anything in Beam.
>
> Will joda-time be replaced with java.time in Beam 3? What is the expected
> release date of Beam 3?
>
> Thanks,
> Rahul
>
> On Wed, Mar 20, 2019 at 7:23 PM Ismaël Mejía wrote:
>
>> Hello,
>>
>> The long-term goal would be to get rid of joda-time, but that won't
>> happen until Beam 3.
>> Any 'particular' reason or motivation to push the upgrade?
>>
>> Regards,
>> Ismaël
>>
>> On Wed, Mar 20, 2019 at 11:53 AM rahul patwari wrote:
>> >
>> > Hi,
>> >
>> > Is there a plan to upgrade the dependency version of joda-time to
>> > 2.9.3 or the latest version?
>> >
>> > Thanks,
>> > Rahul
Is AvroCoder the right coder for me?
Hi,

I am trying out Beam to do some data aggregations. Many of the inputs/outputs of my transforms are complex objects (not super complex, but sometimes containing Maps/Lists/Sets), so when I was prompted to define a coder for these objects I added the annotation @DefaultCoder(AvroCoder.class), and things worked in my development environment.

Now that I am trying to run it on "real" data, after deploying it to a Spark runner and looking at some thread dumps, I noticed that many of the threads were blocked on the following method in the Avro library: ReflectData.getAccessorsFor.

So my question is: did I do the wrong thing by using AvroCoder, or is there some other coder that can easily solve my problem?

Best regards,
Augusto