Re: Question related to running unit tests in IDE

2019-10-22 Thread Saikat Maitra
Hi Michal, Alexey

Thank you for your email. I am using macOS Catalina and JDK 8 with IntelliJ
IDEA 2019.1.

I will try to set up IntelliJ from scratch and see if the error resolves.

Regards,
Saikat




On Tue, Oct 22, 2019 at 7:05 AM Alexey Romanenko 
wrote:

> Hi,
>
> Thank you for your interest in contributing!
>
> Did you import the project properly (as explained on page [1]), and were
> all deps resolved successfully?
>
> [1]
> https://cwiki.apache.org/confluence/display/BEAM/Set+up+IntelliJ+from+scratch
>
> On 22 Oct 2019, at 02:28, Saikat Maitra  wrote:
>
> Hi,
>
> I am interested in contributing to this issue:
>
> https://issues.apache.org/jira/browse/BEAM-3658
>
> I have followed the contribution guide and was able to build the project
> locally using gradlew commands.
>
> I wanted to debug and trace the issue further by running the tests locally
> in IntelliJ IDEA, but I am getting the following errors. I looked up the docs
> on running tests (
> https://cwiki.apache.org/confluence/display/BEAM/Run+a+single+unit+test)
> and common IDE errors (
> https://cwiki.apache.org/confluence/display/BEAM/%28FAQ%29+Recovering+from+common+IDE+errors)
> but have not found similar errors.
>
> Error:(632, 17) java: cannot find symbol
>   symbol:   method
> apply(org.apache.beam.sdk.transforms.Values)
>   location: interface org.apache.beam.sdk.values.POutput
>
> Error:(169, 26) java: cannot find symbol
>   symbol:   class PCollection
>   location: class org.apache.beam.sdk.transforms.Watch
>
> Error:(169, 59) java: cannot find symbol
>   symbol:   class KV
>   location: class org.apache.beam.sdk.transforms.Watch
>
> Please let me know if you have feedback.
>
> Regards,
> Saikat
>
>
>


Re: Proposal: Dynamic timer support (BEAM-6857)

2019-10-22 Thread Reza Rokni
+1 on this, having the ability to create timers based on data would make a
bunch of use cases easier to write.

Any thoughts on having an isSet() / read() / setMinimum(timeStamp) type of
ability?
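
To make the question concrete, here is a minimal plain-Java sketch of what such a handle could look like. It does not use the Beam API: the class name SketchTimer, the clear() method, and the reading of setMinimum as "keep the earlier of the current and the new timestamp" are all assumptions made for illustration.

```java
import java.util.OptionalLong;

/** Hypothetical timer handle for illustration only; not part of the Beam API. */
final class SketchTimer {
  // Target firing time in epoch millis; empty while the timer is unset.
  private OptionalLong target = OptionalLong.empty();

  /** Sets (or overwrites) the firing timestamp. */
  void set(long timestampMillis) {
    target = OptionalLong.of(timestampMillis);
  }

  /** Reports whether a firing timestamp is currently set. */
  boolean isSet() {
    return target.isPresent();
  }

  /** Returns the current firing timestamp, if any. */
  OptionalLong read() {
    return target;
  }

  /** Assumed semantics: keep the earlier of the current target and the new one. */
  void setMinimum(long timestampMillis) {
    if (!target.isPresent() || timestampMillis < target.getAsLong()) {
      target = OptionalLong.of(timestampMillis);
    }
  }

  /** Unsets the timer. */
  void clear() {
    target = OptionalLong.empty();
  }

  public static void main(String[] args) {
    SketchTimer timer = new SketchTimer();
    timer.setMinimum(5_000L);
    timer.setMinimum(9_000L); // later than the current target, so ignored
    timer.setMinimum(2_000L); // earlier, so it replaces the target
    System.out.println(timer.isSet() + " " + timer.read().getAsLong());
  }
}
```

With something like setMinimum, several elements could each propose a deadline without first reading the timer and comparing by hand.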

On Wed, 23 Oct 2019 at 00:52, Reuven Lax  wrote:

> Kenn:
> +1 to using TimerFamily instead of TimerId and TimerMap.
>
> Jan:
> This is definitely not just for DSLs. I've definitely seen cases where the
> user wants different timers based on input data, so they cannot be defined
> statically. As a thought experiment: one stated goal of state + timers was
> to provide the low-level tools we use to implement windowing. However to
> implement windowing you need a dynamic set of timers, not just a single
> one. Now most users don't need to reimplement windowing (though we have had
> some users who had that need, when they wanted something slightly different
> than what native Beam windowing provided), however the need for dynamic
> timers is not unheard of.
>
> +1 to allowing dynamic state. However I think this is separate enough from
> timers that it doesn't need to be coupled in this discussion. Dynamic state
> also raises the wrinkle of pipeline compatibility (as you mentioned),
> which I think is a bit less of an issue for dynamic timers.
>
> Allowing a DSL to specify a DoFnSignature does not quite solve this
> problem. The DSL still needs a way to set and process the timers. It also
> does not solve the problem where the timers are based on input data
> elements, so cannot be known at pipeline construction time. However what
> might be more important is statically defining the timer families, and a
> DSL could do this by specifying a DoFnSignature (and something similar
> could be done with state). Also as mentioned above, this is useful to
> normal Beam users as well, and we shouldn't force normal users to start
> dealing with DoFnSignatures and DoFnInvokers.
>
>
>
>
>
>
> On Tue, Oct 22, 2019 at 7:56 AM Jan Lukavský  wrote:
>
>> Hi Max,
>>
>> wouldn't that be actually the same as
>>
>> class MyDoFn extends DoFn {
>>
>>   @ProcessElement
>>   public void process(ProcessContext context) {
>>     // "get" would register a new TimerSpec
>>     Timer timer1 = context.getTimer("timer1");
>>     Timer timer2 = context.getTimer("timer2");
>>     timer1.set(...);
>>     timer2.set(...);
>>   }
>> }
>>
>> That is - no need to declare anything? One more concern about that - if
>> we allow registration of timers (or even state) dynamically like that it
>> might be harder to perform validation of pipeline upon upgrades.
>>
>> Jan
>>
>> On 10/22/19 4:47 PM, Maximilian Michels wrote:
>> > The idea makes sense to me. I really like that Beam gives upfront
>> > specs for timer and state, but it is not flexible enough for
>> > timer-based libraries or for users which want to dynamically generate
>> > timers.
>> >
>> > I'm not sure about the proposed API yet. Shouldn't we separate the
>> > timer specs from setting actual timers?
>> >
>> > Suggestion:
>> >
>> > class MyDoFn extends DoFn {
>> >   @TimerMap TimerMap timers = TimerSpecs.timerMap();
>> >
>> >   @ProcessElement
>> >   public void process(
>> >       @Element String e,
>> >       @TimerMap TimerMap timers) {
>> >     // "get" would register a new TimerSpec
>> >     Timer timer1 = timers.get("timer1");
>> >     Timer timer2 = timers.get("timer2");
>> >     timer1.set(...);
>> >     timer2.set(...);
>> >   }
>> >
>> >   // No args for "@OnTimer" => use generic TimerMap
>> >   @OnTimer
>> >   public void onTimer(
>> >   @TimerId String timerFired,
>> >   @Timestamp Instant timerTs,
>> >   @TimerMap TimerMap timers) {
>> >  // Timer firing
>> >  ...
>> >  // Set this timer (or another)
>> >  Timer timer = timers.get(timerFired);
>> >  timer.set(...);
>> >   }
>> > }
>> >
>> > What do you think?
>> >
>> > -Max
>> >
>> > On 22.10.19 10:35, Jan Lukavský wrote:
>> >> Hi Kenn,
>> >>
>> >> On 10/22/19 2:48 AM, Kenneth Knowles wrote:
>> >>> This seems extremely useful.
>> >>>
>> >>> I assume you mean `@OnTimer("timers")` in your example. I would
>> >>> suggest that the parameter annotation be something other
>> >>> than @TimerId since that annotation is already used for a very
>> >>> similar but different purpose; they are close enough that it is
>> >>> tempting to pun them, but it is clearer to keep them distinct IMO.
>> >>> Perhaps @TimerName or @TimerKey or some such. Alternatively,
>> >>> keep @TimerId in the parameter list and change the declaration
>> >>> to @TimerFamily("timers"). I think "family" or "group" may be more
>> >>> clear naming than "map".
>> >>>
>> >>> At the portability level, this API does seem to be pretty close to a
>> >>> noop in terms of the messages that needs to be sent over the Fn API,
>> >>> so it makes sense to loosen the protos. By the time the Fn API is in
>> >>> play, all of our desires to catch errors prior to execution are
>> >>> irrelevant anyhow.
>> >>>
>> >>> On the other hand, I think DSLs have 

Re: Hi

2019-10-22 Thread Saikat Maitra
Thank you Aizhamal

Regards,
Saikat

On Tue, 22 Oct 2019 at 7:00 AM, Aizhamal Nurmamat kyzy 
wrote:

> Welcome Saikat!
>
> On Sun, Oct 20, 2019, 5:55 AM Saikat Maitra 
> wrote:
>
>> Hi Kenneth,
>>
>> Thank you, much appreciate your help.
>>
>> Warm regards,
>> Saikat
>>
>> On Sat, Oct 19, 2019 at 10:40 PM Kenneth Knowles  wrote:
>>
>>> Welcome! I've added you to the Contributors role.
>>>
>>> Kenn
>>>
>>> On Sat, Oct 19, 2019 at 5:40 PM Saikat Maitra 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am Saikat, and I am a committer in the Apache Ignite project.
>>>>
>>>> I am interested in joining the Apache Beam community and contributing to
>>>> the following Apache Ignite integration.
>>>>
>>>> https://issues.apache.org/jira/browse/IGNITE-7198
>>>>
>>>> I would also like to get familiar with the Apache Beam project and
>>>> take up a few open issues like the one below.
>>>>
>>>> https://issues.apache.org/jira/browse/BEAM-3658
>>>>
>>>> Can someone please add me to the Apache Beam contributor list?
>>>>
>>>> Regards,
>>>>
>>>> Saikat
>>>>
>>>


Re: Install Jenkins AnsiColor plugin

2019-10-22 Thread Udi Meiri
Your proposal will only affect the seed job (which doesn't do color outputs
AFAIK).
I think you want to add colorizeOutput() here:
https://github.com/apache/beam/blob/bfebbd0d16361f61fa40bfdec2f0cb6f943f7c9a/.test-infra/jenkins/CommonJobProperties.groovy#L79-L95

Otherwise no concerns from me.

On Tue, Oct 22, 2019 at 12:01 PM Chad Dombrova  wrote:

> thanks, so IIUC, I’m going to update job_00_seed.groovy like this:
>
>   wrappers {
>     colorizeOutput()
>     timeout {
>       absolute(60)
>       abortBuild()
>     }
>   }
>
> Then add the comment "run seed job".
>
> Does anyone have any concerns with me trying this out now?
>
> -chad
>
> On Tue, Oct 22, 2019 at 11:42 AM Udi Meiri  wrote:
>
>> Also note that changing the job DSL doesn't take effect until the "seed"
>> job runs. (use the "run seed job" phrase)
>>
>> On Tue, Oct 22, 2019 at 11:06 AM Chad Dombrova  wrote:
>>
>>> Thanks, I'll look into this.  I have a PR I'm building up with a handful
>>> of minor changes related to this.
>>>
>>>
>>>
>>> On Tue, Oct 22, 2019 at 10:45 AM Yifan Zou  wrote:
>>>
>>>> Thanks, Udi! The ansicolor plugin was applied to ASF Jenkins
>>>> universally. You might need to explicitly enable the coloroutput in your
>>>> jenkins dsl.
>>>>
>>>> On Tue, Oct 22, 2019 at 10:33 AM Udi Meiri  wrote:
>>>>
>>>>> Seems to be already installed:
>>>>> https://issues.apache.org/jira/browse/INFRA-16944
>>>>> Do we just need to enable it somehow?
>>>>> This might work:
>>>>> https://jenkinsci.github.io/job-dsl-plugin/#method/javaposse.jobdsl.dsl.helpers.wrapper.WrapperContext.colorizeOutput
>>>>>
>>>>> BTW, our Jenkins is maintained by ASF's Infrastructure team:
>>>>> https://cwiki.apache.org/confluence/display/INFRA/Jenkins
>>>>>
>>>>> On Tue, Oct 22, 2019 at 10:23 AM Chad Dombrova 
>>>>> wrote:
>>>>>
>>>>>> Hi all,
>>>>>> As a user trying to grok failures in jenkins I think it would be a
>>>>>> huge help to have color output support.  This is something that works out
>>>>>> of the box for CI tools like gitlab and travis, and it really helps bring
>>>>>> that 21st century feel to your logs :)
>>>>>>
>>>>>> There's a Jenkins plugin for colorizing ansi escape sequences here:
>>>>>> https://plugins.jenkins.io/ansicolor
>>>>>>
>>>>>> I think this is something that has to be deployed by a Jenkins admin.
>>>>>>
>>>>>> -chad
>>>>>>
>>>>>>



Reviewer needed for clustering bug fix in Java BigQueryIO.Write

2019-10-22 Thread Jeff Klukas
I've had a one-line bug fix PR open since last week that I'd love to get
merged for 2.17.

Would a committer be willing to take a look at it?

https://github.com/apache/beam/pull/9784


Re: Contributor permission for Beam Jira tickets

2019-10-22 Thread Israel Herraiz
Thank you!


On Tue, Oct 22, 2019 at 4:52 PM Jean-Baptiste Onofré 
wrote:

> Hi Israel,
>
> Welcome aboard !
>
> I just added you in contributors on Jira.
>
> Regards
> JB
>
> On 22/10/2019 21:43, Israel Herraiz wrote:
> > Hi,
> >
> > My name is Israel, and I work in the Professional Services team at
> Google.
> >
> > I have created a JIRA issue and submitted the corresponding pull request
> > (see https://issues.apache.org/jira/browse/BEAM-8458), and I would like
> > to be able to assign the issue to myself.
> >
> > That's my second pull request sent to Apache Beam.
> >
> > My JIRA username is iht
> > (see https://issues.apache.org/jira/secure/ViewProfile.jspa?name=iht).
> >
> > Please could anyone give me that permission in JIRA?
> >
> > Thanks in advance.
> >
> > Israel
> >
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>


Re: Contributor permission for Beam Jira tickets

2019-10-22 Thread Jean-Baptiste Onofré
Hi Israel,

Welcome aboard !

I just added you in contributors on Jira.

Regards
JB

On 22/10/2019 21:43, Israel Herraiz wrote:
> Hi,
> 
> My name is Israel, and I work in the Professional Services team at Google.
> 
> I have created a JIRA issue and submitted the corresponding pull request
> (see https://issues.apache.org/jira/browse/BEAM-8458), and I would like
> to be able to assign the issue to myself.
> 
> That's my second pull request sent to Apache Beam.
> 
> My JIRA username is iht
> (see https://issues.apache.org/jira/secure/ViewProfile.jspa?name=iht).
> 
> Please could anyone give me that permission in JIRA?
> 
> Thanks in advance.
> 
> Israel
> 

-- 
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


Contributor permission for Beam Jira tickets

2019-10-22 Thread Israel Herraiz
Hi,

My name is Israel, and I work in the Professional Services team at Google.

I have created a JIRA issue and submitted the corresponding pull request
(see https://issues.apache.org/jira/browse/BEAM-8458), and I would like to
be able to assign the issue to myself.

That's my second pull request sent to Apache Beam.

My JIRA username is iht (see
https://issues.apache.org/jira/secure/ViewProfile.jspa?name=iht).

Please could anyone give me that permission in JIRA?

Thanks in advance.

Israel


Re: Install Jenkins AnsiColor plugin

2019-10-22 Thread Chad Dombrova
thanks, so IIUC, I’m going to update job_00_seed.groovy like this:

  wrappers {
    colorizeOutput()
    timeout {
      absolute(60)
      abortBuild()
    }
  }

Then add the comment "run seed job".

Does anyone have any concerns with me trying this out now?

-chad

On Tue, Oct 22, 2019 at 11:42 AM Udi Meiri  wrote:

> Also note that changing the job DSL doesn't take effect until the "seed"
> job runs. (use the "run seed job" phrase)
>
> On Tue, Oct 22, 2019 at 11:06 AM Chad Dombrova  wrote:
>
>> Thanks, I'll look into this.  I have a PR I'm building up with a handful
>> of minor changes related to this.
>>
>>
>>
>> On Tue, Oct 22, 2019 at 10:45 AM Yifan Zou  wrote:
>>
>>> Thanks, Udi! The ansicolor plugin was applied to ASF Jenkins
>>> universally. You might need to explicitly enable the coloroutput in your
>>> jenkins dsl.
>>>
>>> On Tue, Oct 22, 2019 at 10:33 AM Udi Meiri  wrote:
>>>
>>>> Seems to be already installed:
>>>> https://issues.apache.org/jira/browse/INFRA-16944
>>>> Do we just need to enable it somehow?
>>>> This might work:
>>>> https://jenkinsci.github.io/job-dsl-plugin/#method/javaposse.jobdsl.dsl.helpers.wrapper.WrapperContext.colorizeOutput
>>>>
>>>> BTW, our Jenkins is maintained by ASF's Infrastructure team:
>>>> https://cwiki.apache.org/confluence/display/INFRA/Jenkins
>>>>
>>>> On Tue, Oct 22, 2019 at 10:23 AM Chad Dombrova 
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>> As a user trying to grok failures in jenkins I think it would be a
>>>>> huge help to have color output support.  This is something that works out
>>>>> of the box for CI tools like gitlab and travis, and it really helps bring
>>>>> that 21st century feel to your logs :)
>>>>>
>>>>> There's a Jenkins plugin for colorizing ansi escape sequences here:
>>>>> https://plugins.jenkins.io/ansicolor
>>>>>
>>>>> I think this is something that has to be deployed by a Jenkins admin.
>>>>>
>>>>> -chad
>>>>>
>>>>>


Re: Install Jenkins AnsiColor plugin

2019-10-22 Thread Udi Meiri
Also note that changing the job DSL doesn't take effect until the "seed"
job runs. (use the "run seed job" phrase)

On Tue, Oct 22, 2019 at 11:06 AM Chad Dombrova  wrote:

> Thanks, I'll look into this.  I have a PR I'm building up with a handful
> of minor changes related to this.
>
>
>
> On Tue, Oct 22, 2019 at 10:45 AM Yifan Zou  wrote:
>
>> Thanks, Udi! The ansicolor plugin was applied to ASF Jenkins universally.
>> You might need to explicitly enable the coloroutput in your jenkins dsl.
>>
>> On Tue, Oct 22, 2019 at 10:33 AM Udi Meiri  wrote:
>>
>>> Seems to be already installed:
>>> https://issues.apache.org/jira/browse/INFRA-16944
>>> Do we just need to enable it somehow?
>>> This might work:
>>> https://jenkinsci.github.io/job-dsl-plugin/#method/javaposse.jobdsl.dsl.helpers.wrapper.WrapperContext.colorizeOutput
>>>
>>> BTW, our Jenkins is maintained by ASF's Infrastructure team:
>>> https://cwiki.apache.org/confluence/display/INFRA/Jenkins
>>>
>>> On Tue, Oct 22, 2019 at 10:23 AM Chad Dombrova 
>>> wrote:
>>>
>>>> Hi all,
>>>> As a user trying to grok failures in jenkins I think it would be a huge
>>>> help to have color output support.  This is something that works out of the
>>>> box for CI tools like gitlab and travis, and it really helps bring that
>>>> 21st century feel to your logs :)
>>>>
>>>> There's a Jenkins plugin for colorizing ansi escape sequences here:
>>>> https://plugins.jenkins.io/ansicolor
>>>>
>>>> I think this is something that has to be deployed by a Jenkins admin.
>>>>
>>>> -chad





Re: Install Jenkins AnsiColor plugin

2019-10-22 Thread Chad Dombrova
Thanks, I'll look into this.  I have a PR I'm building up with a handful of
minor changes related to this.



On Tue, Oct 22, 2019 at 10:45 AM Yifan Zou  wrote:

> Thanks, Udi! The ansicolor plugin was applied to ASF Jenkins universally.
> You might need to explicitly enable the coloroutput in your jenkins dsl.
>
> On Tue, Oct 22, 2019 at 10:33 AM Udi Meiri  wrote:
>
>> Seems to be already installed:
>> https://issues.apache.org/jira/browse/INFRA-16944
>> Do we just need to enable it somehow?
>> This might work:
>> https://jenkinsci.github.io/job-dsl-plugin/#method/javaposse.jobdsl.dsl.helpers.wrapper.WrapperContext.colorizeOutput
>>
>> BTW, our Jenkins is maintained by ASF's Infrastructure team:
>> https://cwiki.apache.org/confluence/display/INFRA/Jenkins
>>
>> On Tue, Oct 22, 2019 at 10:23 AM Chad Dombrova  wrote:
>>
>>> Hi all,
>>> As a user trying to grok failures in jenkins I think it would be a huge
>>> help to have color output support.  This is something that works out of the
>>> box for CI tools like gitlab and travis, and it really helps bring that
>>> 21st century feel to your logs :)
>>>
>>> There's a Jenkins plugin for colorizing ansi escape sequences here:
>>> https://plugins.jenkins.io/ansicolor
>>>
>>> I think this is something that has to be deployed by a Jenkins admin.
>>>
>>> -chad
>>>
>>>


Re: Install Jenkins AnsiColor plugin

2019-10-22 Thread Udi Meiri
Seems to be already installed:
https://issues.apache.org/jira/browse/INFRA-16944
Do we just need to enable it somehow?
This might work:
https://jenkinsci.github.io/job-dsl-plugin/#method/javaposse.jobdsl.dsl.helpers.wrapper.WrapperContext.colorizeOutput

BTW, our Jenkins is maintained by ASF's Infrastructure team:
https://cwiki.apache.org/confluence/display/INFRA/Jenkins

On Tue, Oct 22, 2019 at 10:23 AM Chad Dombrova  wrote:

> Hi all,
> As a user trying to grok failures in jenkins I think it would be a huge
> help to have color output support.  This is something that works out of the
> box for CI tools like gitlab and travis, and it really helps bring that
> 21st century feel to your logs :)
>
> There's a Jenkins plugin for colorizing ansi escape sequences here:
> https://plugins.jenkins.io/ansicolor
>
> I think this is something that has to be deployed by a Jenkins admin.
>
> -chad
>
>



Install Jenkins AnsiColor plugin

2019-10-22 Thread Chad Dombrova
Hi all,
As a user trying to grok failures in jenkins I think it would be a huge
help to have color output support.  This is something that works out of the
box for CI tools like gitlab and travis, and it really helps bring that
21st century feel to your logs :)

There's a Jenkins plugin for colorizing ansi escape sequences here:
https://plugins.jenkins.io/ansicolor

I think this is something that has to be deployed by a Jenkins admin.

-chad
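
For readers who have not looked at raw logs before, the following small Java sketch shows what an ANSI color escape sequence is and why an uncolorized console looks noisy. The class and method names are made up for this illustration; only the SGR codes 31 (red foreground) and 0 (reset) are standard.

```java
/** Illustration only: what ANSI color sequences look like inside a log line. */
final class AnsiColorSketch {
  // ESC character that starts every ANSI control sequence.
  static final String ESC = "\u001B";

  /** Wraps text in "red foreground" (SGR 31) and "reset" (SGR 0) sequences. */
  static String red(String text) {
    return ESC + "[31m" + text + ESC + "[0m";
  }

  /** Removes SGR color sequences, leaving only the readable text. */
  static String strip(String text) {
    return text.replaceAll("\u001B\\[[0-9;]*m", "");
  }

  public static void main(String[] args) {
    String line = red("FAILED") + " :sdks:java:core:test";
    // A color-aware console renders the word in red; a plain one shows the raw codes.
    System.out.println(line);
    System.out.println(strip(line));
  }
}
```

A console without color support prints the escape codes literally around "FAILED", which is the clutter a colorizing plugin is meant to turn into actual colors.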


Re: Java PortableRunner package name

2019-10-22 Thread Maximilian Michels
+1 for moving. This is just a left-over from the first "reference" runner
implementation for portability.


On 22.10.19 16:59, Łukasz Gajowy wrote:
> +1 for moving/renaming. I agree with Kyle and Michał - there indeed
> seems to be some confusion. The name "runners/reference" suggests a
> "Runner" that is not production-ready (it seems to be neither of those).
> If possible, maybe sdks/java/portability is a good place for this?
>
> Łukasz
>
> On Tue, 22 Oct 2019 at 16:41, Kyle Weaver wrote:
>
>> I agree this should be moved. PortableRunner.java is analogous to
>> portable_runner.py, which resides under
>> sdks/python/apache_beam/runners/portability. Maybe
>> PortableRunner.java should be moved to somewhere under sdks/java, as
>> it's not actually a runner itself. The nomenclature is
>> confusing; PortableRunner could be more aptly named something like
>> `PortableRunnerClient`, or `JobClient` to better illustrate its
>> relationship with `JobServer`.
>>
>> On Tue, Oct 22, 2019 at 4:11 PM Michał Walenia wrote:
>>
>>> Hi,
>>>
>>> I found the Java PortableRunner class in
>>> org.apache.beam.runners.reference package, where ReferenceRunner
>>> used to reside prior to its deletion. The PortableRunner
>>> implementation however is one that can be used with real
>>> JobServers in production code.
>>>
>>> It seems that this class shouldn’t be in the reference package
>>> but somewhere else. I’d like to rename the package from
>>> org.apache.beam.runners.reference to
>>> org.apache.beam.runners.portability, as it contains only classes
>>> related to the portable runner operation.
>>>
>>> What do you think? If nobody is strongly against the change,
>>> I’ll make a pull request with the refactor.
>>>
>>> Have a good day,
>>>
>>> Michal
>>>
>>> --
>>>
>>> Michał Walenia
>>> Polidea | Software Engineer
>>>
>>> M: +48 791 432 002
>>> E: michal.wale...@polidea.com
>>>
>>> Unique Tech
>>> Check out our projects!


Re: Proposal: Dynamic timer support (BEAM-6857)

2019-10-22 Thread Reuven Lax
Kenn:
+1 to using TimerFamily instead of TimerId and TimerMap.

Jan:
This is definitely not just for DSLs. I've definitely seen cases where the
user wants different timers based on input data, so they cannot be defined
statically. As a thought experiment: one stated goal of state + timers was
to provide the low-level tools we use to implement windowing. However to
implement windowing you need a dynamic set of timers, not just a single
one. Now most users don't need to reimplement windowing (though we have had
some users who had that need, when they wanted something slightly different
than what native Beam windowing provided), however the need for dynamic
timers is not unheard of.

+1 to allowing dynamic state. However I think this is separate enough from
timers that it doesn't need to be coupled in this discussion. Dynamic state
also raises the wrinkle of pipeline compatibility (as you mentioned),
which I think is a bit less of an issue for dynamic timers.

Allowing a DSL to specify a DoFnSignature does not quite solve this
problem. The DSL still needs a way to set and process the timers. It also
does not solve the problem where the timers are based on input data
elements, so cannot be known at pipeline construction time. However what
might be more important is statically defining the timer families, and a
DSL could do this by specifying a DoFnSignature (and something similar
could be done with state). Also as mentioned above, this is useful to
normal Beam users as well, and we shouldn't force normal users to start
dealing with DoFnSignatures and DoFnInvokers.
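
As a rough illustration of the "timers based on input data" case, here is a self-contained plain-Java sketch. It does not use Beam at all: TimerFamilySketch, set, and fireUpTo are names invented for this example, and timestamps are simple epoch-millis longs. The point is only that the family is declared once, while the individual timer ids come from the data at runtime.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for one statically declared timer family; not Beam code. */
final class TimerFamilySketch {
  // Pending timers in this family, keyed by an id chosen at runtime.
  private final Map<String, Long> pending = new HashMap<>();

  /** Sets (or resets) the timer with the given dynamic id. */
  void set(String timerId, long timestampMillis) {
    pending.put(timerId, timestampMillis);
  }

  /** Removes and returns ids of timers due at or before the watermark, earliest first. */
  List<String> fireUpTo(long watermarkMillis) {
    List<String> due = new ArrayList<>();
    for (Map.Entry<String, Long> entry : pending.entrySet()) {
      if (entry.getValue() <= watermarkMillis) {
        due.add(entry.getKey());
      }
    }
    // Order by timestamp, then by id so the result is deterministic.
    due.sort((a, b) -> {
      int byTime = Long.compare(pending.get(a), pending.get(b));
      return byTime != 0 ? byTime : a.compareTo(b);
    });
    pending.keySet().removeAll(due);
    return due;
  }

  public static void main(String[] args) {
    TimerFamilySketch timers = new TimerFamilySketch();
    String[] users = {"alice", "bob", "carol"};
    long[] deadlines = {3_000L, 1_000L, 7_000L};
    // Timer ids are derived from input elements, so they cannot be declared up front.
    for (int i = 0; i < users.length; i++) {
      timers.set("expiry-" + users[i], deadlines[i]);
    }
    System.out.println(timers.fireUpTo(5_000L)); // prints [expiry-bob, expiry-alice]
  }
}
```

Only the family itself would need to be known at pipeline construction time, which is what keeps static validation possible while still allowing an unbounded set of timer ids.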






On Tue, Oct 22, 2019 at 7:56 AM Jan Lukavský  wrote:

> Hi Max,
>
> wouldn't that be actually the same as
>
> class MyDoFn extends DoFn {
>
>   @ProcessElement
>   public void process(ProcessContext context) {
>     // "get" would register a new TimerSpec
>     Timer timer1 = context.getTimer("timer1");
>     Timer timer2 = context.getTimer("timer2");
>     timer1.set(...);
>     timer2.set(...);
>   }
> }
>
> That is - no need to declare anything? One more concern about that - if
> we allow registration of timers (or even state) dynamically like that it
> might be harder to perform validation of pipeline upon upgrades.
>
> Jan
>
> On 10/22/19 4:47 PM, Maximilian Michels wrote:
> > The idea makes sense to me. I really like that Beam gives upfront
> > specs for timer and state, but it is not flexible enough for
> > timer-based libraries or for users which want to dynamically generate
> > timers.
> >
> > I'm not sure about the proposed API yet. Shouldn't we separate the
> > timer specs from setting actual timers?
> >
> > Suggestion:
> >
> > class MyDoFn extends DoFn {
> >   @TimerMap TimerMap timers = TimerSpecs.timerMap();
> >
> >   @ProcessElement
> >   public void process(
> >       @Element String e,
> >       @TimerMap TimerMap timers) {
> >     // "get" would register a new TimerSpec
> >     Timer timer1 = timers.get("timer1");
> >     Timer timer2 = timers.get("timer2");
> >     timer1.set(...);
> >     timer2.set(...);
> >   }
> >
> >   // No args for "@OnTimer" => use generic TimerMap
> >   @OnTimer
> >   public void onTimer(
> >   @TimerId String timerFired,
> >   @Timestamp Instant timerTs,
> >   @TimerMap TimerMap timers) {
> >  // Timer firing
> >  ...
> >  // Set this timer (or another)
> >  Timer timer = timers.get(timerFired);
> >  timer.set(...);
> >   }
> > }
> >
> > What do you think?
> >
> > -Max
> >
> > On 22.10.19 10:35, Jan Lukavský wrote:
> >> Hi Kenn,
> >>
> >> On 10/22/19 2:48 AM, Kenneth Knowles wrote:
> >>> This seems extremely useful.
> >>>
> >>> I assume you mean `@OnTimer("timers")` in your example. I would
> >>> suggest that the parameter annotation be something other
> >>> than @TimerId since that annotation is already used for a very
> >>> similar but different purpose; they are close enough that it is
> >>> tempting to pun them, but it is clearer to keep them distinct IMO.
> >>> Perhaps @TimerName or @TimerKey or some such. Alternatively,
> >>> keep @TimerId in the parameter list and change the declaration
> >>> to @TimerFamily("timers"). I think "family" or "group" may be more
> >>> clear naming than "map".
> >>>
> >>> At the portability level, this API does seem to be pretty close to a
> >>> noop in terms of the messages that needs to be sent over the Fn API,
> >>> so it makes sense to loosen the protos. By the time the Fn API is in
> >>> play, all of our desires to catch errors prior to execution are
> >>> irrelevant anyhow.
> >>>
> >>> On the other hand, I think DSLs have a different & bigger problem
> >>> than this, in that they want to programmatically adjust all the
> >>> capabilities of a DoFn. Same goes for wrapping one DoFn in
> >>> another. Certainly some limited DSL use cases are addressed by this,
> >>> but I wouldn't take that as a primary use case for this feature.
> >>> Ultimately they are probably better served by being able to
> >>> explicitly 

Re: [DISCUSS] How to stopp SdkWorker in SdkHarness

2019-10-22 Thread Harsh Vardhan
Would approach 1 be akin to abort semantics?

On Mon, Oct 21, 2019 at 8:01 PM jincheng sun 
wrote:

> Hi Luke,
>
> Thanks a lot for your reply. Since one SDK harness can be shared
> between multiple executable stages, the control service termination may
> occur much later than the completion of an executable stage. This is the
> main reason I prefer runners to control the teardown of DoFns.
>
> Regarding "SDK harnesses can terminate instances any time they want and
> start new instances anytime as well.", personally I think it does not conflict
> with the proposed Approach 1, as the SDK harness could decide what to do
> when receiving the teardown request. It could do nothing if the DoFns have
> already been torn down, and could tear down the DoFns if needed.
>
> What do you think?
>
> Best,
> Jincheng
>
> On Tue, Oct 22, 2019 at 2:05 AM Luke Cwik wrote:
>
>> Approach 2 is currently the suggested approach[1] for DoFn's to shutdown.
>> Note that SDK harnesses can terminate instances any time they want and
>> start new instances anytime as well.
>>
>> Why do you want to expose this logic so that Runners could control it?
>>
>> 1:
>> https://docs.google.com/document/d/1n6s3BOxOPct3uF4UgbbI9O9rpdiKWFH9R6mtVmR7xp0/edit#
>>
>> On Mon, Oct 21, 2019 at 4:27 AM jincheng sun 
>> wrote:
>>
>>> Hi,
>>> I found that `SdkHarness` does not stop the `SdkWorker` when it finishes.
>>> We should add the logic for stopping the `SdkWorker` in `SdkHarness`. More
>>> detail can be found in [1].
>>>
>>> There are two approaches to solve this issue:
>>>
>>> Approach 1: We can add a Fn API for teardown purposes, and the runner
>>> will tear down a specific bundle descriptor via this teardown Fn API during
>>> disposal.
>>> Approach 2: The control service termination could be seen as a signal,
>>> and once the SDK harness receives this signal, the teardown of the bundle
>>> descriptor will be performed.
>>>
>>> More detail can be found in [2].
>>>
>>> With Approach 2, because an SDK harness could be shared between multiple
>>> executable stages, the control service termination only occurs when all the
>>> executable stages sharing the same SDK harness have finished. This means that
>>> the teardown of DoFns may not be executed immediately after an executable
>>> stage is finished.
>>>
>>> So, I prefer Approach 1. Any feedback is welcome :)
>>>
>>> Best,
>>> Jincheng
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/sdk_worker.py
>>> [2]
>>> https://docs.google.com/document/d/1sCgy9VQPf9zVXKRquK8P6N4x7aB62GEO8ozkujRSHZg/edit?usp=sharing
>>>
>> --

Got feedback? go/harsh-feedback 
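
The difference between the two approaches quoted above can be sketched in plain Java. Nothing here is Beam or Fn API code: HarnessTeardownSketch, FakeDoFn, and the method names are hypothetical, and the only property the sketch tries to show is that a per-descriptor teardown request (Approach 1) can be idempotent and coexist with a final sweep on control service termination (Approach 2).

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of teardown handling in an SDK harness; not Beam code. */
final class HarnessTeardownSketch {

  /** Stand-in for a DoFn instance that counts how often teardown ran. */
  static final class FakeDoFn {
    int teardownCalls = 0;

    void teardown() {
      teardownCalls++;
    }
  }

  // Live DoFn instances keyed by an assumed bundle descriptor id.
  private final Map<String, FakeDoFn> live = new HashMap<>();

  void register(String bundleDescriptorId, FakeDoFn fn) {
    live.put(bundleDescriptorId, fn);
  }

  /** Approach 1: runner-requested teardown; a no-op if already torn down. */
  boolean teardown(String bundleDescriptorId) {
    FakeDoFn fn = live.remove(bundleDescriptorId);
    if (fn == null) {
      return false; // nothing live under this id, so the request is ignored
    }
    fn.teardown();
    return true;
  }

  /** Approach 2: control service terminated; tear down whatever is still live. */
  int onControlServiceTerminated() {
    int count = 0;
    for (FakeDoFn fn : live.values()) {
      fn.teardown();
      count++;
    }
    live.clear();
    return count;
  }
}
```

In this model a stage that finishes early can release its DoFn right away through teardown(id), while anything the runner never asked about is still cleaned up once when the control service goes away.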


Re: Java PortableRunner package name

2019-10-22 Thread Łukasz Gajowy
+1 for moving/renaming. I agree with Kyle and Michał - there indeed seems
to be some confusion. The name "runners/reference" suggests a "Runner" that
is not production-ready (it seems to be neither of those). If possible,
maybe sdks/java/portability is a good place for this?

Łukasz

On Tue, 22 Oct 2019 at 16:41, Kyle Weaver wrote:

> I agree this should be moved. PortableRunner.java is analogous to
> portable_runner.py, which resides under
> sdks/python/apache_beam/runners/portability. Maybe PortableRunner.java
> should be moved to somewhere under sdks/java, as it's not actually a runner
> itself. The nomenclature is confusing, PortableRunner could be more aptly
> named something like `PortableRunnerClient`, or `JobClient` to better
> illustrate its relationship with `JobServer`.
>
> On Tue, Oct 22, 2019 at 4:11 PM Michał Walenia 
> wrote:
>
>> Hi,
>>
>> I found the Java PortableRunner class in
>> org.apache.beam.runners.reference package, where ReferenceRunner used to
>> reside prior to its deletion. The PortableRunner implementation however is
>> one that can be used with real JobServers in production code.
>>
>>
>> It seems that this class shouldn’t be in the reference package but
>> somewhere else. I’d like to rename the package from
>> org.apache.beam.runners.reference to org.apache.beam.runners.portability,
>> as it contains only classes related to the portable runner operation.
>>
>>
>> What do you think? If nobody is strongly against the change, I’ll make a
>> pull request with the refactor.
>>
>>
>> Have a good day,
>>
>> Michal
>>
>>
>>
>>
>> --
>>
>> Michał Walenia
>> Polidea  | Software Engineer
>>
>> M: +48 791 432 002 <+48791432002>
>> E: michal.wale...@polidea.com
>>
>> Unique Tech
>> Check out our projects! 
>>
>


Re: Proposal: Dynamic timer support (BEAM-6857)

2019-10-22 Thread Jan Lukavský

Hi Max,

wouldn't that be actually the same as

class MyDoFn extends DoFn {

  @ProcessElement
  public void process(ProcessContext context) {
    // "get" would register a new TimerSpec
    Timer timer1 = context.getTimer("timer1");
    Timer timer2 = context.getTimer("timer2");
    timer1.set(...);
    timer2.set(...);
  }
}

That is - no need to declare anything? One more concern about that - if 
we allow registration of timers (or even state) dynamically like that it 
might be harder to perform validation of pipeline upon upgrades.


Jan

On 10/22/19 4:47 PM, Maximilian Michels wrote:
The idea makes sense to me. I really like that Beam gives upfront 
specs for timer and state, but it is not flexible enough for 
timer-based libraries or for users which want to dynamically generate 
timers.


I'm not sure about the proposed API yet. Shouldn't we separate the 
timer specs from setting actual timers?


Suggestion:

class MyDoFn extends DoFn {
  @TimerMap TimerMap timers = TimerSpecs.timerMap();

  @ProcessElement
  public void process(
      @Element String e,
      @TimerMap TimerMap timers) {
    // "get" would register a new TimerSpec
    Timer timer1 = timers.get("timer1");
    Timer timer2 = timers.get("timer2");
    timer1.set(...);
    timer2.set(...);
  }

  // No args for "@OnTimer" => use generic TimerMap
  @OnTimer
  public void onTimer(
      @TimerId String timerFired,
      @Timestamp Instant timerTs,
      @TimerMap TimerMap timers) {
    // Timer firing
    ...
    // Set this timer (or another)
    Timer timer = timers.get(timerFired);
    timer.set(...);
  }
}

What do you think?

-Max

On 22.10.19 10:35, Jan Lukavský wrote:

Hi Kenn,

On 10/22/19 2:48 AM, Kenneth Knowles wrote:

This seems extremely useful.

I assume you mean `@OnTimer("timers")` in your example. I would 
suggest that the parameter annotation be something other 
than @TimerId since that annotation is already used for a very 
similar but different purpose; they are close enough that it is 
tempting to pun them, but it is clearer to keep them distinct IMO. 
Perhaps @TimerName or @TimerKey or some such. Alternatively, 
keep @TimerId in the parameter list and change the declaration 
to @TimerFamily("timers"). I think "family" or "group" may be more 
clear naming than "map".


At the portability level, this API does seem to be pretty close to a 
no-op in terms of the messages that need to be sent over the Fn API, 
so it makes sense to loosen the protos. By the time the Fn API is in 
play, all of our desires to catch errors prior to execution are 
irrelevant anyhow.


On the other hand, I think DSLs have a different & bigger problem 
than this, in that they want to programmatically adjust all the 
capabilities of a DoFn. Same goes for wrapping one DoFn in 
another. Certainly some limited DSL use cases are addressed by this, 
but I wouldn't take that as a primary use case for this feature. 
Ultimately they are probably better served by being able to 
explicitly author a DoFnInvoker and provide it to a variant of 
beam:transforms:ParDo where the do_fn field is a serialized 
DoFnInvoker. Now that I think about this, I cannot recall why we 
don't already ship a DoFnSignature & DoFnInvoker as the payload. 
That would allow maximum flexibility in utilizing the portability 
framework.


yes, exactly, but when DSLs are in question, we have to make sure 
that DSLs are not bound to portability - we have to be able to 
translate even in case of "legacy" runners as well. That might 
complicate things a bit maybe.


Jan



Kenn

On Mon, Oct 21, 2019 at 3:23 PM Reuven Lax wrote:


    BEAM-6857 documents the need for dynamic timer support in the Beam
    API. I wanted to make a proposal for what this API would look
    like, and how to express it in the portability protos.

    Background: Today Beam (especially BeamJava) requires a ParDo to
    statically declare all timers it accesses at compile time. For
    example:

    class MyDoFn extends DoFn {
      @TimerId("timer1") TimerSpec timer1 =
          TimerSpecs.timer(TimeDomain.EVENT_TIME);
      @TimerId("timer2") TimerSpec timer2 =
          TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

      @ProcessElement
      public void process(
          @Element String e,
          @TimerId("timer1") Timer timer1,
          @TimerId("timer2") Timer timer2) {
        timer1.set(...);
        timer2.set(...);
      }

      @OnTimer("timer1") public void onTimer1() { ... }
      @OnTimer("timer2") public void onTimer2() { ... }
    }

    This requires the author of a ParDo to know the full list of
    timers ahead of time, which has been problematic in many cases.
    One example where it causes issues is for DSLs such as Euphoria or
    Scio. DSL authors usually write ParDos to interpret the code
    written in the high-level DSL, and so don't know ahead of time the
    list of timers needed; alternatives today are quite ugly: physical
    code generation or creating a single timer that 

Re: Proposal: Dynamic timer support (BEAM-6857)

2019-10-22 Thread Maximilian Michels
The idea makes sense to me. I really like that Beam gives upfront specs 
for timer and state, but it is not flexible enough for timer-based 
libraries or for users who want to dynamically generate timers.


I'm not sure about the proposed API yet. Shouldn't we separate the timer 
specs from setting actual timers?


Suggestion:

class MyDoFn extends DoFn {
  @TimerMap TimerMap timers = TimerSpecs.timerMap();

  @ProcessElement
  public void process(
      @Element String e,
      @TimerMap TimerMap timers) {
    // "get" would register a new TimerSpec
    Timer timer1 = timers.get("timer1");
    Timer timer2 = timers.get("timer2");
    timer1.set(...);
    timer2.set(...);
  }

  // No args for "@OnTimer" => use generic TimerMap
  @OnTimer
  public void onTimer(
      @TimerId String timerFired,
      @Timestamp Instant timerTs,
      @TimerMap TimerMap timers) {
    // Timer firing
    ...
    // Set this timer (or another)
    Timer timer = timers.get(timerFired);
    timer.set(...);
  }
}

What do you think?

-Max

On 22.10.19 10:35, Jan Lukavský wrote:

Hi Kenn,

On 10/22/19 2:48 AM, Kenneth Knowles wrote:

This seems extremely useful.

I assume you mean `@OnTimer("timers")` in your example. I would 
suggest that the parameter annotation be something other than @TimerId 
since that annotation is already used for a very similar but different 
purpose; they are close enough that it is tempting to pun them, but it 
is clearer to keep them distinct IMO. Perhaps @TimerName or @TimerKey 
or some such. Alternatively, keep @TimerId in the parameter list and 
change the declaration to @TimerFamily("timers"). I think "family" or 
"group" may be more clear naming than "map".


At the portability level, this API does seem to be pretty close to a 
no-op in terms of the messages that need to be sent over the Fn API, 
so it makes sense to loosen the protos. By the time the Fn API is in 
play, all of our desires to catch errors prior to execution are 
irrelevant anyhow.


On the other hand, I think DSLs have a different & bigger problem than 
this, in that they want to programmatically adjust all the 
capabilities of a DoFn. Same goes for wrapping one DoFn in 
another. Certainly some limited DSL use cases are addressed by this, 
but I wouldn't take that as a primary use case for this feature. 
Ultimately they are probably better served by being able to explicitly 
author a DoFnInvoker and provide it to a variant of 
beam:transforms:ParDo where the do_fn field is a serialized 
DoFnInvoker. Now that I think about this, I cannot recall why we don't 
already ship a DoFnSignature & DoFnInvoker as the payload. That would 
allow maximum flexibility in utilizing the portability framework.


yes, exactly, but when DSLs are in question, we have to make sure that 
DSLs are not bound to portability - we have to be able to translate even 
in case of "legacy" runners as well. That might complicate things a bit 
maybe.


Jan



Kenn

On Mon, Oct 21, 2019 at 3:23 PM Reuven Lax wrote:


BEAM-6857 documents the need for dynamic timer support in the Beam
API. I wanted to make a proposal for what this API would look
like, and how to express it in the portability protos.

Background: Today Beam (especially BeamJava) requires a ParDo to
statically declare all timers it accesses at compile time. For
example:

class MyDoFn extends DoFn {
  @TimerId("timer1") TimerSpec timer1 =
      TimerSpecs.timer(TimeDomain.EVENT_TIME);
  @TimerId("timer2") TimerSpec timer2 =
      TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void process(
      @Element String e,
      @TimerId("timer1") Timer timer1,
      @TimerId("timer2") Timer timer2) {
    timer1.set(...);
    timer2.set(...);
  }

  @OnTimer("timer1") public void onTimer1() { ... }
  @OnTimer("timer2") public void onTimer2() { ... }
}

This requires the author of a ParDo to know the full list of
timers ahead of time, which has been problematic in many cases.
One example where it causes issues is for DSLs such as Euphoria or
Scio. DSL authors usually write ParDos to interpret the code
written in the high-level DSL, and so don't know ahead of time the
list of timers needed; alternatives today are quite ugly: physical
code generation or creating a single timer that multiplexes all of
the users' logical timers. There are also cases where a ParDo needs
multiple distinct timers, but the set of distinct timers is
controlled by the input data, and therefore not knowable in
advance. The Beam timer API has been insufficient for these use cases.

I propose a new TimerMap construct, which allows a ParDo to
dynamically set named timers. Its use in the Java API would look
as follows:

class MyDoFn extends DoFn {
  @TimerId("timers") TimerSpec timers =
      TimerSpecs.timerMap(TimeDomain.EVENT_TIME);

  @ProcessElement
  public 

Re: Java PortableRunner package name

2019-10-22 Thread Kyle Weaver
I agree this should be moved. PortableRunner.java is analogous to
portable_runner.py, which resides under
sdks/python/apache_beam/runners/portability. Maybe PortableRunner.java
should be moved to somewhere under sdks/java, as it's not actually a runner
itself. The nomenclature is confusing, PortableRunner could be more aptly
named something like `PortableRunnerClient`, or `JobClient` to better
illustrate its relationship with `JobServer`.

On Tue, Oct 22, 2019 at 4:11 PM Michał Walenia 
wrote:

> Hi,
>
> I found the Java PortableRunner class in org.apache.beam.runners.reference
> package, where ReferenceRunner used to reside prior to its deletion. The
> PortableRunner implementation however is one that can be used with real
> JobServers in production code.
>
>
> It seems that this class shouldn’t be in the reference package but
> somewhere else. I’d like to rename the package from
> org.apache.beam.runners.reference to org.apache.beam.runners.portability,
> as it contains only classes related to the portable runner operation.
>
>
> What do you think? If nobody is strongly against the change, I’ll make a
> pull request with the refactor.
>
>
> Have a good day,
>
> Michal
>
>
>
>
> --
>
> Michał Walenia
> Polidea  | Software Engineer
>
> M: +48 791 432 002 <+48791432002>
> E: michal.wale...@polidea.com
>
> Unique Tech
> Check out our projects! 
>


Java PortableRunner package name

2019-10-22 Thread Michał Walenia
Hi,

I found the Java PortableRunner class in org.apache.beam.runners.reference
package, where ReferenceRunner used to reside prior to its deletion. The
PortableRunner implementation however is one that can be used with real
JobServers in production code.


It seems that this class shouldn’t be in the reference package but
somewhere else. I’d like to rename the package from
org.apache.beam.runners.reference to org.apache.beam.runners.portability,
as it contains only classes related to the portable runner operation.


What do you think? If nobody is strongly against the change, I’ll make a
pull request with the refactor.


Have a good day,

Michal




-- 

Michał Walenia
Polidea  | Software Engineer

M: +48 791 432 002 <+48791432002>
E: michal.wale...@polidea.com

Unique Tech
Check out our projects! 


Re: Question related to running unit tests in IDE

2019-10-22 Thread Alexey Romanenko
Hi, 

Thank you for your interest in contributing!

Did you import the project properly (as explained on page [1]), and were all 
deps resolved successfully? 

[1] 
https://cwiki.apache.org/confluence/display/BEAM/Set+up+IntelliJ+from+scratch 


> On 22 Oct 2019, at 02:28, Saikat Maitra  wrote:
> 
> Hi,
> 
> I am interested to contribute to this issue 
> 
> https://issues.apache.org/jira/browse/BEAM-3658 
> 
> 
> I have followed the contribution guide and was able to build the project 
> locally using gradlew commands. 
> 
> I wanted to debug and trace the issue further by running the tests locally 
> using Intellij Idea but I am getting following errors. I looked up the docs 
> related to running tests 
> (https://cwiki.apache.org/confluence/display/BEAM/Run+a+single+unit+test 
> ) 
> and common IDE errors 
> (https://cwiki.apache.org/confluence/display/BEAM/%28FAQ%29+Recovering+from+common+IDE+errors
>  
> )
>  but have not found similar errors.
> 
> Error:(632, 17) java: cannot find symbol
>   symbol:   method 
> apply(org.apache.beam.sdk.transforms.Values)
>   location: interface org.apache.beam.sdk.values.POutput
> 
> Error:(169, 26) java: cannot find symbol
>   symbol:   class PCollection
>   location: class org.apache.beam.sdk.transforms.Watch
> 
> Error:(169, 59) java: cannot find symbol
>   symbol:   class KV
>   location: class org.apache.beam.sdk.transforms.Watch
> 
> Please let me know if you have feedback.
> 
> Regards,
> Saikat



Re: Hi

2019-10-22 Thread Aizhamal Nurmamat kyzy
Welcome Saikat!

On Sun, Oct 20, 2019, 5:55 AM Saikat Maitra  wrote:

> Hi Kenneth,
>
> Thank you, much appreciate your help.
>
> Warm regards,
> Saikat
>
> On Sat, Oct 19, 2019 at 10:40 PM Kenneth Knowles  wrote:
>
>> Welcome! I've added you to the Contributors role.
>>
>> Kenn
>>
>> On Sat, Oct 19, 2019 at 5:40 PM Saikat Maitra 
>> wrote:
>>
>>> Hi,
>>>
>>> I am Saikat and I am committer in Apache Ignite project.
>>>
>>> I am interested in joining the Apache Beam community and contribute to
>>> following Apache Ignite integration.
>>>
>>> https://issues.apache.org/jira/browse/IGNITE-7198
>>>
>>> I also wanted to get familiar with Apache Beam project and wanted to
>>> take up few open issues like below.
>>>
>>> https://issues.apache.org/jira/browse/BEAM-3658
>>>
>>> Can someone please add me in Apache Beam contributor list?
>>>
>>> Regards,
>>>
>>> Saikat
>>>
>>


Re: Question related to running unit tests in IDE

2019-10-22 Thread Michał Walenia
Hi,
could you give some more context: which tests are you running, what
system/version are you using, and which IntelliJ version gives you the errors?
It's hard to tell anything from just the errors.

Regards,
Michal

On Tue, Oct 22, 2019 at 2:29 AM Saikat Maitra 
wrote:

> Hi,
>
> I am interested to contribute to this issue
>
> https://issues.apache.org/jira/browse/BEAM-3658
>
> I have followed the contribution guide and was able to build the project
> locally using gradlew commands.
>
> I wanted to debug and trace the issue further by running the tests locally
> using Intellij Idea but I am getting following errors. I looked up the docs
> related to running tests (
> https://cwiki.apache.org/confluence/display/BEAM/Run+a+single+unit+test)
> and common IDE errors (
> https://cwiki.apache.org/confluence/display/BEAM/%28FAQ%29+Recovering+from+common+IDE+errors)
> but have not found similar errors.
>
> Error:(632, 17) java: cannot find symbol
>   symbol:   method
> apply(org.apache.beam.sdk.transforms.Values)
>   location: interface org.apache.beam.sdk.values.POutput
>
> Error:(169, 26) java: cannot find symbol
>   symbol:   class PCollection
>   location: class org.apache.beam.sdk.transforms.Watch
>
> Error:(169, 59) java: cannot find symbol
>   symbol:   class KV
>   location: class org.apache.beam.sdk.transforms.Watch
>
> Please let me know if you have feedback.
>
> Regards,
> Saikat
>


-- 

Michał Walenia
Polidea  | Software Engineer

M: +48 791 432 002 <+48791432002>
E: michal.wale...@polidea.com

Unique Tech
Check out our projects! 


Re: Proposal: Dynamic timer support (BEAM-6857)

2019-10-22 Thread Jan Lukavský

Hi Kenn,

On 10/22/19 2:48 AM, Kenneth Knowles wrote:

This seems extremely useful.

I assume you mean `@OnTimer("timers")` in your example. I would 
suggest that the parameter annotation be something other than @TimerId 
since that annotation is already used for a very similar but different 
purpose; they are close enough that it is tempting to pun them, but it 
is clearer to keep them distinct IMO. Perhaps @TimerName or @TimerKey 
or some such. Alternatively, keep @TimerId in the parameter list and 
change the declaration to @TimerFamily("timers"). I think "family" or 
"group" may be more clear naming than "map".


At the portability level, this API does seem to be pretty close to a 
no-op in terms of the messages that need to be sent over the Fn API, 
so it makes sense to loosen the protos. By the time the Fn API is in 
play, all of our desires to catch errors prior to execution are 
irrelevant anyhow.


On the other hand, I think DSLs have a different & bigger problem than 
this, in that they want to programmatically adjust all the 
capabilities of a DoFn. Same goes for wrapping one DoFn in 
another. Certainly some limited DSL use cases are addressed by this, 
but I wouldn't take that as a primary use case for this feature. 
Ultimately they are probably better served by being able to explicitly 
author a DoFnInvoker and provide it to a variant of 
beam:transforms:ParDo where the do_fn field is a serialized 
DoFnInvoker. Now that I think about this, I cannot recall why we don't 
already ship a DoFnSignature & DoFnInvoker as the payload. That would 
allow maximum flexibility in utilizing the portability framework.


yes, exactly, but when DSLs are in question, we have to make sure that 
DSLs are not bound to portability - we have to be able to translate even 
in case of "legacy" runners as well. That might complicate things a bit 
maybe.


Jan



Kenn

On Mon, Oct 21, 2019 at 3:23 PM Reuven Lax wrote:


BEAM-6857 documents the need for dynamic timer support in the Beam
API. I wanted to make a proposal for what this API would look
like, and how to express it in the portability protos.

Background: Today Beam (especially BeamJava) requires a ParDo to
statically declare all timers it accesses at compile time. For
example:

class MyDoFn extends DoFn {
  @TimerId("timer1") TimerSpec timer1 =
      TimerSpecs.timer(TimeDomain.EVENT_TIME);
  @TimerId("timer2") TimerSpec timer2 =
      TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void process(
      @Element String e,
      @TimerId("timer1") Timer timer1,
      @TimerId("timer2") Timer timer2) {
    timer1.set(...);
    timer2.set(...);
  }

  @OnTimer("timer1") public void onTimer1() { ... }
  @OnTimer("timer2") public void onTimer2() { ... }
}

This requires the author of a ParDo to know the full list of
timers ahead of time, which has been problematic in many cases.
One example where it causes issues is for DSLs such as Euphoria or
Scio. DSL authors usually write ParDos to interpret the code
written in the high-level DSL, and so don't know ahead of time the
list of timers needed; alternatives today are quite ugly: physical
code generation or creating a single timer that multiplexes all of
the users' logical timers. There are also cases where a ParDo needs
multiple distinct timers, but the set of distinct timers is
controlled by the input data, and therefore not knowable in
advance. The Beam timer API has been insufficient for these use cases.

I propose a new TimerMap construct, which allows a ParDo to
dynamically set named timers. Its use in the Java API would look
as follows:

class MyDoFn extends DoFn {
  @TimerId("timers") TimerSpec timers =
      TimerSpecs.timerMap(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(@Element String e, @TimerId("timers") TimerMap timers) {
    timers.set("timer1", ...);
    timers.set("timer2", ...);
  }

  @OnTimer("timer") public void onTimer(@TimerId String timerFired,
      @Timestamp Instant timerTs) { ... }
}

There is a new TimerSpec type to specify a TimerMap. The TimerMap
class itself allows dynamically setting multiple timers based on a
String tag argument. Each TimerMap has a single callback which
when called is given the id of the timer that is currently firing.

It is allowed to have multiple TimerMap objects in a ParDo (and
required if you want to have both processing-time and event-time
timers in the same ParDo). Each TimerMap is its own logical
namespace. i.e. if the user sets timers with the same string tag
on different TimerMap objects the timers will not collide.

Currently the portability protos were written to mirror the Java
API, expecting one TimerSpec per timer accessed by the ParDo. I
suggest that we instead make TimerMap the default for 

Re: Proposal: Dynamic timer support (BEAM-6857)

2019-10-22 Thread Jan Lukavský

Hi Reuven,

first of all big +1 for this.

Next, a couple of questions arise. Do you target DSLs only, or do you 
expect that this would be used by end users as well? If only DSLs are 
the concern, then I think:


 a) it is not only about timers, but state has to be managed in the 
same way (although the mentioned JIRA talks about timers only now, so we 
can start with that)


 b) maybe an alternative approach (also discussed several times in 
mailing lists) would be more flexible - expose the possibility to 
provide runners directly with DoFnSignature, which would enable DSLs to 
hook "closer" to runners


I don't currently have in mind what this "lower level" API would look 
like, but concerning Euphoria, this would be a preferred solution.


Although the TimerMap looks general enough and also flexible enough, the 
general solution must include state (at least as thought experiment), 
just to make sure that we don't enable dynamic setup of timers, but 
exposing the same functionality for state would again mean we have to 
expose the complete DoFnSignature (and therefore make the TimerMap a 
somewhat redundant feature).


Jan

On 10/22/19 12:23 AM, Reuven Lax wrote:
BEAM-6857 documents the need for dynamic timer support in the Beam 
API. I wanted to make a proposal for what this API would look like, 
and how to express it in the portability protos.


Background: Today Beam (especially BeamJava) requires a ParDo to 
statically declare all timers it accesses at compile time. For example:


class MyDoFn extends DoFn {
  @TimerId("timer1") TimerSpec timer1 =
      TimerSpecs.timer(TimeDomain.EVENT_TIME);
  @TimerId("timer2") TimerSpec timer2 =
      TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void process(
      @Element String e,
      @TimerId("timer1") Timer timer1,
      @TimerId("timer2") Timer timer2) {
    timer1.set(...);
    timer2.set(...);
  }

  @OnTimer("timer1") public void onTimer1() { ... }
  @OnTimer("timer2") public void onTimer2() { ... }
}

This requires the author of a ParDo to know the full list of timers 
ahead of time, which has been problematic in many cases. One example 
where it causes issues is for DSLs such as Euphoria or Scio. DSL 
authors usually write ParDos to interpret the code written in the 
high-level DSL, and so don't know ahead of time the list of timers 
needed; alternatives today are quite ugly: physical code generation or 
creating a single timer that multiplexes all of the users' logical 
timers. There are also cases where a ParDo needs multiple distinct 
timers, but the set of distinct timers is controlled by the input 
data, and therefore not knowable in advance. The Beam timer API has 
been insufficient for these use cases.
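
To make the "single timer that multiplexes all of the users' logical timers" workaround concrete, here is a minimal plain-Java sketch. It is only an illustration and does not use the Beam API: MultiplexedTimers, setLogical and onPhysicalTimer are hypothetical names, timestamps are plain longs, and the in-memory map stands in for what would have to be kept in per-key state in a real DoFn.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: many logical timers multiplexed onto one physical timer. */
final class MultiplexedTimers {
  // Logical timer name -> deadline. Assumption: a real DoFn would keep this in state.
  private final Map<String, Long> logical = new HashMap<>();
  // Stand-in for the single statically declared timer; null means "not set".
  private Long physicalDeadline = null;

  /** Sets or overwrites a logical timer, then re-arms the physical timer. */
  void setLogical(String name, long deadline) {
    logical.put(name, deadline);
    rearm();
  }

  /** Deadline the single physical timer is currently set to, or null. */
  Long physicalDeadline() {
    return physicalDeadline;
  }

  /** Callback of the physical timer: returns the logical timers due at {@code now}. */
  List<String> onPhysicalTimer(long now) {
    List<String> due = new ArrayList<>();
    Iterator<Map.Entry<String, Long>> it = logical.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<String, Long> entry = it.next();
      if (entry.getValue() <= now) {
        due.add(entry.getKey());
        it.remove();
      }
    }
    Collections.sort(due); // deterministic order for the caller
    rearm();
    return due;
  }

  // The physical timer always tracks the earliest remaining logical deadline.
  private void rearm() {
    physicalDeadline = logical.isEmpty() ? null : Collections.min(logical.values());
  }

  public static void main(String[] args) {
    MultiplexedTimers timers = new MultiplexedTimers();
    timers.setLogical("a", 30L);
    timers.setLogical("b", 10L);
    timers.setLogical("c", 20L);
    System.out.println(timers.physicalDeadline());  // earliest deadline: 10
    System.out.println(timers.onPhysicalTimer(10L)); // [b]
    System.out.println(timers.physicalDeadline());  // re-armed to 20
  }
}
```

The bookkeeping (tracking deadlines, re-arming, dispatching by name) is exactly the part each author has to rewrite by hand today, which is why the workaround is described as ugly.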


I propose a new TimerMap construct, which allows a ParDo to dynamically 
set named timers. Its use in the Java API would look as follows:


class MyDoFn extends DoFn {
  @TimerId("timers") TimerSpec timers = 
      TimerSpecs.timerMap(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(@Element String e, @TimerId("timers") TimerMap timers) {
    timers.set("timer1", ...);
    timers.set("timer2", ...);
  }

  @OnTimer("timer") public void onTimer(@TimerId String timerFired, 
      @Timestamp Instant timerTs) { ... }
}

There is a new TimerSpec type to specify a TimerMap. The TimerMap 
class itself allows dynamically setting multiple timers based on a 
String tag argument. Each TimerMap has a single callback which when 
called is given the id of the timer that is currently firing.


It is allowed to have multiple TimerMap objects in a ParDo (and 
required if you want to have both processing-time and event-time 
timers in the same ParDo). Each TimerMap is its own logical namespace. 
i.e. if the user sets timers with the same string tag on different 
TimerMap objects the timers will not collide.
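
The semantics described above (string-tagged timers, one callback per TimerMap that receives the id of the firing timer, and a separate namespace per TimerMap) can be modelled with a toy class. This is a sketch under stated assumptions, not the proposed Beam implementation: ToyTimerMap, set and advanceTo are hypothetical names, timestamps are plain longs instead of Instant, and time is advanced by hand.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

/** Toy model of a TimerMap: named timers plus one shared callback (hypothetical). */
final class ToyTimerMap {
  private final Map<String, Long> deadlines = new HashMap<>();
  private final BiConsumer<String, Long> onTimer;

  /** The callback receives the tag of the firing timer and its timestamp. */
  ToyTimerMap(BiConsumer<String, Long> onTimer) {
    this.onTimer = onTimer;
  }

  /** Sets (or overwrites) the timer with the given tag. */
  void set(String tag, long timestamp) {
    deadlines.put(tag, timestamp);
  }

  /** Fires every timer whose deadline is at or before {@code now}, earliest first. */
  void advanceTo(long now) {
    List<String> due = new ArrayList<>();
    for (Map.Entry<String, Long> entry : deadlines.entrySet()) {
      if (entry.getValue() <= now) {
        due.add(entry.getKey());
      }
    }
    // Order by deadline, then by tag, so the firing order is deterministic.
    due.sort(Comparator.comparing((String tag) -> deadlines.get(tag))
        .thenComparing(Comparator.<String>naturalOrder()));
    for (String tag : due) {
      Long ts = deadlines.get(tag);
      if (ts == null || ts > now) {
        continue; // an earlier callback moved this timer into the future
      }
      deadlines.remove(tag);
      onTimer.accept(tag, ts);
    }
  }

  public static void main(String[] args) {
    // Two maps, e.g. one per time domain; the shared tag "timer1" does not collide.
    ToyTimerMap eventTimers = new ToyTimerMap((tag, ts) ->
        System.out.println("event-time callback: " + tag + " at " + ts));
    ToyTimerMap processingTimers = new ToyTimerMap((tag, ts) ->
        System.out.println("processing-time callback: " + tag + " at " + ts));
    eventTimers.set("timer1", 10L);
    eventTimers.set("timer2", 5L);
    processingTimers.set("timer1", 7L);
    eventTimers.advanceTo(10L);     // fires timer2, then timer1, on the event map only
    processingTimers.advanceTo(7L); // fires the processing map's own timer1
  }
}
```

Because each map owns its own tag-to-deadline table, setting "timer1" on one map never touches "timer1" on the other, which is the namespace property the proposal relies on.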


Currently the portability protos were written to mirror the Java API, 
expecting one TimerSpec per timer accessed by the ParDo. I suggest 
that we instead make TimerMap the default for portability, and model 
the current behavior on top of timer map. If this proves problematic 
for some runners, we could instead introduce a new TimerSpec proto to 
represent TimerMap.


Thoughts?

Reuven