Re: python precommit error - google-auth dependency?

2020-06-11 Thread Udi Meiri
BTW, the new pip resolver seems to do the right thing by installing
rsa==4.0 instead of 4.2 in this case (upgrade pip to 20.1.1):

pip --unstable-feature=resolver install apache-beam[gcp]

https://discuss.python.org/t/an-update-on-pip-and-dependency-resolution/1898/4
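The difference between the two resolvers can be illustrated with a toy model (this is an illustration of the strategies, not pip's actual algorithm): google-auth 1.16.1 pins rsa<4.1,>=3.1.4, while oauth2client only asks for rsa>=3.1.4.

```python
def satisfies(version, spec):
    """Check a version tuple against a list of (op, bound-tuple) constraints."""
    ops = {
        ">=": lambda a, b: a >= b,
        "<": lambda a, b: a < b,
    }
    return all(ops[op](version, bound) for op, bound in spec)

candidates = [(3, 1, 4), (4, 0), (4, 1)]          # rsa releases, oldest first
oauth2client_spec = [(">=", (3, 1, 4))]           # no upper bound
google_auth_spec = [(">=", (3, 1, 4)), ("<", (4, 1))]

# Legacy resolver: picks the newest rsa for the first requirement it sees
# and never revisits the choice when google-auth's pin shows up later.
legacy_pick = max(v for v in candidates if satisfies(v, oauth2client_spec))

# Backtracking resolver: picks the newest version satisfying *all* constraints.
new_pick = max(v for v in candidates
               if satisfies(v, oauth2client_spec) and satisfies(v, google_auth_spec))

print(legacy_pick)  # (4, 1) -> conflicts with google-auth's rsa<4.1 pin
print(new_pick)     # (4, 0) -> what the new resolver installs
```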

On Thu, Jun 11, 2020 at 2:40 PM Valentyn Tymofieiev 
wrote:

> > In python 2 oauth2client's rsa>=3.1.4 requirement will resolve to
> latest python2 supporting version of rsa (4.0?)
>
> Unfortunately rsa 4.1 didn't set a python_requires stanza to prevent the
> breakage of Py2 users, opened:
> https://github.com/sybrenstuvel/python-rsa/issues/152.
>
> On Wed, Jun 10, 2020 at 7:14 PM Ahmet Altay  wrote:
>
>>
>>
>> On Wed, Jun 10, 2020 at 7:11 PM Bu Sun Kim  wrote:
>>
>>> Hi,
>>>
>>> google-auth has been released (with the wider pin on rsa).
>>>
>>
>> Thank you! Much appreciated!
>>
>>
>>>
>>> On Wed, Jun 10, 2020 at 6:07 PM Ahmet Altay  wrote:
>>>


 On Wed, Jun 10, 2020 at 4:07 PM Kyle Weaver 
 wrote:

> The fix to google-auth has been merged. Is the plan just to wait until
> a new version of google-auth is released and ignore the failing tests 
> until
> then? (btw I filed a JIRA for this before I realized it was already being
> discussed here: https://issues.apache.org/jira/browse/BEAM-10232)
>

 Could we add it as a test dependency? Or if that is not possible, add
 it but remove it before next release?

 It seems like there is a release PR on google-auth (
 https://github.com/googleapis/google-auth-library-python/pull/525). I
 asked +Bu Sun Kim  on the PR, they usually
 release pretty quickly.


>
> On Wed, Jun 10, 2020 at 3:21 PM Udi Meiri  wrote:
>
>> Yes you're right, Py2 envs are still using 4.0.
>>
>> On Wed, Jun 10, 2020 at 3:03 PM Ahmet Altay  wrote:
>>
>>>
>>>
>>> On Wed, Jun 10, 2020 at 2:25 PM Udi Meiri  wrote:
>>>
 4.1 drops Python 2 support, so I'm not sure if we're ready for that
 yet.

>>>
>>> Wouldn't that work by default? In python 2 oauth2client's rsa>=3.1.4
>>> requirement will resolve to latest python2 supporting version of rsa 
>>> (4.0?)
>>>
>>>

 On Wed, Jun 10, 2020 at 2:20 PM Ahmet Altay 
 wrote:

> Looks like there is an attempt to fix this:
> https://github.com/googleapis/google-auth-library-python/pull/524
>
> On Wed, Jun 10, 2020 at 2:07 PM Udi Meiri 
> wrote:
>
>>
>>
>> On Wed, Jun 10, 2020 at 1:59 PM Ahmet Altay 
>> wrote:
>>
>>>
>>>
>>> On Wed, Jun 10, 2020 at 1:29 PM Kenneth Knowles 
>>> wrote:
>>>
 You may be interested in following
 https://github.com/pypa/pip/issues/988 if you are not already.

 Kenn

 On Wed, Jun 10, 2020 at 12:17 PM Udi Meiri 
 wrote:

> Seems like manually installing rsa==4.0 satisfies deps, but
> pip doesn't do transitive deps well.
>
> Would it be right to put a direct dependency
> on rsa<4.1,>=3.1.4 in setup.py?
>

>>> Did you find where the google-auth dependency is coming from? We
>>> might try to fix the problem at the source of that dependency 
>>> instead of
>>> adding rsa to beam's setup.py.
>>>
>>
>> oauth2client depends on rsa>=3.1.4 with no upper limit. rsa 4.1
>> was released today.
>> The places that require rsa<4.1 are deeper in the dependency
>> tree. For example:
>>
>> google-cloud-bigquery==1.24.0
>>   - google-api-core [required: >=1.15.0,<2.0dev, installed:
>> 1.20.0]
>> - google-auth [required: >=1.14.0,<2.0dev, installed: 1.16.1]
>>   - rsa [required: >=3.1.4,<4.1, installed: 4.1]
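The conflict at the bottom of that tree is exactly what `pip check` flags. A toy reconstruction of the check, with the version logic simplified to tuples (not pip's or pipdeptree's actual API):

```python
def vt(s):
    """'1.24.0' -> (1, 24, 0); numeric parts only, suffixes like 'dev' ignored."""
    return tuple(int(p) for p in s.split(".") if p.isdigit())

installed = {"google-cloud-bigquery": "1.24.0", "google-api-core": "1.20.0",
             "google-auth": "1.16.1", "rsa": "4.1"}
# (package, dependency, lower bound inclusive, upper bound exclusive)
pins = [("google-auth", "rsa", "3.1.4", "4.1")]

conflicts = [
    f"{pkg} requires {dep}>={lo},<{hi} but {dep} {installed[dep]} is installed"
    for pkg, dep, lo, hi in pins
    if not (vt(lo) <= vt(installed[dep]) < vt(hi))
]
print(conflicts[0])
# google-auth requires rsa>=3.1.4,<4.1 but rsa 4.1 is installed
```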
>>
>>
>>>

> On Wed, Jun 10, 2020 at 11:48 AM Udi Meiri 
> wrote:
>
>> Thanks, that helped in an unexpected way. :)
>> I should have used the "gcp" extra instead of "cloud" in my
>> pip install command above.
>>
>> On Wed, Jun 10, 2020 at 11:37 AM Valentyn Tymofieiev <
>> valen...@google.com> wrote:
>>
>>> > Any ideas on how to debug where this requirement is coming
>>> from?
>>> You could try installing and calling pipdeptree [1] from a
>>> Jenkins job, and see if it helps.
>>>
>>> [1] https://pypi.org/project/pipdeptree/
>>> On Wed, Jun 10, 2020 at 11:00 AM Udi Meiri 
>>> wrote

Re: python precommit error - google-auth dependency?

2020-06-11 Thread Valentyn Tymofieiev
> In python 2 oauth2client's rsa>=3.1.4 requirement will resolve to
latest python2 supporting version of rsa (4.0?)

Unfortunately rsa 4.1 didn't set a python_requires stanza to prevent the
breakage of Py2 users, opened:
https://github.com/sybrenstuvel/python-rsa/issues/152.

On Wed, Jun 10, 2020 at 7:14 PM Ahmet Altay  wrote:

>
>
> On Wed, Jun 10, 2020 at 7:11 PM Bu Sun Kim  wrote:
>
>> Hi,
>>
>> google-auth has been released (with the wider pin on rsa).
>>
>
> Thank you! Much appreciated!
>
>
>>
>> On Wed, Jun 10, 2020 at 6:07 PM Ahmet Altay  wrote:
>>
>>>
>>>
>>> On Wed, Jun 10, 2020 at 4:07 PM Kyle Weaver  wrote:
>>>
 The fix to google-auth has been merged. Is the plan just to wait until
 a new version of google-auth is released and ignore the failing tests until
 then? (btw I filed a JIRA for this before I realized it was already being
 discussed here: https://issues.apache.org/jira/browse/BEAM-10232)

>>>
>>> Could we add it as a test dependency? Or if that is not possible, add it
>>> but remove it before next release?
>>>
>>> It seems like there is a release PR on google-auth (
>>> https://github.com/googleapis/google-auth-library-python/pull/525). I
>>> asked +Bu Sun Kim  on the PR, they usually release
>>> pretty quickly.
>>>
>>>

 On Wed, Jun 10, 2020 at 3:21 PM Udi Meiri  wrote:

> Yes you're right, Py2 envs are still using 4.0.
>
> On Wed, Jun 10, 2020 at 3:03 PM Ahmet Altay  wrote:
>
>>
>>
>> On Wed, Jun 10, 2020 at 2:25 PM Udi Meiri  wrote:
>>
>>> 4.1 drops Python 2 support, so I'm not sure if we're ready for that
>>> yet.
>>>
>>
>> Wouldn't that work by default? In python 2 oauth2client's rsa>=3.1.4
>> requirement will resolve to latest python2 supporting version of rsa 
>> (4.0?)
>>
>>
>>>
>>> On Wed, Jun 10, 2020 at 2:20 PM Ahmet Altay 
>>> wrote:
>>>
 Looks like there is an attempt to fix this:
 https://github.com/googleapis/google-auth-library-python/pull/524

 On Wed, Jun 10, 2020 at 2:07 PM Udi Meiri  wrote:

>
>
> On Wed, Jun 10, 2020 at 1:59 PM Ahmet Altay 
> wrote:
>
>>
>>
>> On Wed, Jun 10, 2020 at 1:29 PM Kenneth Knowles 
>> wrote:
>>
>>> You may be interested in following
>>> https://github.com/pypa/pip/issues/988 if you are not already.
>>>
>>> Kenn
>>>
>>> On Wed, Jun 10, 2020 at 12:17 PM Udi Meiri 
>>> wrote:
>>>
 Seems like manually installing rsa==4.0 satisfies deps, but pip
 doesn't do transitive deps well.

 Would it be right to put a direct dependency on rsa<4.1,>=3.1.4
 in setup.py?

>>>
>> Did you find where the google-auth dependency is coming from? We
>> might try to fix the problem at the source of that dependency 
>> instead of
>> adding rsa to beam's setup.py.
>>
>
> oauth2client depends on rsa>=3.1.4 with no upper limit. rsa 4.1 was
> released today.
> The places that require rsa<4.1 are deeper in the dependency tree.
> For example:
>
> google-cloud-bigquery==1.24.0
>   - google-api-core [required: >=1.15.0,<2.0dev, installed: 1.20.0]
> - google-auth [required: >=1.14.0,<2.0dev, installed: 1.16.1]
>   - rsa [required: >=3.1.4,<4.1, installed: 4.1]
>
>
>>
>>>
 On Wed, Jun 10, 2020 at 11:48 AM Udi Meiri 
 wrote:

> Thanks, that helped in an unexpected way. :)
> I should have used the "gcp" extra instead of "cloud" in my
> pip install command above.
>
> On Wed, Jun 10, 2020 at 11:37 AM Valentyn Tymofieiev <
> valen...@google.com> wrote:
>
>> > Any ideas on how to debug where this requirement is coming
>> from?
>> You could try installing and calling pipdeptree [1] from a
>> Jenkins job, and see if it helps.
>>
>> [1] https://pypi.org/project/pipdeptree/
>> On Wed, Jun 10, 2020 at 11:00 AM Udi Meiri 
>> wrote:
>>
>>> Hi,
>>> I'm trying to understand these "pip check" failures:
>>>
>>> ERROR: google-auth 1.16.1 has requirement rsa<4.1,>=3.1.4, but 
>>> you'll have rsa 4.1 which is incompatible
>>>
>>>
>>>
>>> https://builds.apache.org/job/beam_PreCommit_Python_Cron/2860/console
>>>
>>> However, when I do
>>> pip install dist/apache-beam-2.23.0.dev0.tar.

Re: Remove EOL'd Runners

2020-06-11 Thread Kenneth Knowles
I have "archived" the `runner-gearpump` Jira component. This makes it so
new issues cannot be associated. It can be unarchived any time. It does not
close or hide associated issues (makes sense: in Jira's data model, issues
are not "in" components, but associated with one or more)

Kenn

On Wed, Jun 10, 2020 at 12:07 PM Tyson Hamilton  wrote:

> Sounds good, thanks.
>
> I removed Gearpump first and will move on to Apex later today. When that
> PR is merged we can clean up the Jenkins jobs in one swoop for both removed
> runners.
>
> On Wed, Jun 10, 2020, 11:18 AM Luke Cwik  wrote:
>
>> The jobs won't be deleted but will be disabled. I can help delete the
>> jobs from Jenkins once the Jenkins configurations are removed; either ping
>> me directly or update this thread when that should be done.
>>
>> On Wed, Jun 10, 2020 at 10:38 AM Kenneth Knowles  wrote:
>>
>>> +1
>>>
>>> All Jenkins configs are in the repo. There's a lag between merge and run
>>> of the "seed job" that syncs our configs. We can do a manual run of it, or
>>> just not worry about the temporary redness in the jobs that will be deleted
>>> anyhow.
>>>
>>> On Wed, Jun 10, 2020 at 8:57 AM Jan Lukavský  wrote:
>>>
 +1
 On 6/10/20 5:51 PM, David Morávek wrote:

 +1

 On Tue, Jun 9, 2020 at 7:43 PM Ahmet Altay  wrote:

> Thank you Tyson!
>
> On Tue, Jun 9, 2020 at 10:20 AM Thomas Weise  wrote:
>
>> +1
>>
>>
>> On Tue, Jun 9, 2020 at 9:41 AM Robert Bradshaw 
>> wrote:
>>
>>> Makes sense to me.
>>>
>>> On Tue, Jun 9, 2020 at 8:45 AM Maximilian Michels 
>>> wrote:
>>>
 Thanks for the heads-up, Tyson! It's a sensible decision to remove
 unsupported runners.

 -Max

 On 09.06.20 16:51, Tyson Hamilton wrote:
 > Hi All,
 >
 > As part of the Fixit [1] I'd like to remove EOL'd runners, Apex
 and Gearpump, as described in BEAM- [2]. This will be a big PR I 
 think
 and didn't want anyone to be surprised. There is already some 
 agreement in
 the linked Jira issue. If there are no objections I'll get started 
 later
 today or tomorrow.
 >
 > -Tyson
 >
 >
 > [1]:
 https://lists.apache.org/thread.html/r9ddc77a8fee58ad02f68e2d9a7f054aab3e55717cc88ad1d5bc49311%40%3Cdev.beam.apache.org%3E
 > [2]: https://issues.apache.org/jira/browse/BEAM-
 >

>>>


Re: Automation for Jira

2020-06-11 Thread Kenneth Knowles
Yes, my inbox is hit as well. I'm enjoying going through some old bugs
actually. One takeaway is that we have a lot of early Jiras that are still
relevant, and also that there are a lot of duplicates. I think some
automation to help find duplicates might be helpful.

Also, some accidental automation humor:
https://issues.apache.org/jira/browse/BEAM-6414

Kenn

On Tue, Jun 2, 2020 at 8:39 AM Brian Hulette  wrote:

> RIP my inbox :)
> This is overwhelming, but I think it will be very good. Thanks for setting
> this up Kenn.
>
> Brian
>
> On Mon, Jun 1, 2020 at 9:57 PM Kenneth Knowles  wrote:
>
>> I have now added a modified 4:
>>
>> 4a. labeling stale-P2 for unassigned 60 day old jiras
>> 4b. after 14 days downgrading stale-P2 labeled jiras to P3
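Rules 4a/4b can be sketched in plain Python (the real rules run inside Jira's automation engine; the field names and helper here are hypothetical, purely for illustration):

```python
from datetime import datetime, timedelta

def apply_stale_rules(issue, now):
    """4a: label unassigned, 60-day-stale P2 jiras 'stale-P2';
    4b: 14 days after labeling, downgrade stale-P2 jiras to P3."""
    if (issue["priority"] == "P2" and issue["assignee"] is None
            and "stale-P2" not in issue["labels"]
            and now - issue["updated"] >= timedelta(days=60)):
        issue["labels"].append("stale-P2")
        issue["labeled_at"] = now
    elif ("stale-P2" in issue["labels"]
            and now - issue["labeled_at"] >= timedelta(days=14)):
        issue["priority"] = "P3"
    return issue

issue = {"priority": "P2", "assignee": None, "labels": [],
         "updated": datetime(2020, 3, 1)}
issue = apply_stale_rules(issue, datetime(2020, 6, 1))   # 92 days stale: labeled
issue = apply_stale_rules(issue, datetime(2020, 6, 16))  # 15 days labeled: downgraded
print(issue["priority"])  # P3
```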
>>
>> On Mon, Jun 1, 2020 at 9:06 PM Kenneth Knowles  wrote:
>>
>>> I just added 3a and 3b. The comments will appear to be coming from me.
>>> That is a misconfiguration that I have now fixed. In the future they will
>>> come from the "Beam Jira Bot". There were 1119 stale-assigned issues.
>>>
>>> Kenn
>>>
>>> On Fri, May 1, 2020 at 1:41 PM Kenneth Knowles  wrote:
>>>
 Based on the mild consensus and my availability, I just did #1. I have
 not done any others. It seems #2 may be infeasible [1] and I am convinced
 that we should not auto-close. I'll update again in a bit...

 Kenn

 [1] https://jira.atlassian.com/browse/JRACLOUD-28064

 On Wed, Apr 29, 2020 at 2:54 PM Ahmet Altay  wrote:

> +1 for the automations. I agree with concerns related to #4. Auto
> closing issues is not a good experience. A person goes through the work of
> reporting an issue. This might very well be their first contribution.
> Automatically closing these issues with no human comments might make the
> reporter feel ignored. Auto-lowering the priority is a good suggestion.
>
> I wonder if we can also do a spring cleaning by reviewing jira
> components/their default owners. If we can break the jira into more
> components, we could have more people as component owners, triaging 
> smaller
> per-component backlogs.
> On Wed, Apr 29, 2020 at 11:17 AM Tyson Hamilton 
> wrote:
>
>> +1 for automation.
>>
>> Regarding #4, what about adding the constraint that this rule only
>> applies to issues that are incomplete and require more information from 
>> the
>> reporter?
>>
>> Unfortunately it would require a human to triage issues to determine
>> this and apply an appropriate label. Triage should happen regularly
>> anyways, ideally even periodically for old issues, though this may be
>> asking a bit too much.
>>
>
> Even with automation, manual triaging would be a valuable action. If
> the automation can reduce the backlog for manual reviewers, doing manual
> triage would be easier to do, incremental work.
>
>
>>
>> Regarding #5 & #6, having some SLO for P0/P1 issues for both updates
>> and closures would be helpful in setting expectations. A daily P0 
>> violation
>> email to dev@ sounds right, for P1 weekly. What would the Slack
>> notification look like? It would be neat if it could ping the assignee
>> directly. What group would be victims for the auto-assigner?
>>
>
> I agree with this. Email, or a dashboard would work equally well. (We
> need to first agree on SLOs though.)
>
>
>
>>
>> On 2020/04/29 17:15:48, Brian Hulette  wrote:
>> > Agree I think this all sounds good except for 4.
>> >
>> > I like the idea of using automation to help tame the backlog of
>> jiras, but
>> > I worry that 4 could lead to a bad experience for users. Say they
>> file a
>> > jira and maybe get it assigned, and then watch as it bounces all
>> the way
>> > down to closed as obsolete because it was ignored.
>> > The status quo (the bug just gets ignored anyway) isn't great, but
>> at least
>> > the user doesn't have automation working against them.
>> >
>> > Is there something else we can do to make sure these bugs get
>> attention?
>> >
>> > Brian
>> >
>> >
>> > On Wed, Apr 29, 2020 at 10:00 AM Robert Bradshaw <
>> rober...@google.com>
>> > wrote:
>> >
>> > > +1 to more automation.
>> > >
>> > > I'm in favor of all but 4, I think it's quite common for issues
>> to be
>> > > noticed but not worked on for 60+ days. Most of the time when a
>> developer
>> > > files an issue they either (1) are working on it right now or (2)
>> are
>> > > filing it away because it's something they're not working on, but
>> should
>> > > get fixed. (Case in point, beginner issues that are not urgent
>> but nice to
>> > > have.) What we could do however is lower the priority after a set
>> amount of
>> > > time. (I suppose issues are a mix of blockers and backlog, a

Application For Google Summer Of Docs

2020-06-11 Thread Ayush Grover
Sir,
First of all, I am Ayush Grover, and I have been a technical content
writer for two years now. This year I am very excited to apply to GSoD,
and especially to Apache Beam.
If you would be so kind, could you help me out by briefing me on what I
can do to make my application look better?

Apart from that, is there anything else I have to do or contribute to open
source to make my application stand apart?

Regards
Ayush Grover


Re: [External] Re: Ensuring messages are processed and emitted in-order

2020-06-11 Thread Jan Lukavský

Hi,

I'm afraid @RequiresTimeSortedInput currently does not fit this 
requirement, because it works on event _timestamps_ only. Assigning 
Kafka offsets as event timestamps is probably not a good idea. In the 
original proposal [1] there is mention that it would be good to 
implement sorting by some other field (which would be required to 
correlate with timestamps, but that should be the case for Kafka 
offsets). Unfortunately, that is not yet implemented. If you run your 
pipeline in streaming mode, it should be possible to implement this 
yourself using BagState and Timer that will fire periodically, gather 
all elements with timestamp less than 'timerTimestamp - allowed 
lateness', sort them by Kafka offset and process. I wouldn't recommend
relying on Kafka offsets not having gaps, because they can have gaps (at
least for compacted topics, but very likely in other cases as well).


Jan

[1] 
https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing
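Jan's BagState-plus-timer idea can be sketched in plain Python (assumed names; in a real Beam pipeline the buffer would be a BagState in a stateful DoFn and the flush would fire from a looping timer):

```python
ALLOWED_LATENESS = 10

def on_timer(bag, timer_timestamp):
    """Flush elements older than the lateness horizon, sorted by Kafka offset;
    newer elements stay buffered for the next firing."""
    horizon = timer_timestamp - ALLOWED_LATENESS
    ready = sorted((e for e in bag if e["ts"] < horizon),
                   key=lambda e: e["offset"])
    remaining = [e for e in bag if e["ts"] >= horizon]
    return ready, remaining

bag = [
    {"offset": 7, "ts": 12},   # too new: stays buffered
    {"offset": 3, "ts": 4},
    {"offset": 1, "ts": 6},
]
ready, bag = on_timer(bag, timer_timestamp=20)
print([e["offset"] for e in ready])  # [1, 3] -> emitted in offset order
```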


On 6/11/20 5:30 PM, Reuven Lax wrote:
I would not recommend using RequiresTimeSortedInput in this way. I 
also would not use ingestion time, as in a distributed environment, 
time skew between workers might mess up the order.


I will ping the discussion on the sorted state API and add you. My 
hope is that this can be implemented efficiently soon, though 
efficient implementation will definitely be runner dependent. If you 
are using Flink, we'll have to figure how to implement this 
efficiently using Flink. Dataflow is planning on providing native 
support for sorted state. I don't know if Flink has native support for 
this, so it might have to be emulated using its existing state primitives.


In the meanwhile, I would suggest using bagstate along with timers. 
The timer can periodically pull sorted messages out of the bag (you 
can use watermark timers here) and write back the messages that still 
have gaps in them.


Reuven

On Wed, Jun 10, 2020 at 6:34 PM Catlyn Kong wrote:


Thank y’all for the input!



About the RequiresTimeSortedInput, we were thinking of the following 2
potential approaches:

1. Assign kafka offset as the timestamp while doing a GroupByKey on
   partition_id in a GlobalWindow

2. Rely on the fact that Flink consumes from kafka partitions in offset
   order and assign ingestion time as the timestamp. (We're using our own
   non-KafkaIO based Kafka consumer extended from FlinkKafkaConsumer011
   and thus have direct control over timestamp and watermark assignment)

We find it non-trivial to reason about watermark assignment especially
when taking into consideration that:

1. there might be restarts at any given time and

2. advancing watermark in one kafka partition might result in:

   1. dropping elements from other kafka partitions (if we’re not
      following the native flink approach where we take the lowest
      watermark when merging streams) or

   2. delay output from other kafka partitions since they’ll be buffered.

Is there any recommendation on how this should be handled?

In the direction of using a StatefulDoFn to buffer and reorder, we’re
concerned about performance since we need to serialize and deserialize
the entire BagState (with all the messages) every time we process a
message. And potentially insert this StatefulDoFn in multiple places in
the pipeline. Is there any benchmark result of a pipeline that does
something similar for us to reference?

The proposal for a sorted state API sounds promising, is there a
ticket/doc that we can follow?



On Wed, Jun 10, 2020 at 1:28 PM Reuven Lax <re...@google.com> wrote:

I don't know how well RequiresTimeSortedInput will work for
any late data.

I think you will want to include the Kafka offset in your
records (unless the records have their own sequence number)
and then use state to buffer and sort. There is a proposal
(and work in progress) for a sorted state API, which will make
this easier and more efficient.

Reuven

On Wed, Jun 10, 2020 at 1:25 PM Luke Cwik <lc...@google.com> wrote:

For runners that support @RequiresTimeSortedInput, all
your input will come time sorted (as long as your
element's timestamp tracks the order that you want).
For runners that don't support this, you need to build a
StatefulDoFn that buffers out of order events and reorders
them to the order that you need.

@Pablo Estrada  Any other
suggestions for supporting CDC type pipelines?

On Tue, Jun 9, 2020 at 6:59 PM Catlyn Kong <catl...@yelp.com> wrote:

Thanks a lot for the response!

   

Re: [External] Re: Ensuring messages are processed and emitted in-order

2020-06-11 Thread Reuven Lax
I would not recommend using RequiresTimeSortedInput in this way. I also
would not use ingestion time, as in a distributed environment, time skew
between workers might mess up the order.

I will ping the discussion on the sorted state API and add you. My hope is
that this can be implemented efficiently soon, though efficient
implementation will definitely be runner dependent. If you are using Flink,
we'll have to figure how to implement this efficiently using Flink.
Dataflow is planning on providing native support for sorted state. I don't
know if Flink has native support for this, so it might have to be emulated
using its existing state primitives.

In the meanwhile, I would suggest using bagstate along with timers. The
timer can periodically pull sorted messages out of the bag (you can use
watermark timers here) and write back the messages that still have gaps in
them.

Reuven
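Reuven's flush step can be sketched in plain Python (assumed names; in Beam this would be a BagState drained by a watermark timer): emit the contiguous run of offsets starting at the next expected one, and write the rest back.

```python
def flush(bag, next_offset):
    """Emit a gap-free prefix of buffered offsets; keep the rest buffered."""
    pending = sorted(bag)
    emitted = []
    for off in pending:
        if off == next_offset:
            emitted.append(off)
            next_offset += 1
        else:
            break  # gap: everything from here stays in the bag
    return emitted, pending[len(emitted):], next_offset

bag = [5, 3, 4, 7]           # offset 6 has not arrived yet
emitted, bag, nxt = flush(bag, next_offset=3)
print(emitted)  # [3, 4, 5] -> in order, no gaps
print(bag)      # [7]       -> written back until 6 arrives
```

As Jan points out elsewhere in the thread, Kafka offsets can legitimately have gaps (e.g. on compacted topics), so a real implementation would need a timeout or watermark bound rather than waiting forever for a missing offset.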

On Wed, Jun 10, 2020 at 6:34 PM Catlyn Kong  wrote:

> Thank y’all for the input!
>
>
> About the RequiresTimeSortedInput, we were thinking of the following 2
> potential approaches:
>
>1.
>
>Assign kafka offset as the timestamp while doing a GroupByKey on
>partition_id in a GlobalWindow
>2.
>
>Rely on the fact that Flink consumes from kafka partitions in offset
>order and assign ingestion time as the timestamp. (We're using our own
>non-KafkaIO based Kafka consumer extended from FlinkKafkaConsumer011 and
>thus have direct control over timestamp and watermark assignment)
>
> We find it non-trivial to reason about watermark assignment especially
> when taking into consideration that:
>
>1.
>
>there might be restarts at any given time and
>2.
>
>advancing watermark in one kafka partition might result in:
>1.
>
>   dropping elements from other kafka partitions (if we’re not
>   following native flink approach where we take the lowest watermark when
>   merging streams) or
>   2.
>
>   delay output from other kafka partitions since they’ll be buffered.
>
> Is there any recommendation on how this should be handled?
>
> In the direction of using a StatefulDoFn to buffer and reorder, we’re
> concerned about performance since we need to serialize and deserialize the
> entire BagState (with all the messages) everytime we process a message. And
> potentially insert this StatefulDoFn in multiple places in the pipeline. Is
> there any benchmark result of a pipeline that does something similar for us
> to reference?
>
> The proposal for a sorted state API sounds promising, is there a
> ticket/doc that we can follow?
>
>
> On Wed, Jun 10, 2020 at 1:28 PM Reuven Lax  wrote:
>
>> I don't know how well RequiresTimeSortedInput will work for any late data.
>>
>> I think you will want to include the Kafka offset in your records (unless
>> the records have their own sequence number) and then use state to buffer
>> and sort. There is a proposal (and work in progress) for a sorted state
>> API, which will make this easier and more efficient.
>>
>> Reuven
>>
>> On Wed, Jun 10, 2020 at 1:25 PM Luke Cwik  wrote:
>>
>>> For runners that support @RequiresTimeSortedInput, all your input will
>>> come time sorted (as long as your element's timestamp tracks the order that
>>> you want).
>>> For runners that don't support this, you need to build a StatefulDoFn
>>> that buffers out of order events and reorders them to the order that you
>>> need.
>>>
>>> @Pablo Estrada  Any other suggestions for
>>> supporting CDC type pipelines?
>>>
>>> On Tue, Jun 9, 2020 at 6:59 PM Catlyn Kong  wrote:
>>>
 Thanks a lot for the response!

 We have several business use cases that rely strongly on ordering by
 Kafka offset:
 1) streaming unwindowed inner join: say we want to join users with
 reviews on user_id. Here are the schemas for two streams:
 user:

- user_id
- name
- timestamp

 reviews:

- review_id
- user_id
- timestamp

 Here are the messages in each stream ordered by kafka offset:
 user:
 (1, name_a, 60), (2, name_b, 120), (1, name_c, 240)
 reviews:
 (ABC, 1, 90), (DEF, 2, 360)
 I would expect to receive following output messages:
 (1, name_a, ABC) at timestamp 90
 (1, name_c, ABC) at timestamp 240
 (2, name_b, DEF) at timestamp 360
 This can be done in native Flink since Flink kafka consumer reads from
 each partition sequentially. But without an ordering guarantee, we can end
 up with arbitrary results. So how would we implement this in Beam?
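The expected join semantics above can be modeled in plain Python, assuming messages are processed in Kafka-offset (here, timestamp) order; the names are illustrative and this is not Beam API, just the target behavior:

```python
def offset_ordered_join(users, reviews):
    """Merge two ordered streams by timestamp and inner-join on user_id,
    re-emitting joins when a user record is updated."""
    latest_user = {}    # user_id -> latest name
    user_reviews = {}   # user_id -> review_ids seen so far
    out = []
    merged = sorted(
        [("user", u) for u in users] + [("review", r) for r in reviews],
        key=lambda kv: kv[1][-1])  # merge the two streams by timestamp
    for kind, rec in merged:
        if kind == "user":
            uid, name, ts = rec
            latest_user[uid] = name
            for rid in user_reviews.get(uid, []):  # re-emit on user update
                out.append((uid, name, rid, ts))
        else:
            rid, uid, ts = rec
            user_reviews.setdefault(uid, []).append(rid)
            if uid in latest_user:
                out.append((uid, latest_user[uid], rid, ts))
    return out

users = [(1, "name_a", 60), (2, "name_b", 120), (1, "name_c", 240)]
reviews = [("ABC", 1, 90), ("DEF", 2, 360)]
print(offset_ordered_join(users, reviews))
# [(1, 'name_a', 'ABC', 90), (1, 'name_c', 'ABC', 240), (2, 'name_b', 'DEF', 360)]
```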
 2) unwindowed aggregation: aggregate all the employees for every
 organization. Say we have a new employee stream with the following schema:
 new_employee:

- organization_id
- employee_name

 And here are messaged ordered by kafka offset:
 (1, name_a), (2, name_b), (2, name_c), (1, name_d)
 I would expect the output to be:
 

Re: DoFnSignature#isStateful deprecated

2020-06-11 Thread Jan Lukavský


On 6/11/20 5:02 PM, Reuven Lax wrote:



On Thu, Jun 11, 2020 at 1:26 AM Jan Lukavský wrote:


Hi,

I'd propose the following:

 - delete all DoFnSignatures.{usesState,usesTimers,...} helpers
*except* for DoFnSignatures.isStateful

Why? Actually seems like maybe the opposite  is better? (remove 
isStateful and keep the others). There are cases where it might be 
useful to know if just timers are used.
I had a feeling there is consensus in this thread to keep only (the 
non-static) DoFnSignature#usesState(), DoFnSignature#usesTimers(), etc. 
The isStateful is needed, because (as mentioned several times here as 
well), a DoFn might require being run as stateful despite it contains no 
user state or timers (but is annotated as @RequiresTimeSortedInput, 
which implies statefulness).


 - DoFnSignatures.isStateful would be equal to
'signature.usesState() || signature.usesTimers() ||
signature.processElement().requiresTimeSortedInput()'

requiresTimeSortedInput does not imply isStateful in general - that 
seems like a runner-dependent thing.


 - fix all _relevant_ places in all runners where there are currently
checks for statefulness like
'doFnSignature.stateDeclarations().size() > 0' or
'doFnSignature.usesState()', but with semantics
'DoFnSignatures.isStateful()'

WDYT?
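Jan's proposed predicate, modeled in plain Python (Beam's actual DoFnSignatures is Java; the names here just mirror the proposal):

```python
def is_stateful(signature):
    """A DoFn needs stateful execution if it declares state, declares timers,
    or is annotated @RequiresTimeSortedInput (which the default expansion
    implements with state)."""
    return (signature["uses_state"]
            or signature["uses_timers"]
            or signature["requires_time_sorted_input"])

# A DoFn with no state/timer declarations but @RequiresTimeSortedInput
# still needs stateful execution under the default expansion:
sig = {"uses_state": False, "uses_timers": False,
       "requires_time_sorted_input": True}
print(is_stateful(sig))  # True
```

As Reuven notes in this thread, whether @RequiresTimeSortedInput implies statefulness is runner-dependent; the `or` above reflects the default expansion only.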

On 5/31/20 2:27 PM, Jan Lukavský wrote:

On 5/30/20 5:39 AM, Kenneth Knowles wrote:

Agree to delete them, though for different reasons. I think this
code comes from a desire to have methods that can be called on a
DoFn directly. And from reviewing the code history I think they
are copied in from another class. So that's why they are the way
they are. Accepting a DoFnSignature would be more appropriate to
the "plural-class-name companion class" pattern. But I doubt the
perf impact of this is ever measurable, and of course not
relative to a big data processing job. If we really wanted the
current API, a cache is trivial, but also not important so we
shouldn't add one.

Reason I think they should be deleted:
1. They seem to exist as a shortcut so people don't forget to
call both DoFnSignatures#usesState and DoFnSignatures#usesTimers
[1]. But now if another relevant method is added, the new method
doesn't include it, so the problem of not forgetting to call all
relevant methods is not solved.


There are multiple ways runners test for "statefulness" of a
DoFn. Some use DoFnSignature#usesState(), some
DoFnSignatures#usesState(), some DoFnSignatures#isStateful() and
some even DoFnSignature.stateDeclarations() > 0. Having so many
ways for a simple check that DoFn needs to be executed as a
stateful seems to be suboptimal.

I don't see anything weird about the definition of "stateful dofn",
which is any DoFn that has the following requirements:

 a) is keyed

 b) requires shuffling same keys to same workers

 c) requires support for both state and timers


2. They seem incorrect [2]. Just because something requires time
sorted input *does not* mean it uses bag state.


Yes, this is unfortunate. What makes the DoFn use bag state is
"when the runner executes the DoFn using default expansion". I
agree this is not the same, but the correct solution seems again
routed to the discussion about pipeline requirements vs. runner
capabilities vs. default and overridden expansions. It would be
better to use the standard expansion mechanism, but AFAIK it is
not possible currently, because it is not possible to simply wrap
two stateful dofns one inside another (that would require dynamic
states).

Jan



Kenn

[1]

https://github.com/apache/beam/blob/dba5f2b9d8625a3be3dae026858ecacf20947616/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java#L2432
[2]

https://github.com/apache/beam/blob/dba5f2b9d8625a3be3dae026858ecacf20947616/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java#L2449

On Fri, May 29, 2020 at 8:46 AM Luke Cwik <lc...@google.com> wrote:

To go back to your original question.

I would remove the static convenience methods in
DoFnSignatures since they construct a DoFnSignature and then
throw it away. This construction is pretty involved, nothing
as large as an IO call but it would become noticeable if it
was abused. We can already see that it is being used
multiple times in a row [1, 2].

Runners should create their own derived properties based
upon knowledge of how they are implemented and we shouldn't
create derived properties for different concepts (e.g.
merging isStateful and @RequiresTimeSortedInput). If there
is a common implementation that is shared across
multiple runners, it could "translate" a DoFnSignature bas

Re: DoFnSignature#isStateful deprecated

2020-06-11 Thread Reuven Lax
On Thu, Jun 11, 2020 at 1:26 AM Jan Lukavský  wrote:

> Hi,
>
> I'd propose the following:
>
>  - delete all DoFnSignatures.{usesState,usesTimers,...} helpers *except*
> for DoFnSignatures.isStateful
>
Why? Actually seems like maybe the opposite  is better? (remove isStateful
and keep the others). There are cases where it might be useful to know if
just timers are used.

 - DoFnSignatures.isStateful would be equal to 'signature.usesState() ||
> signature.usesTimers() ||
> signature.processElement().requiresTimeSortedInput()'
>
requiresTimeSortedInput does not imply isStateful in general - that seems
like a runner-dependent thing.


>  - fix all _relevant_ places in all runners where there are currently checks for
> statefulness like 'doFnSignature.stateDeclarations().size() > 0' or
> 'doFnSignature.usesState()', but with semantics 'DoFnSignatures.isStateful()'
>
> WDYT?
> On 5/31/20 2:27 PM, Jan Lukavský wrote:
>
> On 5/30/20 5:39 AM, Kenneth Knowles wrote:
>
> Agree to delete them, though for different reasons. I think this code
> comes from a desire to have methods that can be called on a DoFn directly.
> And from reviewing the code history I think they are copied in from another
> class. So that's why they are the way they are. Accepting a DoFnSignature
> would be more appropriate to the "plural-class-name companion class"
> pattern. But I doubt the perf impact of this is ever measurable, and of
> course not relative to a big data processing job. If we really wanted the
> current API, a cache is trivial, but also not important so we shouldn't add
> one.
>
>
> Reason I think they should be deleted:
> 1. They seem to exist as a shortcut so people don't forget to call both
> DoFnSignatures#usesState and DoFnSignatures#usesTimers [1]. But now if
> another relevant method is added, the new method doesn't include it, so the
> problem of not forgetting to call all relevant methods is not solved.
>
> There are multiple ways runners test for "statefulness" of a DoFn. Some
> use DoFnSignature#usesState(), some DoFnSignatures#usesState(), some
> DoFnSignatures#isStateful(), and some even
> 'doFnSignature.stateDeclarations().size() > 0'. Having so many ways to
> express a simple check that a DoFn needs to be executed as stateful seems
> suboptimal.
>
> I don't see anything weird in the definition of a "stateful DoFn", which is
> any DoFn that has the following requirements:
>
>  a) is keyed
>
>  b) requires shuffling same keys to same workers
>
>  c) requires support for both state and timers
>
> 2. They seem incorrect [2]. Just because something requires time sorted
> input *does not* mean it uses bag state.
>
> Yes, this is unfortunate. What makes the DoFn use bag state is "when the
> runner executes the DoFn using the default expansion". I agree this is not
> the same, but the correct solution seems again rooted in the discussion about
> pipeline requirements vs. runner capabilities vs. default and overridden
> expansions. It would be better to use the standard expansion mechanism, but
> AFAIK it is not possible currently, because it is not possible to simply
> wrap two stateful DoFns one inside another (that would require dynamic
> states).
>
> Jan
>
>
> Kenn
>
> [1]
> https://github.com/apache/beam/blob/dba5f2b9d8625a3be3dae026858ecacf20947616/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java#L2432
> [2]
> https://github.com/apache/beam/blob/dba5f2b9d8625a3be3dae026858ecacf20947616/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java#L2449
>
> On Fri, May 29, 2020 at 8:46 AM Luke Cwik  wrote:
>
>> To go back to your original question.
>>
>> I would remove the static convenience methods in DoFnSignatures since
>> they construct a DoFnSignature and then throw it away. This construction is
>> pretty involved, nothing as large as an IO call but it would become
>> noticeable if it was abused. We can already see that it is being used
>> multiple times in a row [1, 2].
>>
>> Runners should create their own derived properties based upon knowledge
>> of how they are implemented and we shouldn't create derived properties for
>> different concepts (e.g. merging isStateful and @RequiresTimeSortedInput).
>> If there is a common implementation that is shared across multiple runners,
>> it could "translate" a DoFnSignature based upon how it is implemented
>> and/or define its own thing.
>>
>> 1:
>> https://github.com/apache/beam/blob/0addd1f08a2e3f424199c1054c06f363bb77a019/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java#L61
>> 2:
>> https://github.com/apache/beam/blob/0addd1f08a2e3f424199c1054c06f363bb77a019/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java#L73
>>
>> On Wed, May 27, 2020 at 3:16 AM Jan Lukavský  wrote:
>>
>>> Right, this might be about a definition of what these methods really
>>> should return. Currently, the most visible issue is [1]. When a DoFn has
>>> no state or timer, but is annotated with @RequiresTimeSortedInput, this
>>> annotation is silently ignored, because DoFnSignature#usesState returns
>>> false and the ParDo is executed as stateless.

Re: DoFnSignature#isStateful deprecated

2020-06-11 Thread Jan Lukavský

Hi,

I'd propose the following:

 - delete all DoFnSignatures.{usesState,usesTimers,...} helpers 
*except* for DoFnSignatures.isStateful


 - DoFnSignatures.isStateful would be equal to 'signature.usesState() 
|| signature.usesTimers() || 
signature.processElement().requiresTimeSortedInput()'


 - fix all _relevant_ places in all runners where there are currently checks 
for statefulness like 'doFnSignature.stateDeclarations().size() > 0' or 
'doFnSignature.usesState()', but with the semantics 'DoFnSignatures.isStateful()'


WDYT?
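
A minimal, self-contained sketch of the proposed consolidated predicate,
using a hypothetical stand-in for DoFnSignature rather than the real Beam
class:

```java
// Hypothetical stand-in for the relevant bits of DoFnSignature; the real
// Beam class has a much larger surface.
public class IsStatefulSketch {
    static final class Sig {
        final boolean usesState;
        final boolean usesTimers;
        final boolean requiresTimeSortedInput;

        Sig(boolean usesState, boolean usesTimers, boolean requiresTimeSortedInput) {
            this.usesState = usesState;
            this.usesTimers = usesTimers;
            this.requiresTimeSortedInput = requiresTimeSortedInput;
        }
    }

    // The proposed single check: a DoFn is "stateful" if it uses state,
    // uses timers, or requires time-sorted input.
    static boolean isStateful(Sig signature) {
        return signature.usesState
            || signature.usesTimers
            || signature.requiresTimeSortedInput;
    }

    public static void main(String[] args) {
        // A timer-only DoFn and a time-sorted-input-only DoFn both count.
        if (!isStateful(new Sig(false, true, false))) throw new AssertionError();
        if (!isStateful(new Sig(false, false, true))) throw new AssertionError();
        // A plain DoFn does not.
        if (isStateful(new Sig(false, false, false))) throw new AssertionError();
        System.out.println("ok");
    }
}
```

Runners would then call this one predicate instead of hand-rolling their own
combination of the individual checks.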

On 5/31/20 2:27 PM, Jan Lukavský wrote:

On 5/30/20 5:39 AM, Kenneth Knowles wrote:
Agree to delete them, though for different reasons. I think this code 
comes from a desire to have methods that can be called on a DoFn 
directly. And from reviewing the code history I think they are copied 
in from another class. So that's why they are the way they are. 
Accepting a DoFnSignature would be more appropriate to the 
"plural-class-name companion class" pattern. But I doubt the perf 
impact of this is ever measurable, and of course not relative to a 
big data processing job. If we really wanted the current API, a cache 
is trivial, but also not important so we shouldn't add one.


Reason I think they should be deleted:
1. They seem to exist as a shortcut so that people don't forget to call 
both DoFnSignatures#usesState and DoFnSignatures#usesTimers [1]. But 
now if another relevant method is added, this shortcut doesn't 
include it, so the problem of not forgetting to call all relevant 
methods is not solved.


There are multiple ways runners test for "statefulness" of a DoFn. 
Some use DoFnSignature#usesState(), some DoFnSignatures#usesState(), 
some DoFnSignatures#isStateful(), and some even 
'doFnSignature.stateDeclarations().size() > 0'. Having so many ways to 
express a simple check that a DoFn needs to be executed as stateful 
seems suboptimal.


I don't see anything weird in the definition of a "stateful DoFn", which 
is any DoFn that has the following requirements:


 a) is keyed

 b) requires shuffling same keys to same workers

 c) requires support for both state and timers

2. They seem incorrect [2]. Just because something requires time 
sorted input *does not* mean it uses bag state.


Yes, this is unfortunate. What makes the DoFn use bag state is "when 
the runner executes the DoFn using the default expansion". I agree this 
is not the same, but the correct solution seems again rooted in the 
discussion about pipeline requirements vs. runner capabilities vs. 
default and overridden expansions. It would be better to use the 
standard expansion mechanism, but AFAIK it is not possible currently, 
because it is not possible to simply wrap two stateful DoFns one 
inside another (that would require dynamic states).


Jan



Kenn

[1] 
https://github.com/apache/beam/blob/dba5f2b9d8625a3be3dae026858ecacf20947616/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java#L2432
[2] 
https://github.com/apache/beam/blob/dba5f2b9d8625a3be3dae026858ecacf20947616/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java#L2449


On Fri, May 29, 2020 at 8:46 AM Luke Cwik wrote:


To go back to your original question.

I would remove the static convenience methods in DoFnSignatures
since they construct a DoFnSignature and then throw it away. This
construction is pretty involved, nothing as large as an IO call
but it would become noticeable if it was abused. We can already
see that it is being used multiple times in a row [1, 2].

Runners should create their own derived properties based upon
knowledge of how they are implemented and we shouldn't create
derived properties for different concepts (e.g. merging
isStateful and @RequiresTimeSortedInput). If there is a common
implementation that is shared across multiple runners, it could
"translate" a DoFnSignature based upon how it is implemented
and/or define its own thing.

1:

https://github.com/apache/beam/blob/0addd1f08a2e3f424199c1054c06f363bb77a019/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java#L61
2:

https://github.com/apache/beam/blob/0addd1f08a2e3f424199c1054c06f363bb77a019/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTranslatorBatch.java#L73

On Wed, May 27, 2020 at 3:16 AM Jan Lukavský  wrote:

Right, this might be about a definition of what these methods
really should return. Currently, the most visible issue is
[1]. When a DoFn has no state or timer, but is annotated with
@RequiresTimeSortedInput this annotation is silently ignored,
because DoFnSignature#usesState returns false and the ParDo
is executed as stateless.

I agree that there are two points - what user declares and
what runner effectively needs to execute a DoFn.
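
The silently-ignored case described above can be reproduced with a toy
translator. The names here are hypothetical stand-ins, not actual Beam
translation code: a check keyed only on usesState() sends a
@RequiresTimeSortedInput-only DoFn down the stateless path, while the
consolidated check does not.

```java
// Toy illustration of the bug: deciding stateful vs. stateless execution
// by looking only at state declarations drops the time-sorted-input
// requirement. Hypothetical names, not Beam APIs.
public class SilentIgnoreSketch {
    static final class Sig {
        final boolean usesState;
        final boolean requiresTimeSortedInput;

        Sig(boolean usesState, boolean requiresTimeSortedInput) {
            this.usesState = usesState;
            this.requiresTimeSortedInput = requiresTimeSortedInput;
        }
    }

    // Buggy check: only looks at state usage.
    static String translate(Sig sig) {
        return sig.usesState ? "stateful" : "stateless";
    }

    // Consolidated check: either requirement forces the stateful path.
    static String translateFixed(Sig sig) {
        return (sig.usesState || sig.requiresTimeSortedInput) ? "stateful" : "stateless";
    }

    public static void main(String[] args) {
        Sig timeSortedOnly = new Sig(false, true);
        // The annotation is silently dropped by the buggy check...
        System.out.println(translate(timeSortedOnly));      // stateless
        // ...but honored by the consolidated check.
        System.out.println(translateFixed(timeSortedOnly)); // stateful
    }
}
```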