Re: PipelineOptions at execution time from DirectRunner

2019-03-21 Thread Pablo Estrada
Thanks Ahmet! These are illustrative explanations.

I still wonder about one question:

>
>> Getting it as pcoll.pipeline.options in the expand(self, pcoll) call is a
>> possibility, but it seems like that's not ideal. Any other suggestions?
>>
> Is this an appropriate way of obtaining an option that is not explicitly
passed by the user? It prints a warning.

"What's the context?" - I'm working on a transform that writes to BigQuery,
where table destinations can come in the form "dataset.table" or
"project:dataset.table". Because these are parsed at runtime (destinations
are dynamic), the PTransform checks whether a project was provided in
PipelineOptions via value providers.
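
For illustration, a minimal sketch of the parsing described above. The helper
name and fallback logic are hypothetical, not the actual BigQueryIO code:

```python
import re

def parse_table_reference(table_spec, default_project=None):
    """Split "project:dataset.table" or "dataset.table" into its parts,
    falling back to a project taken from pipeline options when the spec
    does not carry one. Hypothetical helper for illustration only."""
    match = re.match(
        r'^(?:(?P<project>[^:]+):)?(?P<dataset>[^.]+)\.(?P<table>.+)$',
        table_spec)
    if not match:
        raise ValueError('Invalid table spec: %r' % table_spec)
    project = match.group('project') or default_project
    if project is None:
        raise ValueError(
            'No project in %r and none found in pipeline options' % table_spec)
    return project, match.group('dataset'), match.group('table')
```

The interesting case is the second form: only at this point does the
transform need to reach into the pipeline options for a default project.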

Thanks!
-P.



>> Should we simply support RuntimeValueProvider in direct runner?
>>
>
> This is a bit tricky for Python, because it is possible to run multiple
> pipelines with DirectRunner in the same process (e.g. call run and do not
> block on results). RuntimeValueProvider works by setting a global variable,
> so when multiple pipelines share the same process it gets tricky to
> support this.
>
>
>> Best
>> -P.
>>
>


Re: PipelineOptions at execution time from DirectRunner

2019-03-21 Thread Ahmet Altay
On Thu, Mar 21, 2019 at 4:20 PM Pablo Estrada  wrote:

> Hi all,
> The DirectRunner does not seem to support RuntimeValueProvider. Is there a
> suggestion for DirectRunner pipelines to access arguments passed in as
> pipeline options(but not necessarily passed explicitly by users) at
> pipeline execution time?
>

RuntimeValueProviders are useful when some pipeline options are not set at
pipeline construction time but are available at execution time. In the case
of DirectRunner, what is available at execution time is the same as what is
available at construction time. Am I missing something? Why do we
need RuntimeValueProvider support in DirectRunner?
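
The distinction can be illustrated with a toy, non-Beam sketch of the two
ValueProvider flavors; `DeferredValueProvider` here is a made-up stand-in
for Beam's RuntimeValueProvider, not its actual implementation:

```python
class StaticValueProvider:
    """Option whose value is already known at pipeline construction time."""
    def __init__(self, value):
        self._value = value

    def get(self):
        return self._value


class DeferredValueProvider:
    """Toy stand-in for RuntimeValueProvider: the value is resolved only
    at execution time, once the runner has populated the options store."""
    def __init__(self, option_name, options_store):
        self._option_name = option_name
        self._options_store = options_store  # filled in at run time

    def get(self):
        if self._option_name not in self._options_store:
            raise RuntimeError(
                'Option %r is only available at execution time'
                % self._option_name)
        return self._options_store[self._option_name]
```

Under DirectRunner the store would be populated in the same process, at
essentially the same moment as construction, so the deferral buys nothing.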


>
> Getting it as pcoll.pipeline.options in the expand(self, pcoll) call is a
> possibility, but it seems like that's not ideal. Any other suggestions?
>

> Should we simply support RuntimeValueProvider in direct runner?
>

This is a bit tricky for Python, because it is possible to run multiple
pipelines with DirectRunner in the same process (e.g. call run and do not
block on results). RuntimeValueProvider works by setting a global variable,
so when multiple pipelines share the same process it gets tricky to
support this.
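
The hazard can be sketched with a toy process-global store (illustrative
only, not Beam's actual implementation):

```python
# Toy model of a process-global runtime-options store, like the global
# that RuntimeValueProvider reads from.
GLOBAL_RUNTIME_OPTIONS = {}

def run_pipeline(options):
    """Pretend to start a pipeline without blocking on its result."""
    GLOBAL_RUNTIME_OPTIONS.clear()
    GLOBAL_RUNTIME_OPTIONS.update(options)

run_pipeline({'project': 'project-one'})   # pipeline 1, still running...
run_pipeline({'project': 'project-two'})   # pipeline 2 clobbers the global

# Any value provider from pipeline 1 that reads the global now silently
# sees pipeline 2's option:
print(GLOBAL_RUNTIME_OPTIONS['project'])   # project-two
```

With a single pipeline per process the global is unambiguous; with two
non-blocking runs, the second overwrites the first's options.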


> Best
> -P.
>


PipelineOptions at execution time from DirectRunner

2019-03-21 Thread Pablo Estrada
Hi all,
The DirectRunner does not seem to support RuntimeValueProvider. Is there a
suggestion for DirectRunner pipelines to access arguments passed in as
pipeline options(but not necessarily passed explicitly by users) at
pipeline execution time?

Getting it as pcoll.pipeline.options in the expand(self, pcoll) call is a
possibility, but it seems like that's not ideal. Any other suggestions?

Should we simply support RuntimeValueProvider in direct runner?
Best
-P.


Re: Stateful transforms and dropped late data

2019-03-21 Thread Kenneth Knowles
OK, I've now read your pipeline. In the minimal pipeline, are you saying
that you see logs with "STATE " but then never a corresponding
"STATE TIMER"?

There are a couple of other things to mention:

 - reading data from Pubsub and then using WithTimestamps is not as good as
using the timestamp attribute control on PubsubIO, so you may potentially
create late / dropped data.
 - you have a processing-time timer, but once a window expires that timer
will not fire, so you also need to set an event-time timer for the end of
the window + allowed lateness
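
As a back-of-the-envelope illustration of the second point (plain Python,
with made-up numbers matching the one-minute window and 7-day lateness in
the pipeline under discussion): the flush timer should be an event-time
timer set no later than the window's end plus the allowed lateness, since
that is when the key+window state is garbage-collected.

```python
from datetime import datetime, timedelta

def latest_safe_flush(window_end, allowed_lateness):
    """Last event-time instant at which per-key/window state is still
    alive; an event-time timer set at or before this point will fire
    before the runner garbage-collects the window's state."""
    return window_end + allowed_lateness

window_end = datetime(2019, 3, 17, 12, 1)   # end of a one-minute fixed window
allowed_lateness = timedelta(days=7)
flush_at = latest_safe_flush(window_end, allowed_lateness)
print(flush_at)  # 2019-03-24 12:01:00
```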

Kenn

On Sun, Mar 17, 2019 at 6:53 AM Amit Ziv-Kenet 
wrote:

> Hello,
>
> I've been able to produce a (relatively) minimal pipeline which reproduces
> the behavior I've previously noticed in one of our production pipelines.
>
> The minimal pipeline repo can be found here:
> https://github.com/azk/late-samples-state
> The actual pipeline is implemented in one big class here:
> https://github.com/azk/late-samples-state/blob/master/src/main/java/late/samples/state/LateSamplesState.java
>
> Basically, this pipeline consumes events from a pubsub topic, which are
> very simple json objects. The events are:
> * deserialized
> * each element's timestamp is extracted from the json payload
> * the elements are keyed and then windowed in a one-minute fixed window
> with an allowed lateness of 7 days, with processing time triggers and
> accumulating panes.
>
> The pipeline then branches into 2 branches:
> 1. A simple GroupByKey, followed by a logging step which records which
> panes were fired.
> 2. A stateful ParDo, which collects elements in a BagState, logs what is
> currently in the Bag and sets a one minute timer, which also logs the
> contents of the BagState.
>
> When running this pipeline in Dataflow, and publishing a few "very late"
> events through the script which is part of the repo:
> https://github.com/azk/late-samples-state/blob/master/scripts/publish_events.py
>  ,
> the following behavior is observed:
> 1. The very late elements never appear after the GroupBy step, and are
> dropped as expected.
> 2. The very late elements are recorded in the stateful transform and added
> to the BagState.
> 3. When the timer expires, the BagState is empty and the elements that
> were previously in the Bag seem to disappear.
>
> Is this an expected behavior or is there some subtle issue going on?
> I'd be more than happy to deep dive into this with anyone interested in
> looking into this behavior.
>
> Thank you,
> Amit.
>
>
> On Wed, Mar 6, 2019 at 11:14 PM Amit Ziv-Kenet 
> wrote:
>
>> Hi Kenn,
>> Thank you for the the explanation!
>>
>> These points make total sense, that's why I was surprised with the
>> observed behavior, which breaks at least points 3 and 4.
>> I'll try to extract and share a minimal working example which
>> demonstrates this behavior.
>>
>> Thank you,
>> Amit.
>>
>> On Wed, Mar 6, 2019 at 8:11 PM Kenneth Knowles  wrote:
>>
>>> What you describe is not expected. Here are the relevant points, I think:
>>>
>>>  - A window is expired when the watermark is past the end of the
>>> window + allowed lateness
>>>  - An element is droppable when it is associated to an expired window
>>>  - All droppable elements should be dropped before reaching the stateful
>>> ParDo step
>>>  - The state and processing time timers for a particular key+window pair
>>> are garbage collected when the window is expired, because it is known that
>>> nothing can cause that state to be read
>>>  - A key+window state is not cleared until all event time timers have
>>> been dispatched
>>>
>>> Do these make sense? I'd love to see more detail of your pipeline code.
>>> One thing to note is that an element being behind the watermark doesn't
>>> really matter. What matters is how the watermark relates to its window.
>>>
>>> Kenn
>>>
>>> On Wed, Mar 6, 2019 at 4:28 AM Amit Ziv-Kenet 
>>> wrote:
>>>
 Hello,

 I'm encountering an unexpected behavior in one of our pipelines, and I
 hope you might help me make sense of it.

 This is a streaming pipeline implemented with the Java SDK (2.10) and
 running on Dataflow.

 * In the pipeline a FixedWindow is applied to the data with an allowed
 lateness of a week (doesn't really matter).
 * The windowed PCollection is then used in 2 separate branches: one
 regular GroupByKey, and a second one which is a transform that makes use
 of State and Timers.
 * In the stateful transform, incoming elements are added to a BagState
 container until some condition on the elements is reached, and the elements
 in the BagState are dispatched downstream. A timer makes sure that the
 BagState is dispatched even if the condition is not met after some timeout
 has expired.

 Occasionally very late data enters the pipeline, with timestamps older
 than the allowed lateness.
 In these cases, the GroupByKey transform behaves as expected, and there
 aren't any panes with the late data 

Re: joda-time dependency version

2019-03-21 Thread Kenneth Knowles
+dev@

I don't know of any special reason we are using an old version.

Kenn

On Thu, Mar 21, 2019, 09:38 Ismaël Mejía  wrote:

> Does anyone have any context on why we have such an old version of
> Joda time (2.4, released in 2014!) and if there is any possible issue
> upgrading it? If not, maybe we can try to upgrade it.
>
> On Thu, Mar 21, 2019 at 5:35 PM Ismaël Mejía  wrote:
> >
> > Mmmm, interesting issue. There is also a plan to use a vendored version
> > of joda-time; not sure of the progress on that one.
> > https://issues.apache.org/jira/browse/BEAM-5827
> >
> > For Beam 3 that's the idea, but so far there is no ETA for Beam 3.
> > https://issues.apache.org/jira/browse/BEAM-5530
> >
> > On Thu, Mar 21, 2019 at 4:15 PM rahul patwari
> >  wrote:
> > >
> > > Hi David,
> > >
> > > The only incompatibility we have come across is this:
> > > We have some timestamp format conversions in our project, where we are
> converting from a timestamp format to another.
> > >
> > > With joda-time 2.4:
> > > If we convert "2019-03-15 13:56:12" which is in "yyyy-MM-dd HH:mm:ss"
> format, to "hh:mm:ss yy-MMM-dd z" format, the converted value is "01:56:12
> 19-Mar-15 -07:00".
> > >
> > > Whereas with joda-time 2.9.3:
> > > If we convert "2019-03-15 13:56:12" which is in "yyyy-MM-dd HH:mm:ss"
> format, to "hh:mm:ss yy-MMM-dd z" format, the converted value is "01:56:12
> 19-Mar-15 PDT".
> > >
> > > The javadoc for both the versions doesn't seem different though, for
> 'z' DateTimeFormat.
> > >
> > > Even though the javadoc says - Zone names: Time zone names ('z')
> cannot be parsed for both the versions, we are able to parse it in
> joda-time 2.9.3.
> > >
> > > Also, joda-time will be replaced with java time with Beam 3?
> > >
> > > Thanks,
> > > Rahul
> > >
> > > On Thu, Mar 21, 2019 at 5:37 PM David Morávek 
> wrote:
> > >>
> > >> Hello Rahul, are there any incompatibilities you are running into
> with spark version? These versions should be backward compatible.
> > >>
> > >> For jodatime doc:
> > >> The main public API will remain backwards compatible for both source
> and binary in the 2.x stream.
> > >>
> > >> This means you should be able to safely use Spark's version.
> > >>
> > >> D.
> > >>
> > >> On Thu, Mar 21, 2019 at 5:45 AM rahul patwari <
> rahulpatwari8...@gmail.com> wrote:
> > >>>
> > >>> Hi Ismael,
> > >>>
> > >>> We are using Beam with Spark Runner and Spark 2.4 has joda-time
> 2.9.3 as a dependency. So, we have used joda-time 2.9.3 in our shaded
> artifact set. As Beam has joda-time 2.4 as a dependency, I was wondering
> whether it would break anything in Beam.
> > >>>
> > >>> Will joda-time be replaced with java time in Beam 3? What is the
> expected release date of Beam 3?
> > >>>
> > >>> Thanks,
> > >>> Rahul
> > >>>
> > >>> On Wed, Mar 20, 2019 at 7:23 PM Ismaël Mejía 
> wrote:
> > 
> >  Hello,
> > 
> >  The long term goal would be to get rid of joda-time but that won't
> >  happen until Beam 3.
> >  Any 'particular' reason or motivation to push the upgrade?
> > 
> >  Regards,
> >  Ismaël
> > 
> >  On Wed, Mar 20, 2019 at 11:53 AM rahul patwari
> >   wrote:
> >  >
> >  > Hi,
> >  >
> >  > Is there a plan to upgrade the dependency version of joda-time to
> 2.9.3 or latest version?
> >  >
> >  >
> >  > Thanks,
> >  > Rahul
>


Re: joda-time dependency version

2019-03-21 Thread Ismaël Mejía
Does anyone have any context on why we have such an old version of
Joda time (2.4, released in 2014!) and if there is any possible issue
upgrading it? If not, maybe we can try to upgrade it.

On Thu, Mar 21, 2019 at 5:35 PM Ismaël Mejía  wrote:
>
> Mmmm, interesting issue. There is also a plan to use a vendored version
> of joda-time; not sure of the progress on that one.
> https://issues.apache.org/jira/browse/BEAM-5827
>
> For Beam 3 that's the idea, but so far there is no ETA for Beam 3.
> https://issues.apache.org/jira/browse/BEAM-5530
>
> On Thu, Mar 21, 2019 at 4:15 PM rahul patwari
>  wrote:
> >
> > Hi David,
> >
> > The only incompatibility we have come across is this:
> > We have some timestamp format conversions in our project, where we are 
> > converting from a timestamp format to another.
> >
> > With joda-time 2.4:
> > If we convert "2019-03-15 13:56:12" which is in "yyyy-MM-dd HH:mm:ss"
> > format, to "hh:mm:ss yy-MMM-dd z" format, the converted value is "01:56:12 
> > 19-Mar-15 -07:00".
> >
> > Whereas with joda-time 2.9.3:
> > If we convert "2019-03-15 13:56:12" which is in "yyyy-MM-dd HH:mm:ss"
> > format, to "hh:mm:ss yy-MMM-dd z" format, the converted value is "01:56:12 
> > 19-Mar-15 PDT".
> >
> > The javadoc for both the versions doesn't seem different though, for 'z' 
> > DateTimeFormat.
> >
> > Even though the javadoc says - Zone names: Time zone names ('z') cannot be 
> > parsed for both the versions, we are able to parse it in joda-time 2.9.3.
> >
> > Also, joda-time will be replaced with java time with Beam 3?
> >
> > Thanks,
> > Rahul
> >
> > On Thu, Mar 21, 2019 at 5:37 PM David Morávek  
> > wrote:
> >>
> >> Hello Rahul, are there any incompatibilities you are running into with 
> >> spark version? These versions should be backward compatible.
> >>
> >> For jodatime doc:
> >> The main public API will remain backwards compatible for both source and 
> >> binary in the 2.x stream.
> >>
> >> This means you should be able to safely use Spark's version.
> >>
> >> D.
> >>
> >> On Thu, Mar 21, 2019 at 5:45 AM rahul patwari  
> >> wrote:
> >>>
> >>> Hi Ismael,
> >>>
> >>> We are using Beam with Spark Runner and Spark 2.4 has joda-time 2.9.3 as 
> >>> a dependency. So, we have used joda-time 2.9.3 in our shaded artifact 
> >>> set. As Beam has joda-time 2.4 as a dependency, I was wondering whether 
> >>> it would break anything in Beam.
> >>>
> >>> Will joda-time be replaced with java time in Beam 3? What is the expected 
> >>> release date of Beam 3?
> >>>
> >>> Thanks,
> >>> Rahul
> >>>
> >>> On Wed, Mar 20, 2019 at 7:23 PM Ismaël Mejía  wrote:
> 
>  Hello,
> 
>  The long term goal would be to get rid of joda-time but that won't
>  happen until Beam 3.
>  Any 'particular' reason or motivation to push the upgrade?
> 
>  Regards,
>  Ismaël
> 
>  On Wed, Mar 20, 2019 at 11:53 AM rahul patwari
>   wrote:
>  >
>  > Hi,
>  >
>  > Is there a plan to upgrade the dependency version of joda-time to 
>  > 2.9.3 or latest version?
>  >
>  >
>  > Thanks,
>  > Rahul


Re: joda-time dependency version

2019-03-21 Thread Ismaël Mejía
Mmmm, interesting issue. There is also a plan to use a vendored version
of joda-time; not sure of the progress on that one.
https://issues.apache.org/jira/browse/BEAM-5827

For Beam 3 that's the idea, but so far there is no ETA for Beam 3.
https://issues.apache.org/jira/browse/BEAM-5530

On Thu, Mar 21, 2019 at 4:15 PM rahul patwari
 wrote:
>
> Hi David,
>
> The only incompatibility we have come across is this:
> We have some timestamp format conversions in our project, where we are 
> converting from a timestamp format to another.
>
> With joda-time 2.4:
> If we convert "2019-03-15 13:56:12" which is in "yyyy-MM-dd HH:mm:ss" format,
> to "hh:mm:ss yy-MMM-dd z" format, the converted value is "01:56:12 19-Mar-15 
> -07:00".
>
> Whereas with joda-time 2.9.3:
> If we convert "2019-03-15 13:56:12" which is in "yyyy-MM-dd HH:mm:ss" format,
> to "hh:mm:ss yy-MMM-dd z" format, the converted value is "01:56:12 19-Mar-15 
> PDT".
>
> The javadoc for both the versions doesn't seem different though, for 'z' 
> DateTimeFormat.
>
> Even though the javadoc says - Zone names: Time zone names ('z') cannot be 
> parsed for both the versions, we are able to parse it in joda-time 2.9.3.
>
> Also, joda-time will be replaced with java time with Beam 3?
>
> Thanks,
> Rahul
>
> On Thu, Mar 21, 2019 at 5:37 PM David Morávek  wrote:
>>
>> Hello Rahul, are there any incompatibilities you are running into with spark 
>> version? These versions should be backward compatible.
>>
>> For jodatime doc:
>> The main public API will remain backwards compatible for both source and 
>> binary in the 2.x stream.
>>
>> This means you should be able to safely use Spark's version.
>>
>> D.
>>
>> On Thu, Mar 21, 2019 at 5:45 AM rahul patwari  
>> wrote:
>>>
>>> Hi Ismael,
>>>
>>> We are using Beam with Spark Runner and Spark 2.4 has joda-time 2.9.3 as a 
>>> dependency. So, we have used joda-time 2.9.3 in our shaded artifact set. As 
>>> Beam has joda-time 2.4 as a dependency, I was wondering whether it would 
>>> break anything in Beam.
>>>
>>> Will joda-time be replaced with java time in Beam 3? What is the expected 
>>> release date of Beam 3?
>>>
>>> Thanks,
>>> Rahul
>>>
>>> On Wed, Mar 20, 2019 at 7:23 PM Ismaël Mejía  wrote:

 Hello,

 The long term goal would be to get rid of joda-time but that won't
 happen until Beam 3.
 Any 'particular' reason or motivation to push the upgrade?

 Regards,
 Ismaël

 On Wed, Mar 20, 2019 at 11:53 AM rahul patwari
  wrote:
 >
 > Hi,
 >
 > Is there a plan to upgrade the dependency version of joda-time to 2.9.3 
 > or latest version?
 >
 >
 > Thanks,
 > Rahul


Re: joda-time dependency version

2019-03-21 Thread rahul patwari
Hi David,

The only incompatibility we have come across is this:
We have some timestamp format conversions in our project, where we are
converting from a timestamp format to another.

With joda-time 2.4:
If we convert "2019-03-15 13:56:12" which is in "yyyy-MM-dd HH:mm:ss"
format, to "hh:mm:ss yy-MMM-dd z" format, the converted value is "01:56:12
19-Mar-15 -07:00".

Whereas with joda-time 2.9.3:
If we convert "2019-03-15 13:56:12" which is in "yyyy-MM-dd HH:mm:ss"
format, to "hh:mm:ss yy-MMM-dd z" format, the converted value is "01:56:12
19-Mar-15 PDT".

The javadoc for the two versions doesn't seem different, though, for the
'z' DateTimeFormat pattern.

Even though the javadoc for both versions says "*Zone names*: Time zone
names ('z') cannot be parsed", we are able to parse them in joda-time 2.9.3.


Also, will joda-time be replaced with java.time in Beam 3?

Thanks,
Rahul

On Thu, Mar 21, 2019 at 5:37 PM David Morávek 
wrote:

> Hello Rahul, are there any incompatibilities you are running into with
> spark version? These versions should be backward compatible.
>
> For jodatime doc:
> The main public API will remain *backwards compatible* for both source
> and binary in the 2.x stream.
>
> This means you should be able to safely use Spark's version.
>
> D.
>
> On Thu, Mar 21, 2019 at 5:45 AM rahul patwari 
> wrote:
>
>> Hi Ismael,
>>
>> We are using Beam with Spark Runner and Spark 2.4 has joda-time 2.9.3 as
>> a dependency. So, we have used joda-time 2.9.3 in our shaded artifact set.
>> As Beam has joda-time 2.4 as a dependency, I was wondering whether it would
>> break anything in Beam.
>>
>> Will joda-time be replaced with java time in Beam 3? What is the expected
>> release date of Beam 3?
>>
>> Thanks,
>> Rahul
>>
>> On Wed, Mar 20, 2019 at 7:23 PM Ismaël Mejía  wrote:
>>
>>> Hello,
>>>
>>> The long term goal would be to get rid of joda-time but that won't
>>> happen until Beam 3.
>>> Any 'particular' reason or motivation to push the upgrade?
>>>
>>> Regards,
>>> Ismaël
>>>
>>> On Wed, Mar 20, 2019 at 11:53 AM rahul patwari
>>>  wrote:
>>> >
>>> > Hi,
>>> >
>>> > Is there a plan to upgrade the dependency version of joda-time to
>>> 2.9.3 or latest version?
>>> >
>>> >
>>> > Thanks,
>>> > Rahul
>>>
>>


Re: joda-time dependency version

2019-03-21 Thread David Morávek
Hello Rahul, are there any incompatibilities you are running into with
Spark's version? These versions should be backward compatible.

From the joda-time doc:
The main public API will remain *backwards compatible* for both source and
binary in the 2.x stream.

This means you should be able to safely use Spark's version.
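
For example, a user who wants Spark's joda-time rather than Beam's
transitive 2.4 can pin the version directly in their own build. This is a
hypothetical Maven fragment (a dependencyManagement entry or the Gradle
equivalent would work too):

```xml
<!-- Hypothetical fragment: declare joda-time directly so Maven's
     nearest-wins resolution picks 2.9.3 over Beam's transitive 2.4. -->
<dependency>
  <groupId>joda-time</groupId>
  <artifactId>joda-time</artifactId>
  <version>2.9.3</version>
</dependency>
```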

D.

On Thu, Mar 21, 2019 at 5:45 AM rahul patwari 
wrote:

> Hi Ismael,
>
> We are using Beam with Spark Runner and Spark 2.4 has joda-time 2.9.3 as a
> dependency. So, we have used joda-time 2.9.3 in our shaded artifact set. As
> Beam has joda-time 2.4 as a dependency, I was wondering whether it would
> break anything in Beam.
>
> Will joda-time be replaced with java time in Beam 3? What is the expected
> release date of Beam 3?
>
> Thanks,
> Rahul
>
> On Wed, Mar 20, 2019 at 7:23 PM Ismaël Mejía  wrote:
>
>> Hello,
>>
>> The long term goal would be to get rid of joda-time but that won't
>> happen until Beam 3.
>> Any 'particular' reason or motivation to push the upgrade?
>>
>> Regards,
>> Ismaël
>>
>> On Wed, Mar 20, 2019 at 11:53 AM rahul patwari
>>  wrote:
>> >
>> > Hi,
>> >
>> > Is there a plan to upgrade the dependency version of joda-time to 2.9.3
>> or latest version?
>> >
>> >
>> > Thanks,
>> > Rahul
>>
>


Is AvroCoder the right coder for me?

2019-03-21 Thread augusto . mcc
Hi

I am trying out Beam to do some data aggregations. Many of the inputs/outputs 
of my transforms are complex objects (not super complex, but sometimes 
containing Maps/Lists/Sets), so when I was prompted to define a coder for 
these objects I added the annotation @DefaultCoder(AvroCoder.class) and 
things worked in my development environment.

Now that I am trying to run it on "real" data, I notice that after deploying 
it to a Spark runner and looking at some thread dumps, many of the threads 
were blocked in the following method of the Avro library 
(ReflectData.getAccessorsFor). So my question is: did I do the wrong thing by 
using AvroCoder, or is there some other coder that can easily solve my 
problem?

Best regards,
Augusto