Re: How to serialize/deserialize a Pipeline object?

2016-12-21 Thread Robert Bradshaw
On Wed, Dec 21, 2016 at 10:58 AM, Shen Li  wrote:
> Hi Kenn,
>
> Thanks a lot for the information.
>
> Sure, below are more details about the problem I encountered.
>
> I am developing a runner for IBM Streams, and am exploring possible ways to
> conduct integration tests. As Streams is not an open-source project, we
> cannot add the full set of libraries to Maven Central repo. Nor can we
> guarantee to provide a server (with Streams installed) as a long-term
> Jenkins slave. So, it seems more flexible to let the runner submit the
> graph to a Streams cloud service, and provide the account info through
> "-DrunnableOnServicePipelineOptions" (please correct me if it does not work
> in this way). The problem is that the runner cannot convert the Pipeline
> into a Streams graph format without a local Streams install. So, I am
> thinking about sending the serialized Pipeline to the Cloud service for
> execution. Maybe I should create some intermediate format between the
> Pipeline and Streams graph format. Or, is there any other way to carry out
> the integration test without a Streams install?

Choosing an intermediate representation that can be serialized and
sent to a cloud service (where it is then translated into the actual
implementation representation) is a fine solution. In fact that's what
Dataflow itself does.

Of course we'll want to move as close to (2) as possible once it exists.
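
To make that concrete, here is a rough sketch of what such a custom
intermediate format could look like. Everything below is illustrative only
-- the classes, field names, and the use of Jackson are assumptions, not
Beam's or Dataflow's actual representation. The runner would walk the
Pipeline on the client, fill in a plain graph description, POST the JSON to
the cloud service, and the service-side translator (which does have a local
Streams install) would turn it into the Streams graph format:

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical intermediate representation of a pipeline, independent of Streams. */
public class PipelineGraph {

  public static class Node {
    public String id;                                // unique transform name
    public String kind;                              // e.g. "Read", "ParDo", "GroupByKey"
    public List<String> inputs = new ArrayList<>();  // ids of upstream nodes
    public String payload;                           // serialized user fn / configuration
  }

  public List<Node> nodes = new ArrayList<>();

  /** JSON form the runner would send to the service for translation and execution. */
  public String toJson() throws Exception {
    return new ObjectMapper().writeValueAsString(this);
  }
}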

> On Wed, Dec 21, 2016 at 12:08 PM, Kenneth Knowles 
> wrote:
>
>> Hi Shen,
>>
>> I want to tell you (1) how things work today and (2) how we want them to be
>> eventually.
>>
>> (1) So far, each runner translates the Pipeline to their own graph format
>> before serialization, so we have not yet encountered this issue.
>>
>> (2) We intend to make a standard mostly-readable JSON format for a
>> Pipeline. It is based on the Avro schema sketched in the design doc at
>> https://s.apache.org/beam-runner-api and there is also a draft JSON schema
>> at https://github.com/apache/incubator-beam/pull/662.
>>
>> You may wish to follow https://issues.apache.org/jira/browse/BEAM-115,
>> though that is a very general ticket.
>>
>> Can you share any more details?
>>
>> Kenn
>>
>> On Wed, Dec 21, 2016 at 8:47 AM, Shen Li  wrote:
>>
>> > Hi,
>> >
>> > What are the recommended ways to serialize/deserialize a Pipeline
>> object? I
>> > need to submit a pipeline object to cloud for execution and fetch the
>> > result.
>> >
>> > Thanks,
>> >
>> > Shen
>> >
>>


Re: Beam Tuple

2016-12-13 Thread Robert Bradshaw
On Tue, Dec 13, 2016 at 9:02 AM, Jean-Baptiste Onofré  wrote:
> Hi Robert,
>
> Agree, however which one would the user use? Create their own?

Whichever suits their needs best, which could include their own.

> Today, I think Beam is highly flexible in terms of data format (which is
> great), but the trade-off is that end-users have to write a lot of
> boilerplate code (just to convert from one type to another).
>
> So, basically, the purpose of a Beam Tuple is to have something provided out
> of the box: if the user wants to use another tuple, that's fine.
> Generally speaking, the discussion about data format extension is about
> simplifying the way users manipulate popular data formats.

If I understand correctly, the proposal is to pick (or write) a Tuple
API and bless it by shipping it with the SDK along with Beam-specific
helper code. It'd be helpful to see concretely how large a savings
this would be to a user, and whether that's worth the cost.
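
For reference, the AutoValue route mentioned below looks roughly like this
-- a sketch with made-up class and field names; in a pipeline the class
would also need a Coder (e.g. via @DefaultCoder or setCoder), which is part
of the boilerplate being weighed here:

import com.google.auto.value.AutoValue;

/** A typed two-field "tuple": AutoValue generates the constructor, equals, hashCode and toString. */
@AutoValue
abstract class WordCount {
  abstract String word();
  abstract long count();

  static WordCount of(String word, long count) {
    return new AutoValue_WordCount(word, count);
  }
}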

> On 12/13/2016 05:56 PM, Robert Bradshaw wrote:
>>
>> The Java language isn't very amenable to Tuple APIs as there are several
>> (mutually exclusive?) tradeoffs that must be made, each with their pros
>> and
>> cons. What advantage is there of Beam providing its own tuple API vs.
>> letting users pick whatever tuple library they want and using that with
>> Beam?
>>
>> (I suppose we're already using and encouraging AutoValue which covers a
>> lot
>> of tuple cases.)
>>
>> On Tue, Dec 13, 2016 at 8:20 AM, Aparup Banerjee (apbanerj) <
>> apban...@cisco.com> wrote:
>>
>>> We have created one. An untagged Tuple. Will be happy to contribute it to
>>> the community
>>>
>>> Aparup
>>>
>>>> On Dec 13, 2016, at 5:11 AM, Amit  wrote:
>>>>
>>>> I'll add that I know of Beam's PTuple, but my question is about much
>>>> simpler Tuples, untagged.
>>>>
>>>> On Tue, Dec 13, 2016 at 1:56 PM Jean-Baptiste Onofré 
>>>> wrote:
>>>>
>>>>> Hi Amit,
>>>>>
>>>>> as discussed together, I think a Tuple abstraction would be good in the
>>>>> SDK (more than in the data format extension).
>>>>>
>>>>> Regards
>>>>> JB
>>>>>
>>>>>> On 12/13/2016 11:06 AM, Amit Sela wrote:
>>>>>> Hi all,
>>>>>>
>>>>>> I was wondering why Beam doesn't have tuples as part of the SDK ?
>>>>>> To the best of my knowledge all currently supported (OSS) runners:
>>>
>>> Spark,
>>>>>>
>>>>>> Flink, Apex provide a Tuple abstraction and I was wondering if Beam
>>>>>
>>>>> should
>>>>>>
>>>>>> too ?
>>>>>>
>>>>>> Consider KV for example; it is a special ("*keyed*" by the first
>>>>>> field)
>>>>>> implementation Tuple2.
>>>>>> While KV's importance is far more than being a Tuple2, I'm wondering
>>>>>> if
>>>>>
>>>>> the
>>>>>>
>>>>>> SDK would benefit from a proper TupleX support ?
>>>>>>
>>>>>> Thanks,
>>>>>> Amit
>>>>>>
>>>>>
>>>>> --
>>>>> Jean-Baptiste Onofré
>>>>> jbono...@apache.org
>>>>> http://blog.nanthrax.net
>>>>> Talend - http://www.talend.com
>>>>>
>>>
>>
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com


Re: Beam Tuple

2016-12-13 Thread Robert Bradshaw
The Java language isn't very amenable to Tuple APIs as there are several
(mutually exclusive?) tradeoffs that must be made, each with their pros and
cons. What advantage is there of Beam providing its own tuple API vs.
letting users pick whatever tuple library they want and using that with
Beam?

(I suppose we're already using and encouraging AutoValue which covers a lot
of tuple cases.)

On Tue, Dec 13, 2016 at 8:20 AM, Aparup Banerjee (apbanerj) <
apban...@cisco.com> wrote:

> We have created one. An untagged Tuple. Will be happy to contribute it to
> the community
>
> Aparup
>
> > On Dec 13, 2016, at 5:11 AM, Amit  wrote:
> >
> > I'll add that I know of Beam's PTuple, but my question is about much
> > simpler Tuples, untagged.
> >
> > On Tue, Dec 13, 2016 at 1:56 PM Jean-Baptiste Onofré 
> > wrote:
> >
> >> Hi Amit,
> >>
> >> as discussed together, I think a Tuple abstraction would be good in the
> >> SDK (more than in the data format extension).
> >>
> >> Regards
> >> JB
> >>
> >>> On 12/13/2016 11:06 AM, Amit Sela wrote:
> >>> Hi all,
> >>>
> >>> I was wondering why Beam doesn't have tuples as part of the SDK ?
> >>> To the best of my knowledge all currently supported (OSS) runners:
> Spark,
> >>> Flink, Apex provide a Tuple abstraction and I was wondering if Beam
> >> should
> >>> too ?
> >>>
> >>> Consider KV for example; it is a special ("*keyed*" by the first field)
> >>> implementation Tuple2.
> >>> While KV's importance is far more than being a Tuple2, I'm wondering if
> >> the
> >>> SDK would benefit from a proper TupleX support ?
> >>>
> >>> Thanks,
> >>> Amit
> >>>
> >>
> >> --
> >> Jean-Baptiste Onofré
> >> jbono...@apache.org
> >> http://blog.nanthrax.net
> >> Talend - http://www.talend.com
> >>
>


Re: [DISCUSS] ExecIO

2016-12-08 Thread Robert Bradshaw
On Wed, Dec 7, 2016 at 1:32 AM, Jean-Baptiste Onofré  wrote:
> By the way, just to elaborate a bit why I provided as an IO:
>
> 1. From a user experience perspective, I think we have to provide a
> convenient way to write pipelines. Any syntax simplifying this is valuable.
> I think it's easier to write:
>
> pipeline.apply(ExecIO.read().withCommand("foo"))
>
> than:
>
> pipeline.apply(Create.of("foo")).apply(ParDo.of(new ExecFn()));

Slightly. Still, when I see

pipeline.apply(ExecIO.read().withCommand("foo"))

I am surprised to get a PCollection with a single element...
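
For comparison, a minimal sketch of the plain-DoFn version -- the class name
is made up, and error handling, timeouts and exit-code checks are omitted --
which takes a PCollection of command lines and emits each line of their stdout:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.transforms.DoFn;

class ShellFn extends DoFn<String, String> {
  @ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    // Run the element as a shell command and stream its stdout to the output.
    Process p = new ProcessBuilder("/bin/sh", "-c", c.element()).start();
    try (BufferedReader r = new BufferedReader(
        new InputStreamReader(p.getInputStream(), StandardCharsets.UTF_8))) {
      String line;
      while ((line = r.readLine()) != null) {
        c.output(line);
      }
    }
    p.waitFor();
  }
}

Applying ParDo.of(new ShellFn()) to a PCollection of one or many commands
then yields their combined output, with the usual DoFn semantics.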

> 2. For me (maybe I'm wrong ;)), an IO is an extension dedicated to
> "connectors": reading/writing from/to a data source. So, even without the IO
> "wrapping" (by wrapping, I mean the Read and Write), I think the Exec extension
> should be in IO as it's a source/sink of data.

To clarify, if you wrote a DoFn that, say, did lookups against a MySQL
database, you would consider this an IO? For me, IO denotes
input/output, i.e. the roots and leaves of a pipeline.

> Regards
> JB
>
> On 12/07/2016 08:37 AM, Robert Bradshaw wrote:
>>
>> I don't mean to derail the tricky environment questions, but I'm not
>> seeing why this is bundled as an IO rather than a plain DoFn (which
>> can be applied to a PCollection of one or more commands, yielding
>> their outputs). Especially for the case of a Read, which in this case
>> is not splittable (initially or dynamically) and always produces a
>> single element--feels much more like a Map to me.
>>
>> On Tue, Dec 6, 2016 at 3:26 PM, Eugene Kirpichov
>>  wrote:
>>>
>>> Ben - the issues of "things aren't hung, there is a shell command
>>> running",
>>> aren't they general to all DoFn's? i.e. I don't see why the runner would
>>> need to know that a shell command is running, but not that, say, a heavy
>>> monolithic computation is running. What's the benefit to the runner in
>>> knowing that the DoFn contains a shell command?
>>>
>>> By saying "making sure that all shell commands finish", I suppose you're
>>> referring to the possibility of leaks if the user initiates a shell
>>> command
>>> and forgets to wait for it? I think that should be solvable again without
>>> Beam intervention, by making a utility class for running shell commands
>>> which implements AutoCloseable, and document that you have to use it that
>>> way.
>>>
>>> Ken - I think the question here is: are we ok with a situation where the
>>> runner doesn't check or care whether the shell command can run, and the
>>> user accepts this risk and studies what commands will be available on the
>>> worker environment provided by the runner they use in production, before
>>> productionizing a pipeline with those commands.
>>>
>>> Upon some thought I think it's ok. Of course, this carries an obligation
>>> for runners to document their worker environment and its changes across
>>> versions. Though for many runners such documentation may be trivial:
>>> "whatever your YARN cluster has, the runner doesn't change it in any way"
>>> and it may be good enough for users. And for other runners, like
>>> Dataflow,
>>> such documentation may also be trivial: "no guarantees whatsoever, only
>>> what you stage in --filesToStage is available".
>>>
>>> I can also see Beam develop to a point where we'd want all runners to be
>>> able to run your DoFn in a user-specified Docker container, and manage
>>> those intelligently - but I think that's quite a while away and it
>>> doesn't
>>> have to block work on a utility for executing shell commands. Though it'd
>>> be nice if the utility was forward-compatible with that future world.
>>>
>>> On Tue, Dec 6, 2016 at 2:16 AM Jean-Baptiste Onofré 
>>> wrote:
>>>
>>>> Hi Eugene,
>>>>
>>>> thanks for the extended questions.
>>>>
>>>> I think we have two levels of expectations here:
>>>> - end-user responsibility
>>>> - worker/runner responsibility
>>>>
>>>> 1/ From an end-user perspective, the end-user has to know that using a
>>>> system command (via ExecIO) and, more generally speaking, anything which
>>>> relies on worker resources (for instance a local filesystem directory
>>>> available only on a worker) can fail if the ex

Re: [DISCUSS] ExecIO

2016-12-06 Thread Robert Bradshaw
I don't mean to derail the tricky environment questions, but I'm not
seeing why this is bundled as an IO rather than a plain DoFn (which
can be applied to a PCollection of one or more commands, yielding
their outputs). Especially for the case of a Read, which in this case
is not splittable (initially or dynamically) and always produces a
single element--feels much more like a Map to me.

On Tue, Dec 6, 2016 at 3:26 PM, Eugene Kirpichov
 wrote:
> Ben - the issues of "things aren't hung, there is a shell command running",
> aren't they general to all DoFn's? i.e. I don't see why the runner would
> need to know that a shell command is running, but not that, say, a heavy
> monolithic computation is running. What's the benefit to the runner in
> knowing that the DoFn contains a shell command?
>
> By saying "making sure that all shell commands finish", I suppose you're
> referring to the possibility of leaks if the user initiates a shell command
> and forgets to wait for it? I think that should be solvable again without
> Beam intervention, by making a utility class for running shell commands
> which implements AutoCloseable, and document that you have to use it that
> way.
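
A minimal sketch of such a wrapper, for illustration (the name and exact
behavior are assumptions, not an existing Beam utility):

import java.io.IOException;

/** Wraps a child process so try-with-resources guarantees it is cleaned up. */
class ShellCommand implements AutoCloseable {
  private final Process process;

  ShellCommand(String... command) throws IOException {
    this.process = new ProcessBuilder(command).start();
  }

  int waitFor() throws InterruptedException {
    return process.waitFor();
  }

  @Override
  public void close() {
    process.destroy();  // no leaked child process even if the caller forgot waitFor()
  }
}

Inside a DoFn this would be used as
try (ShellCommand cmd = new ShellCommand("ls", "-l")) { ... cmd.waitFor(); }
so a forgotten wait still gets cleaned up at the end of the block.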
>
> Ken - I think the question here is: are we ok with a situation where the
> runner doesn't check or care whether the shell command can run, and the
> user accepts this risk and studies what commands will be available on the
> worker environment provided by the runner they use in production, before
> productionizing a pipeline with those commands.
>
> Upon some thought I think it's ok. Of course, this carries an obligation
> for runners to document their worker environment and its changes across
> versions. Though for many runners such documentation may be trivial:
> "whatever your YARN cluster has, the runner doesn't change it in any way"
> and it may be good enough for users. And for other runners, like Dataflow,
> such documentation may also be trivial: "no guarantees whatsoever, only
> what you stage in --filesToStage is available".
>
> I can also see Beam develop to a point where we'd want all runners to be
> able to run your DoFn in a user-specified Docker container, and manage
> those intelligently - but I think that's quite a while away and it doesn't
> have to block work on a utility for executing shell commands. Though it'd
> be nice if the utility was forward-compatible with that future world.
>
> On Tue, Dec 6, 2016 at 2:16 AM Jean-Baptiste Onofré  wrote:
>
>> Hi Eugene,
>>
>> thanks for the extended questions.
>>
>> I think we have two levels of expectations here:
>> - end-user responsibility
>> - worker/runner responsibility
>>
>> 1/ From an end-user perspective, the end-user has to know that using a
>> system command (via ExecIO) and, more generally speaking, anything which
>> relies on worker resources (for instance a local filesystem directory
>> available only on a worker) can fail if the expected resource is not
>> present on all workers. So, basically, all workers should have the same
>> topology. It's what I'm assuming for the PR.
>> For example, I have my Spark cluster, using the same Mesos/Docker setup,
>> then the user knows that all nodes in the cluster will have the same
>> setup and so resources (it could be provided by DevOps for instance).
>> On the other hand, running on Dataflow is different because I don't
>> "control" the nodes (bootstrapping or resources), but in that case, the
>> user knows it (he knows the runner he's using).
>>
>> 2/ As you said, we can expect that runner can deal with some
>> requirements (expressed depending of the pipeline and the runner), and
>> the runner can know the workers which provide capabilities matching
>> those requirements.
>> Then, the end user is not more responsible: the runner will try to
>> define if the pipeline can be executed, and where a DoFn has to be run
>> (on which worker).
>>
>> For me, it's two different levels where 2 is smarter but 1 can also make
>> sense.
>>
>> WDYT ?
>>
>> Regards
>> JB
>>
>> On 12/05/2016 08:51 PM, Eugene Kirpichov wrote:
>> > Hi JB,
>> >
>> > Thanks for bringing this to the mailing list. I also think that this is
>> > useful in general (and that use cases for Beam are more than just classic
>> > bigdata), and that there are interesting questions here at different
>> levels
>> > about how to do it right.
>> >
>> > I suggest to start with the highest-level question [and discuss the
>> > particular API only after agreeing on this, possibly in a separate
>> thread]:
>> > how to deal with the fact that Beam gives no guarantees about the
>> > environment on workers, e.g. which commands are available, which shell or
>> > even OS is being used, etc. Particularly:
>> >
>> > - Obviously different runners will have a different environment, e.g.
>> > Dataflow workers are not going to have Hadoop commands available because
>> > they are not running on a Hadoop cluster. So, pipelines and transforms
>> > developed using this connector will be necessaril

Re: [DISCUSS] Graduation to a top-level project

2016-11-23 Thread Robert Bradshaw
+1

On Wed, Nov 23, 2016 at 7:36 AM, Lukasz Cwik  wrote:
> +1
>
> On Wed, Nov 23, 2016 at 9:48 AM, Stephan Ewen  wrote:
>
>> +1
>> The community is doing very well and behaving very Apache
>>
>> On Wed, Nov 23, 2016 at 9:54 AM, Etienne Chauchot 
>> wrote:
>>
>> > A big +1 of course, very excited to go forward
>> >
>> > Etienne
>> >
>> >
>> >
>> > Le 22/11/2016 à 19:19, Davor Bonaci a écrit :
>> >
>> >> Hi everyone,
>> >> With all the progress we’ve had recently in Apache Beam, I think it is
>> >> time
>> >> we start the discussion about graduation as a new top-level project at
>> the
>> >> Apache Software Foundation.
>> >>
>> >> Graduation means we are a self-sustaining and self-governing community,
>> >> and
>> >> ready to be a full participant in the Apache Software Foundation. It
>> does
>> >> not imply that our community growth is complete or that a particular
>> level
>> >> of technical maturity has been reached, rather that we are on a solid
>> >> trajectory in those areas. After graduation, we will still periodically
>> >> report to, and be overseen by, the ASF Board to ensure continued growth
>> of
>> >> a healthy community.
>> >>
>> >> Graduation is an important milestone for the project. It is also key to
>> >> further grow the user community: many users (incorrectly) see incubation
>> >> as
>> >> a sign of instability and are much less likely to consider us for a
>> >> production use.
>> >>
>> >> A way to think about graduation readiness is through the Apache Maturity
>> >> Model [1]. I think we clearly satisfy all the requirements [2]. It is
>> >> probably worth emphasizing the recent community growth: over each of the
>> >> past three months, no single organization contributing to Beam has had
>> >> more
>> >> than ~50% of the unique contributors per month [2, see assumptions].
>> >> That’s
>> >> a great statistic that shows how much we’ve grown our diversity!
>> >>
>> >> Process-wise, graduation consists of drafting a board resolution, which
>> >> needs to identify the full Project Management Committee, and getting it
>> >> approved by the community, the Incubator, and the Board. Within the Beam
>> >> community, most of these discussions and votes have to be on the
>> private@
>> >> mailing list, but, as usual, we’ll try to keep dev@ updated as much as
>> >> possible.
>> >>
>> >> With that in mind, let’s use this discussion on dev@ for two things:
>> >> * Collect additional data points on our progress that we may want to
>> >> present to the Incubator as a part of the proposal to accept our
>> >> graduation.
>> >> * Determine whether the community supports graduation. Please reply
>> +1/-1
>> >> with any additional comments, as appropriate. I’d encourage everyone to
>> >> participate -- regardless whether you are an occasional visitor or have
>> a
>> >> specific role in the project -- we’d love to hear your perspective.
>> >>
>> >> Data points so far:
>> >> * Project’s maturity self-assessment [2].
>> >> * 1500 pull requests in incubation, which makes us one of the most
>> active
>> >> project across all of ASF on this metric.
>> >> * 3 releases, each driven by a different release manager.
>> >> * 120+ individual contributors.
>> >> * 3 new committers added, 2 of which aren’t from the largest
>> organization.
>> >> * 1027 issues created, 515 resolved.
>> >> * 442 dev@ emails in October alone, sent by 51 individuals.
>> >> * 50 user@ emails in the last 30 days, sent by 22 individuals.
>> >>
>> >> Thanks!
>> >>
>> >> Davor
>> >>
>> >> [1] http://community.apache.org/apache-way/apache-project-
>> >> maturity-model.html
>> >> [2] http://beam.incubator.apache.org/contribute/maturity-model/
>> >>
>> >>
>> >
>>


Re: Configuring Jenkins

2016-11-15 Thread Robert Bradshaw
This is great; thanks for doing this!

On Tue, Nov 15, 2016 at 6:43 AM, Dan Halperin
 wrote:
> Seems phenomenal!
>
> Reading between the lines of your email, it sounds like changes to Jenkins
> configuration will not actually be exercised on the PR that makes them. So,
> we still need to work out a process of how we test changes that would
> affect Jenkins config.
>
> (That does not take away from the fact that DSL is a vast improvement!)
> Dan
>
> On Tue, Nov 15, 2016 at 12:52 PM, Aljoscha Krettek 
> wrote:
>
>> +1 I like this a lot!
>>
>> On Tue, 15 Nov 2016 at 10:37 Jean-Baptiste Onofré  wrote:
>>
>> > Fantastic Davor !
>> >
>> > I like this approach, I'm gonna take a deeper look.
>> >
>> > Thanks !
>> >
>> > Regards
>> > JB
>> >
>> > On 11/15/2016 10:01 AM, Davor Bonaci wrote:
>> > > Hi everybody,
>> > > As I'm sure everybody knows, we use Apache's Jenkins instance for all
>> our
>> > > testing, including pre-commit, post-commit, nightly snapshot, etc.
>> > (Travis
>> > > CI is a backup system and recommended for individual forks only.)
>> > >
>> > > Managing Jenkins projects has been a big pain point so far. Among other
>> > > reasons, only a few of us have access to configure it, way too few of
>> us
>> > > have visibility into what those jobs do, and nobody has any visibility
>> > into
>> > > changes being made or an opportunity to comment on them.
>> > >
>> > > Well, not any more! I was playing a little bit with Jenkins DSL plugin
>> > and
>> > > was able to move our configuration out of Jenkins and into the git
>> > > repository. I've done it as a proof of concept for the website
>> repository
>> > > only [1], but Jason is planning on extending that work to the main
>> > > repository. Look for a PR shortly!
>> > >
>> > > Going forward, anyone can see what our Jenkins jobs are doing, and
>> anyone
>> > > can add new jobs or improve existing ones by simply proposing a pull
>> > > request to change the configuration. Finally, the project maintains a
>> > > history in source repository, instead of direct changes without much
>> > > accountability.
>> > >
>> > > How does this work? There's a "seed" job that periodically applies
>> > > configuration specified in the source repository into Jenkins.
>> Currently,
>> > > this happens once per day. If you modify the configuration in the
>> source
>> > > repository, it will be applied within 24 hours. If you, however, modify
>> > the
>> > > configuration in Jenkins directly, it will revert back to whatever is
>> > > specified in the code repository also within 24 hours.
>> > >
>> > > How to understand Jenkins DSL? There are many resources available; I've
>> > > found Jenkins Job DSL API [2] particularly helpful.
>> > >
>> > > I hope you are excited to have this feature available to us! If you
>> have
>> > > any thoughts on improving this further, please comment. Thanks!
>> > >
>> > > Davor
>> > >
>> > > [1] https://github.com/apache/incubator-beam-site/pull/80
>> > > [2] https://jenkinsci.github.io/job-dsl-plugin/
>> > >
>> >
>> > --
>> > Jean-Baptiste Onofré
>> > jbono...@apache.org
>> > http://blog.nanthrax.net
>> > Talend - http://www.talend.com
>> >
>>


Re: [PROPOSAL] Merge apex-runner to master branch

2016-11-11 Thread Robert Bradshaw
Thanks, David! +1 to getting this into master from me.

On Thu, Nov 10, 2016 at 12:03 AM, David Yan  wrote:
>> >- Have at least 2 contributors interested in maintaining it, and 1
>> >committer interested in supporting it:  *I'm going to sign up for the
>> >support and there are more folks interested. Some have already contributed
>> >and helped with PR reviews, others from the Apex community have expressed
>> >interest [3].*
>>
>> As anyone in the open source ecosystem knows, maintaining is a much
>> higher bar than contributing, but very important. I'd like to see
>> specific names here.
>
>
> I would like to sign up as a maintainer for the Apex runner if possible.
> Thanks!
>
> David


Re: [DISCUSS] Change "RunnableOnService" To A More Intuitive Name

2016-11-10 Thread Robert Bradshaw
+1 to ValidatesRunner. It'd be nice if it were (optionally?)
parameterized by which feature it validates.

@NeedsRunner is odd, as using a runner is the most natural way to
write many (most) tests, but an annotation should be used to mark the
exception, not the norm. (I'd just assume a runner is available for
all tests, e.g. CoreTests depends on DirectRunner depends on Core).
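
For reference, the marking under discussion is just JUnit's category
mechanism, so a rename only changes the marker type. A sketch, using the
proposed ValidatesRunner name (which does not exist yet) as a locally
defined marker:

import org.junit.Test;
import org.junit.experimental.categories.Category;

public class GroupByKeyTest {

  /** Marker interface; today this role is played by RunnableOnService. */
  public interface ValidatesRunner {}

  @Test
  @Category(ValidatesRunner.class)
  public void testGroupByKey() {
    // Build a small pipeline with TestPipeline, apply GroupByKey,
    // assert on the output with PAssert, and run it on the runner under test.
  }
}

Runner modules then include or exclude the category in their Surefire/Failsafe
configuration, which is (roughly) how the per-runner suites get assembled.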

On Thu, Nov 10, 2016 at 10:14 AM, Mark Liu  wrote:
> +1 ValidatesRunner
>
> On Thu, Nov 10, 2016 at 8:40 AM, Kenneth Knowles 
> wrote:
>
>> Nice. I like ValidatesRunner.
>>
>> On Nov 10, 2016 03:39, "Amit Sela"  wrote:
>>
>> > How about @ValidatesRunner ?
>> > Seems to complement @NeedsRunner as well.
>> >
>> > On Thu, Nov 10, 2016 at 9:47 AM Aljoscha Krettek 
>> > wrote:
>> >
>> > > +1
>> > >
>> > > What I would really like to see is automatic derivation of the
>> capability
>> > > matrix from an extended Runner Test Suite. (As outlined in Thomas'
>> doc).
>> > >
>> > > On Wed, 9 Nov 2016 at 21:42 Kenneth Knowles 
>> > > wrote:
>> > >
>> > > > Huge +1 to this.
>> > > >
>> > > > The two categories I care most about are:
>> > > >
>> > > > 1. Tests that need a runner, but are testing the other "thing under
>> > > test";
>> > > > today this is NeedsRunner.
>> > > > 2. Tests that are intended to test a runner; today this is
>> > > > RunnableOnService.
>> > > >
>> > > > Actually the lines are not necessarily clear between them, but I think
>> we
>> > > can
>> > > > make good choices, like we already do.
>> > > >
>> > > > The idea of two categories with a common superclass actually has a
>> > > pitfall:
>> > > > what if a test is put in the superclass category, when it does not
>> > have a
>> > > > clear meaning? And also, I don't have any good ideas for names.
>> > > >
>> > > > So I think just replacing RunnableOnService with RunnerTest to make
>> > clear
>> > > > that it is there just to test the runner is good. We might also want
>> > > > RunnerIntegrationTest extends NeedsRunner to use in the IO modules.
>> > > >
>> > > > See also Thomas's doc on capability matrix testing* which is aimed at
>> > > case
>> > > > 2. Those tests should all have a category from the doc, or a new one
>> > > added.
>> > > >
>> > > > *
>> > > >
>> > > >
>> > > https://docs.google.com/document/d/1fICxq32t9yWn9qXhmT07xpclHeHX2
>> > VlUyVtpi2WzzGM/edit
>> > > >
>> > > > Kenn
>> > > >
>> > > > On Wed, Nov 9, 2016 at 12:20 PM, Jean-Baptiste Onofré <
>> j...@nanthrax.net
>> > >
>> > > > wrote:
>> > > >
>> > > > > Hi Mark,
>> > > > >
>> > > > > Generally speaking, I agree.
>> > > > >
>> > > > > As RunnableOnService extends NeedsRunner, @TestsWithRunner or
>> > > > @RunOnRunner
>> > > > > sound clearer.
>> > > > >
>> > > > > Regards
>> > > > > JB
>> > > > >
>> > > > >
>> > > > > On 11/09/2016 09:00 PM, Mark Liu wrote:
>> > > > >
>> > > > >> Hi all,
>> > > > >>
>> > > > >> I'm working on building RunnableOnService in Python SDK. After
>> > having
>> > > > >> discussions with folks, "RunnableOnService" looks like not a very
>> > > > >> intuitive
>> > > > >> name for those unit tests that require runners and build
>> lightweight
>> > > > >> pipelines to test specific components. Especially, they don't have
>> > to
>> > > > run
>> > > > >> on a service.
>> > > > >>
>> > > > >> So I want to raise this idea to the community and see if anyone
>> has
>> > > > >> similar thoughts. Maybe we can come up with a name that is tied
>> to the
>> > > > >> runner.
>> > > > >> Currently, I have two names in my head:
>> > > > >>
>> > > > >> - TestsWithRunners
>> > > > >> - RunnerExecutable
>> > > > >>
>> > > > >> Any thoughts?
>> > > > >>
>> > > > >> Thanks,
>> > > > >> Mark
>> > > > >>
>> > > > >>
>> > > > > --
>> > > > > Jean-Baptiste Onofré
>> > > > > jbono...@apache.org
>> > > > > http://blog.nanthrax.net
>> > > > > Talend - http://www.talend.com
>> > > > >
>> > > >
>> > >
>> >
>>


Re: [DISCUSS] Change "RunnableOnService" To A More Intuitive Name

2016-11-09 Thread Robert Bradshaw
I think it's important to tease apart why we're trying to mark
tests. Generally, nearly all tests should run on all runners. However,
there are some exceptions, namely:

1) Some runners don't support all features (especially at the start).
2) Some tests are incompatible with distributed runners (e.g. rely on
in-process IO fakes)

@RunnableOnService has also been used to mark tests that *should* be
run on the service, as it is prohibitively expensive to run all tests
on all runners. We should also have the notion of a comprehensive
suite of tests a runner should pass to support the full model. This
would exclude many tests of unmodified composite transforms
(which hopefully could run on any runner, but the incremental benefit
would be small).


On Wed, Nov 9, 2016 at 12:20 PM, Jean-Baptiste Onofré  wrote:
> Hi Mark,
>
> Generally speaking, I agree.
>
> As RunnableOnService extends NeedsRunner, @TestsWithRunner or @RunOnRunner
> sound clearer.
>
> Regards
> JB
>
>
> On 11/09/2016 09:00 PM, Mark Liu wrote:
>>
>> Hi all,
>>
>> I'm working on building RunnableOnService in Python SDK. After having
>> discussions with folks, "RunnableOnService" doesn't look like a very
>> intuitive
>> name for those unit tests that require runners and build lightweight
>> pipelines to test specific components. Especially, they don't have to run
>> on a service.
>>
>> So I want to raise this idea to the community and see if anyone has
>> similar thoughts. Maybe we can come up with a name that is tied to the
>> runner.
>> Currently, I have two names in my head:
>>
>> - TestsWithRunners
>> - RunnerExecutable
>>
>> Any thoughts?
>>
>> Thanks,
>> Mark
>>
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com


Re: [PROPOSAL] Merge apex-runner to master branch

2016-11-08 Thread Robert Bradshaw
Nice. I'm +1 modulo one caveat below (hopefully easily addressed).

On Tue, Nov 8, 2016 at 5:54 AM, Thomas Weise  wrote:
> Hi,
>
> As per previous discussion [1], I would like to propose to merge the
> apex-runner branch into master. The runner satisfies the criteria outlined
> in [2] and merging it to master will give more visibility to other
> contributors and users.
>
> Specifically the Apex runner addresses:
>
>- Have at least 2 contributors interested in maintaining it, and 1
>committer interested in supporting it:  *I'm going to sign up for the
>support and there are more folks interested. Some have already contributed
>and helped with PR reviews, others from the Apex community have expressed
>interest [3].*

As anyone in the open source ecosystem knows, maintaining is a much
higher bar than contributing, but very important. I'd like to see
specific names here.

>- Provide both end-user and developer-facing documentation:  *Runner has
>README, capability matrix, Javadoc. Planning to add it to the tutorial
>later.*
>- Have at least a basic level of unit test coverage:  *Has 30 runner
>specific tests and passes all Beam RunnableOnService tests.*
>- Run all existing applicable integration tests with other Beam
>components and create additional tests as appropriate: * Enabled runner
>for examples integration tests in the same way as other runners.*
>- Be able to handle a subset of the model that address a significant set of
>use cases (aka. ‘traditional batch’ or ‘processing time
> streaming’):  *Passes
>RunnableOnService without exclusions and example IT.*
>- Update the capability matrix with the current status:  *Done.*
>- Add a webpage under learn/runners: *Same "TODO" page as other runners
>added to site.*
>
> The PR for the merge: https://github.com/apache/incubator-beam/pull/1305
>
> (There are intermittent test failures in individual Travis runs that are
> unrelated to the runner.)
>
> Thanks,
> Thomas
>
> [1]
> https://lists.apache.org/thread.html/2b420a35f05e47561f27c19e8ec6484f595553f32da88fe593ad931d@%3Cdev.beam.apache.org%3E
>
> [2] http://beam.apache.org/contribute/contribution-guide/#feature-branches
>
> [3]
> https://lists.apache.org/thread.html/6e7618768cdcde81c28aa9883a1fcf4d3d4e41de4249547130691d52@%3Cdev.apex.apache.org%3E


Re: Why does `Combine.perKey(SerializableFunction)` require same input and output type

2016-10-31 Thread Robert Bradshaw
On Mon, Oct 31, 2016 at 8:39 PM, Kenneth Knowles  
wrote:
> Manu, I think your critique about user interface clarity is valid.
> CombineFn conflates a few operations and is not that clear about what it is
> doing or why. You seem to be concerned about CombineFn versus
> SerializableFunction constructors for the Combine family of transforms. I
> thought I'd respond from my own perspective, in case it is helpful. It is
> mostly the same things that Luke has said. Let's ignore keys. I don't think
> they change things much.
>
> As you seem to already understand, a CombineFn is a convenient collapsed
> representation of three functions:
>
> init : InputT -> AccumT
> combiner: (AccumT, AccumT) -> AccumT
> extract: AccumT -> OutputT
>
> And the real semantics:
>
> MapElements.via(init)
> Combine.via(combiner)
> MapElements.via(extract)
>
> For starters, "associative" is not even a well-typed word to use unless
> input type and output type are the same. So it is `combiner` that needs to
> be associative and commutative. Sometimes `combiner` also has an identity
> element. I'm afraid `createAccumulator()` and `defaultValue()` confuse
> things here (the latter is never meaningfully used). When we say a
> CombineFn has to be "associative" and "commutative" we just mean that it
> can be factored into these methods.
>
> So the SerializableFunction just needs to be factorable into these methods,
> too, like Luke said. Pragmatically, if we only have a
> SerializableFunction<Iterable<InputT>, OutputT> then we don't have a way to
> do hierarchical combines (can't feed the output of one layer into the next
> layer), so associativity is irrelevant and it might as well be a
> MapElements. So it only makes sense to allow
> SerializableFunction<Iterable<AccumT>, AccumT>. Some variant that is a
> binary function would make sense for lambdas, etc.
>
> Here are some reasons for the particular design of CombineFn that actually
> should be called out:
>
>  - It is a major efficiency gain to mutate the accumulator.
>  - Usually `init` is trivial and best to inline, hence addInput(InputT,
> AccumT)

I would add that often the map InputT -> AccumT is *non-trivial*, as
is AccumT -> AccumT, so AccumT + Input -> AccumT is preferable (both
for efficiency and code simplicity) for anything beyond trivial
combiners. FlumeJava, a predecessor to Beam that we took many lessons
from, had an explicit init rather than addInput and that turned out to
be a drawback when implementing CombineFns.
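
To make that shape concrete, a small sketch of a mean-computing CombineFn
against the Java SDK (the accumulator is kept trivially Serializable here;
a real version would also supply an accumulator Coder):

import java.io.Serializable;
import org.apache.beam.sdk.transforms.Combine;

class MeanFn extends Combine.CombineFn<Double, MeanFn.Accum, Double> {

  static class Accum implements Serializable {
    double sum = 0;
    long count = 0;
  }

  @Override
  public Accum createAccumulator() {                 // empty accumulator
    return new Accum();
  }

  @Override
  public Accum addInput(Accum acc, Double input) {   // InputT folded straight into AccumT
    acc.sum += input;
    acc.count++;
    return acc;
  }

  @Override
  public Accum mergeAccumulators(Iterable<Accum> accs) {  // the associative/commutative part
    Accum merged = new Accum();
    for (Accum a : accs) {
      merged.sum += a.sum;
      merged.count += a.count;
    }
    return merged;
  }

  @Override
  public Double extractOutput(Accum acc) {           // the final AccumT -> OutputT map
    return acc.count == 0 ? 0.0 : acc.sum / acc.count;
  }
}

It is applied as input.apply(Combine.perKey(new MeanFn())); the
SerializableFunction form being discussed corresponds to just the
mergeAccumulators step, i.e. an Iterable<V> -> V function.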

>  - With `compact` we allow multiple physical representations of the same
> semantic accumulator, and a hook to switch between them
>  - And it is hard to take the user through the journey from the real
> reasons behind it and the particular Java interface
>
> Note also that CombineWithContext allows side inputs, which complicates the
> formalities somewhat but doesn't change the intuition.
>
> Kenn
>
> On Mon, Oct 31, 2016 at 6:37 PM Manu Zhang  wrote:
>
>> I'm a bit confused here because neither of them requires same type of
>> input and output. Also, the Javadoc of Globally says "It is common for {@code
>> InputT == OutputT}, but not required". If associativity and commutativity are
>> expected, why don't they have restrictions like
>> Combine.perKey(SerializableFunction) ?
>>
>> I understand the motive and requirement behind Combine functions. I'm more
>> asking about the user interface consistency.
>> By the way, it's hard to know what Combine.Globally does from the name but
>> that discussion should be put in another thread.
>>
>> Thanks for your patience here.
>>
>> Manu
>>
>> On Tue, Nov 1, 2016 at 12:04 AM Lukasz Cwik  wrote:
>>
>> GlobalCombineFn and PerKeyCombineFn still expect an associative and
>> commutative function when accumulating.
>> GlobalCombineFn is shorthand for assigning everything to a single key,
>> doing the combine, and then discarding the key and extracting the single
>> output.
>> PerKeyCombineFn is shorthand for doing accumulation where the key doesn't
>> modify the accumulation in anyway.
>>
>> On Fri, Oct 28, 2016 at 6:09 PM, Manu Zhang 
>> wrote:
>>
>> Then what about the other interfaces, like Combine.perKey(GlobalCombineFn)
>> and Combine.perKey(PerkeyCombineFn) ? Why not all of them have the
>> requirements ?
>>
>> On Sat, Oct 29, 2016 at 12:06 AM Lukasz Cwik  wrote:
>>
>> For it to be considered a combiner, the function needs to be associative
>> and commutative.
>>
>> The issue is that from an API perspective it would be easy to have a
>> Combine.perKey(SerializableFunction<Iterable<InputT>, OutputT>). But many
>> people in the data processing world expect that this
>> parallelization/optimization is performed and thus exposing such a method
>> would be dangerous as it would be breaking users' expectations, so from the
>> design perspective it is a hard requirement. If PCollections ever become
>> ordered or gain other properties, these requirements may loosen but it
>> seems unlikely in the short term.
>>
>> At this point, I think your looking for a Ma

Re: [DISCUSS] Merging master -> feature branch

2016-10-27 Thread Robert Bradshaw
My concern was mostly about what to do in the face of conflicts, but
it sounds like the consensus is that for a clean merge, with no
conflicts or test breakage (or other concerns), a committer is free to
push without any oversight, which is fine by me.

[If/when the Mergebot comes into action, and runs more extensive tests
than standard precommit, it might make sense to still go through that
rather than debug bad merges discovered in postcommit tests.]

On Wed, Oct 26, 2016 at 9:07 PM, Davor Bonaci  wrote:
> +1
>
> I concur it is fine to proceed with a downstream integration (master ->
> feature branch -> sub-feature branch) without waiting for review for a
> completely clean merge. Exactly as proposed -- I think there should still
> be a pull request and comment saying it is a clean merge. (In some ideal
> world, this would happen nightly by a tool automatically, but I think
> that's not feasible in the short term.)
>
> I think other cases (upstream integration, merge conflict, any manual
> action, etc.) should still wait for a normal review.
>
> On Wed, Oct 26, 2016 at 10:34 AM, Thomas Weise  wrote:
>
>> +1
>>
>> For a merge from master to the feature branch that does not require extra
>> changes, RTC does not add value. It actually delays and burns reviewer time
>> (even mechanics need some) that "real" PRs could benefit from. If
>> adjustments are needed, then the regular process kicks in.
>>
>> Thanks,
>> Thomas
>>
>>
>> On Wed, Oct 26, 2016 at 1:33 AM, Amit Sela  wrote:
>>
>> > I generally agree with Kenneth.
>> >
>> > While working on the SparkRunnerV2 branch, it was a pain - I avoided
>> > frequent merges to avoid trivial PRs, but it cost me very large and
>> > non-trivial merges later.
>> > I think that frequent merges for feature-branches should most of the time
>> > be trivial (no conflicts) and a committer should be allowed to self-merge
>> > once tests pass.
>> > As for conflicts, even for the smallest ones I'd go with review just so
>> > it's very clear when self-merging is OK - we can always revisit this
>> later
>> > and further discuss if we think we can improve this process.
>> >
>> > I guess +1 from me.
>> >
>> > Thanks,
>> > Amit.
>> >
>> > On Wed, Oct 26, 2016 at 8:10 AM Frances Perry 
>> > wrote:
>> >
>> > > On Tue, Oct 25, 2016 at 9:44 PM, Jean-Baptiste Onofré > >
>> > > wrote:
>> > >
>> > > > Agree. When possible it would be great to have the branch merged on
>> > > master
>> > > > quickly, even when it's not fully ready. It would give more
>> visibility
>> > to
>> > > > potential contributors.
>> > > >
>> > >
>> > > This thread is about the opposite, I think -- merging master into
>> feature
>> > > branches regularly to prevent them from getting out of sync.
>> > >
>> > > As for increasing the visibility of feature branches, we have these new
>> > > webpages:
>> > > http://beam.incubator.apache.org/contribute/work-in-progress/
>> > > http://beam.incubator.apache.org/contribute/contribution-
>> > > guide/#feature-branches
>> > > with more changes coming in the basic SDK/Runner landing pages too.
>> > >
>> >
>>


Re: [DISCUSS] Using Verbs for Transforms

2016-10-27 Thread Robert Bradshaw
+1 to all Dan says.

I only brought this up because it seemed new contributors (yay)
jumping in and renaming a core transform based on "Something to
consider" deserved a couple more eyeballs, but I didn't intend for
it to become a big deal.

On Thu, Oct 27, 2016 at 11:03 AM, Dan Halperin
 wrote:
> Folks, I don't think this needs to be a "vote". This is just not that big a
> deal :). It is important to be transparent and have these discussions on
> the list, which is why we brought it here from GitHub/JIRA, but at the end
> of the day I hope that a small group of committers and developers can
> assess "good enough" consensus for these minor issues.
>
> Here's my assessment:
> * We don't really have any rules about naming transforms. "Should be a
> verb" is a sort of guiding principle inherited from the Google Flume
> project from which Dataflow evolved, but honestly we violate this rule for
> clarity all over the place. ("Values", for example).
> * The "Big Data" community is significantly more familiar with the concept
> of Distinct -- Jesse, who filed the original JIRA, is a good example here.
> * Finally, nobody feels very strongly. We could argue minor points of each
> solution, but at the end of the day I don't think anyone wants to block a
> change.
>
> Let's go with Distinct. It's important to align Beam with the open source
> big data community. (And thanks Jesse, our newest (*tied) committer, for
> pushing us in the right direction!)
>
> Jesse, can you please take charge of wrapping up the PR and merging it?
>
> Thanks!
> Dan
>
> On Wed, Oct 26, 2016 at 11:12 PM, Jean-Baptiste Onofré 
> wrote:
>
>> Just to clarify. Davor is right for a code modification change: -1 means a
>> veto.
>> I meant that -1 is not a veto for a release vote.
>>
>> Anyway, even if it's not a formal code, we can have a discussion with
>> "options" a,b and c.
>>
>> Regards
>> JB
>>
>>
>> On Oct 27, 2016, 06:48, at 06:48, Davor Bonaci 
>> wrote:
>> >In terms of reaching a decision on any code or design changes,
>> >including
>> >this one, I'd suggest going without formal votes. Voting process for
>> >code
>> >modifications between choices A and B doesn't necessarily end with a
>> >decision A or B -- a single (qualified) -1 vote is a veto and cannot be
>> >overridden [1]. Said differently, the guideline is that code changes
>> >should
>> >be made by consensus; not by one group outvoting another. I'd like to
>> >avoid
>> >setting such precedent; we should try to drive consensus, as opposed to
>> >attempting to outvote another part of the community.
>> >
>> >In this particular case, we have had a great discussion. Many
>> >contributors
>> >brought different perspectives. Consequently, some opinions have been
>> >likely changed. At this point, someone should summarize the arguments,
>> >try
>> >to critique them from a neutral standpoint, and suggest a refined
>> >proposal
>> >that takes these perspectives into account. If nobody objects in a
>> >short
>> >time, we should consider this decided. [ I can certainly help here, but
>> >I'd
>> >love to see somebody else do it! ]
>> >
>> >[1] http://www.apache.org/foundation/voting.html
>> >
>> >On Wed, Oct 26, 2016 at 7:35 AM, Ben Chambers
>> >
>> >wrote:
>> >
>> >> I also like Distinct since it doesn't make it sound like it modifies
>> >any
>> >> underlying collection. RemoveDuplicates makes it sound like the
>> >duplicates
>> >> are removed, rather than a new PCollection without duplicates being
>> >> returned.
>> >>
>> >> On Wed, Oct 26, 2016, 7:36 AM Jean-Baptiste Onofré 
>> >> wrote:
>> >>
>> >> > Agree. It was more a transition proposal.
>> >> >
>> >> > Regards
>> >> > JB
>> >> >
>> >> >
>> >> > On Oct 26, 2016, 08:31, at 08:31, Robert Bradshaw
>> >> >  wrote:
>> >> > >On Mon, Oct 24, 2016 at 11:02 PM, Jean-Baptiste Onofré
>> >> > > wrote:
>> >> > >> And what about use RemoveDuplicates and create an alias Distinct
>> >?
>> >> > >
>> >> > >I'd really like to avoid (long term) aliases--you end up having to
>> >> > 

Re: Tracking backward-incompatible changes for Beam

2016-10-27 Thread Robert Bradshaw
If the API/semantics are sufficiently well tested, backwards
incompatibility should manifest as test failures. The corollary is
that one should look closely at any test changes that get proposed.

On Mon, Oct 24, 2016 at 1:52 PM, Davor Bonaci  wrote:
> I don't think we have it right now. We should, of course, but this is
> something that needs to be defined/discussed first.
>
> On Mon, Oct 24, 2016 at 1:20 PM, Neelesh Salian 
> wrote:
>
>> +1 for the labels and also a need for tests.
>> Do we document any rules for backward-compatibility? Be good to have a
>> checklist-like list.
>>
>>
>>
>>
>> On Mon, Oct 24, 2016 at 1:02 PM, Davor Bonaci 
>> wrote:
>>
>> > It would be awesome to have that! At least a good portion of
>> > backward-incompatible changes could be automatically caught.
>> >
>> > We should also think about defining backward-compatibility more
>> precisely.
>> > This would be good in its own right, but also necessary to configure the
>> > tool. Historically, we have applied the backward-compatibility rules on
>> > APIs that are intended for users, excluding experimental ones, but not
>> > necessarily on all publicly visible APIs. If we continue this practice,
>> it
>> > might be a challenge for the tool. In any case, I think there's a good
>> > discussion to be had around what backward-compatibility means exactly in
>> > Beam.
>> >
>> > On Sat, Oct 22, 2016 at 2:47 AM, Aljoscha Krettek 
>> > wrote:
>> >
>> > > Very good idea!
>> > >
>> > > Should we already start thinking about automatic tests for backwards
>> > > compatibility of the API?
>> > >
>> > > On Fri, 21 Oct 2016 at 10:56 Jean-Baptiste Onofré 
>> > wrote:
>> > >
>> > > > Hi Dan,
>> > > >
>> > > > +1, good idea.
>> > > >
>> > > > Regards
>> > > > JB
>> > > >
>> > > > On 10/21/2016 02:21 AM, Dan Halperin wrote:
>> > > > > Hey everyone,
>> > > > >
>> > > > > In the Beam codebase, we’ve improved, rewritten, or deleted many
>> > APIs.
>> > > > > While this has improved the model and gives us great freedom to
>> > > > experiment,
>> > > > > we are also causing churn on users authoring Beam libraries and
>> > > > pipelines.
>> > > > >
>> > > > > To really kick off Beam as something users can depend on, we need
>> to
>> > > > > stabilize the Beam API. Stabilizing means a commitment to not
>> making
>> > > > > breaking changes -- except between major versions as per standard
>> > > > semantic
>> > > > > versioning.
>> > > > >
>> > > > > To get there, I’ve started a process for tracking these changes by
>> > > > applying
>> > > > > the `backward-incompatible` label [1] to the corresponding JIRA
>> > issues.
>> > > > > Naturally, open `backward-incompatible` changes are “blocking
>> issues”
>> > > for
>> > > > > the first stable release. (Or we’ll have to put them off for the
>> next
>> > > > major
>> > > > > version!)
>> > > > >
>> > > > > So here are some requests for help:
>> > > > > * Please review and appropriately label the components I skipped:
>> > > > > runner-{apex, flink, gearpump, spark}, sdk-py.
>> > > > > * Please proactively file JIRA issues for breaking API changes you
>> > > still
>> > > > > want to make, and label them.
>> > > > >
>> > > > > Thanks everyone!
>> > > > > Dan
>> > > > >
>> > > > >
>> > > > > [1]
>> > > > >
>> > > > https://issues.apache.org/jira/issues/?jql=project%20%
>> > > 3D%20BEAM%20AND%20labels%20%3D%20backward-incompatible
>> > > > >
>> > > >
>> > > > --
>> > > > Jean-Baptiste Onofré
>> > > > jbono...@apache.org
>> > > > http://blog.nanthrax.net
>> > > > Talend - http://www.talend.com
>> > > >
>> > >
>> >
>>
>>
>>
>> --
>> Neelesh Srinivas Salian
>> Customer Operations Engineer
>>


Re: [DISCUSS] Using Verbs for Transforms

2016-10-25 Thread Robert Bradshaw
On Mon, Oct 24, 2016 at 11:02 PM, Jean-Baptiste Onofré  
wrote:
> And what about use RemoveDuplicates and create an alias Distinct ?

I'd really like to avoid (long term) aliases--you end up having to
document (and maintain) them both, and it adds confusion as to which
one to use (especially if they ever diverge), and means searching for
one or the other yields half the results.

> It doesn't break the API and would address both SQL users and more "big data" 
> users.
>
> My $0.01 ;)
>
> Regards
> JB
>
>
> On Oct 24, 2016, 22:23, at 22:23, Dan Halperin  
> wrote:
>>I find "MakeDistinct" more confusing. My votes in decreasing
>>preference:
>>
>>1. Keep `RemoveDuplicates` name, ensure that important keywords are in
>>the
>>Javadoc. This reduces churn on our users and is honestly pretty dang
>> descriptive.
>>2. Rename to `Distinct`, which is clear if you're a SQL user and likely
>>less clear otherwise. This is a backwards-incompatible API change, so
>>we
>>should do it before we go stable.
>>
>>I am not super strong that 1 > 2, but I am very strong that "Distinct" >
>>"MakeDistinct", and "RemoveDuplicates" >>> "AvoidDuplicate".
>>
>>Dan
>>
>>On Mon, Oct 24, 2016 at 10:12 AM, Kenneth Knowles
>>
>>wrote:
>>
>>> The precedent that we use verbs has many exceptions. We have
>>> ApproximateQuantiles, Values, Keys, WithTimestamps, and I would even
>>> include Sum (at least when I read it).
>>>
>>> Historical note: the predilection towards verbs is from the Google
>>Style
>>> Guide for Java method names
>>>
>><https://google.github.io/styleguide/javaguide.html#s5.2.3-method-names>,
>>> which states "Method names are typically verbs or verb phrases". But
>>even
>>> in Google code there are lots of exceptions when it makes sense, like
>>> Guava's
>>> Iterables.any(), Iterables.all(), Iterables.toArray(), the entire
>>> Predicates module, etc. Just an aside; Beam isn't Google code. I
>>suggest we
>>> use our judgment rather than a policy.
>>>
>>> I think "Distinct" is one of those exceptions. It is a standard
>>widespread
>>> name and also reads better as an adjective. I prefer it, but also
>>don't
>>> care strongly enough to change it or to change it back :-)
>>>
>>> If we must have a verb, I like it as-is more than MakeDistinct and
>>> AvoidDuplicate.
>>>
>>> On Mon, Oct 24, 2016 at 9:46 AM Jesse Anderson
>>
>>> wrote:
>>>
>>> > My original thought for this change was that Crunch uses the class
>>name
>>> > Distinct. SQL also uses the keyword distinct.
>>> >
>>> > Maybe the rule should be changed to adjectives or verbs depending
>>on the
>>> > context.
>>> >
>>> > Using a verb to describe this class really doesn't connote what the
>>class
>>> > does as succinctly as the adjective.
>>> >
>>> > On Mon, Oct 24, 2016 at 9:40 AM Neelesh Salian
>>
>>> > wrote:
>>> >
>>> > > Hello,
>>> > >
>>> > > First of all, thank you to Daniel, Robert and Jesse for their
>>review on
>>> > > this: https://issues.apache.org/jira/browse/BEAM-239
>>> > >
>>> > > A point that came up was using verbs explicitly for Transforms.
>>> > > Here is the PR:
>>https://github.com/apache/incubator-beam/pull/1164
>>> > >
>>> > > Posting it to help understand if we have a consensus for it and
>>if yes,
>>> > we
>>> > > could perhaps document it for future changes.
>>> > >
>>> > > Thank you.
>>> > >
>>> > > --
>>> > > Neelesh Srinivas Salian
>>> > > Engineer
>>> > >
>>> >
>>>


Re: [DISCUSS] Merging master -> feature branch

2016-10-25 Thread Robert Bradshaw
On Tue, Oct 25, 2016 at 2:33 PM, Kenneth Knowles  
wrote:
> Hi all,
>
> While collaborating on the apex-runner branch, the issue of how best to
> continuously merge master into the feature branch came up. IMO it differs
> somewhat from normal commits in two notable ways:
>
> 1. Modulo fix-ups, it is actually not adding any new code to the overall
> codebase, so reviews don't serve to raise the quality bar for contributions.
> 2. It is pretty important to do very frequently to keep out of trouble, so
> friction must be thoroughly justified.
>
> I really wouldn't want reviewing a daily merge from master to consume
> someone's time every day. On the gearpump-runner branch we had some major
> review latency problems. Obviously, I'd like to hear from folks on other
> feature branches. How has this process been for the Python SDK?

The Python SDK sits at the extreme end of there being no conflicts,
and due to the paucity of intersection, little motivation to bother
merging at all.

> I will also throw out an idea for a variation in process:
>
> 1. A committer merges master to their feature branch without conflicts.*
> 2. They open a PR for the merge.
> 3a. If the tests pass without modifications _the committer self-merges the
> PR without waiting for review_.
> 3b. If there are any adjustments needed, these go in separate commits on
> the same PR and the review process is as usual (the reviewer can review
> just the added commits).
>
> What do you think? Does this treat such merges too blithely? This is meant
> just as a starting point for discussion.
>
> Kenn
>
> * There should never be real conflicts unless something strange has
> happened - the feature can't be edited on the master branch, and master
> stuff shouldn't be touched on the feature branch.

I think you're being a little optimistic about there rarely being a
need for conflict resolution--often work on a feature requires
refactoring/extending the main codebase. Hopefully as we provide a
clean (and stable) API between runners and SDKs this will still be the
case, but I don't think we're there yet.

If there are conflicts, does one check in the conflict markers then
resolve in a separate commit? Only for messy ones, or all the time?
Certainly if further adjustments are needed we should do a full
review.

In my opinion, reviewing a merge should not be too laborious a process
(e.g. one needn't necessarily read the entire diff as one would with a
standard commit). It shouldn't be blocking anyone to wait for a review
(one can always develop on top of the merge). If there's not enough
activity on a branch to do this, the branch has other troubles. So I'd
lean towards not making an exception here, but could be convinced
otherwise.

- Robert


Re: The Availability of PipelineOptions

2016-10-25 Thread Robert Bradshaw
+1

On Tue, Oct 25, 2016 at 7:26 AM, Thomas Weise  wrote:
> +1
>
>
> On Tue, Oct 25, 2016 at 3:03 AM, Jean-Baptiste Onofré 
> wrote:
>
>> +1
>>
>> Agree
>>
>> Regards
>> JB
>>
>>
>> On Oct 25, 2016, 12:01, at 12:01, Aljoscha Krettek 
>> wrote:
>> >+1 This sounds quite straightforward.
>> >
>> >On Tue, 25 Oct 2016 at 01:36 Thomas Groh 
>> >wrote:
>> >
>> >> Hey everyone,
>> >>
>> >> I've been working on a declaration of intent for how we want to use
>> >> PipelineOptions and an API change to be consistent with that intent.
>> >This
>> >> is generally part of the move to the Runner API, specifically the
>> >desire to
>> >> be able to reuse Pipelines and the ability to choose runner at the
>> >time of
>> >> the call to run.
>> >>
>> >> The high-level summary is I want to remove the
>> >Pipeline.getPipelineOptions
>> >> method.
>> >>
>> >> I believe this will be compatible with other in-flight proposals,
>> >> especially Dynamic PipelineOptions, but would love to see what
>> >everyone
>> >> else thinks. The document is available at the link below.
>> >>
>> >>
>> >>
>> >https://docs.google.com/document/d/1Wr05cYdqnCfrLLqSk-
>> -XmGMGgDwwNwWZaFbxLKvPqEQ/edit?usp=sharing
>> >>
>> >> Thanks,
>> >>
>> >> Thomas
>> >>
>>


Re: [DISCUSS] Current ongoing work on runners

2016-10-24 Thread Robert Bradshaw
I think it would be worth publishing a compatibility matrix, if not on
the main site, as part of the branch itself.

Even better would be if the compatibility matrix was automatically
deduced based on a suite of tests that each runner could (attempt to)
pass.

On Mon, Oct 24, 2016 at 12:52 PM, Ismaël Mejía  wrote:
> Hello,
>
> I am really happy to see new runners been contributed to our community
> (e.g. GearPump and Apex recently). We have not discussed a lot about the
> current capabilities of both runners.
>
> Following the recent discussion about making ongoing work more explicit in
> the mailing list, I would like to ask the people involved about the current
> status of them, I think it is important to discuss this (apart of creating
> the given JIRAs + updating the capability matrix docs) because more people
> can eventually jump and give a hand on open issues.
>
> I remember there was a google doc for the  capabilities of each runner, is
> this doc still available (sorry I lost the link). I suppose that once these
> ongoing runners mature we can add this doc also to the website.
> https://beam.apache.org/learn/runners/capability-matrix/
>
> Regards,
> Ismaël
>
> ps. @Amit, given that the spark 2 (Dataset based) runner has also a feature
> branch, if you consider it worth, can you please share a bit about that
> work too.
>
> ps2. Can anyone please share the link to the google doc I was talking
> about, I can't find it after the recent changes to the website.
>


Re: [DISCUSS] Using Verbs for Transforms

2016-10-24 Thread Robert Bradshaw
On Mon, Oct 24, 2016 at 8:52 PM, Robert Bradshaw  wrote:
> On Mon, Oct 24, 2016 at 10:12 AM, Kenneth Knowles
>  wrote:
>> The precedent that we use verbs has many exceptions. We have
>> ApproximateQuantiles, Values, Keys, WithTimestamps, and I would even
>> include Sum (at least when I read it).
>
> True.
>
>> Historical note: the predilection towards verbs is from the Google Style
>> Guide for Java method names
>> <https://google.github.io/styleguide/javaguide.html#s5.2.3-method-names>,
>> which states "Method names are typically verbs or verb phrases". But even
>> in Google code there are lots of exceptions when it makes sense, like Guava's
>> Iterables.any(), Iterables.all(), Iterables.toArray(), the entire
>> Predicates module, etc. Just an aside; Beam isn't Google code. I suggest we
>> use our judgment rather than a policy.
>
> Yes, we should favor what flows well. Verbs often do, but...

On this note, however, the first attempt at trigger builders were
developed to be "fluent" and read like English sentences, but in
retrospect were needlessly verbose.

>> I think "Distinct" is one of those exceptions. It is a standard widespread
>> name and also reads better as an adjective. I prefer it, but also don't
>> care strongly enough to change it or to change it back :-)
>>
>> If we must have a verb, I like it as-is more than MakeDistinct and
>> AvoidDuplicate.
>
> I much prefer "Distinct" to the other options forcing it to be
> verb-like (despite being the one to bring this up). My (weak)
> preference is to leave RemoveDuplicates with better documentation, but
> Distinct could be fine.
>


Re: [DISCUSS] Using Verbs for Transforms

2016-10-24 Thread Robert Bradshaw
On Mon, Oct 24, 2016 at 10:12 AM, Kenneth Knowles
 wrote:
> The precedent that we use verbs has many exceptions. We have
> ApproximateQuantiles, Values, Keys, WithTimestamps, and I would even
> include Sum (at least when I read it).

True.

> Historical note: the predilection towards verbs is from the Google Style
> Guide for Java method names
> ,
> which states "Method names are typically verbs or verb phrases". But even
> in Google code there are lots of exceptions when it makes sense, like Guava's
> Iterables.any(), Iterables.all(), Iterables.toArray(), the entire
> Predicates module, etc. Just an aside; Beam isn't Google code. I suggest we
> use our judgment rather than a policy.

Yes, we should favor what flows well. Verbs often do, but...

> I think "Distinct" is one of those exceptions. It is a standard widespread
> name and also reads better as an adjective. I prefer it, but also don't
> care strongly enough to change it or to change it back :-)
>
> If we must have a verb, I like it as-is more than MakeDistinct and
> AvoidDuplicate.

I much prefer "Distinct" to the other options forcing it to be
verb-like (despite being the one to bring this up). My (weak)
preference is to leave RemoveDuplicates with better documentation, but
Distinct could be fine.

> On Mon, Oct 24, 2016 at 9:46 AM Jesse Anderson 
> wrote:
>
>> My original thought for this change was that Crunch uses the class name
>> Distinct. SQL also uses the keyword distinct.
>>
>> Maybe the rule should be changed to adjectives or verbs depending on the
>> context.
>>
>> Using a verb to describe this class really doesn't connote what the class
>> does as succinctly as the adjective.
>>
>> On Mon, Oct 24, 2016 at 9:40 AM Neelesh Salian 
>> wrote:
>>
>> > Hello,
>> >
>> > First of all, thank you to Daniel, Robert and Jesse for their review on
>> > this: https://issues.apache.org/jira/browse/BEAM-239
>> >
>> > A point that came up was using verbs explicitly for Transforms.
>> > Here is the PR: https://github.com/apache/incubator-beam/pull/1164
>> >
>> > Posting it to help understand if we have a consensus for it and if yes,
>> we
>> > could perhaps document it for future changes.
>> >
>> > Thank you.
>> >
>> > --
>> > Neelesh Srinivas Salian
>> > Engineer
>> >
>>


Re: [ANNOUNCEMENT] New committers!

2016-10-22 Thread Robert Bradshaw
Congrats and welcome to all three of you!

On Sat, Oct 22, 2016 at 9:02 AM, Thomas Weise  wrote:
> Thanks everyone!
>
>
> On Sat, Oct 22, 2016 at 12:59 AM, Aljoscha Krettek 
> wrote:
>
>> Welcome everyone! +3 :-)
>>
>> On Sat, 22 Oct 2016 at 06:43 Jean-Baptiste Onofré  wrote:
>>
>> > Just a small thing.
>> >
>> > If it's not already done, don't forget to sign an ICLA and let us know
>> > your apache ID.
>> >
>> > Thanks,
>> > Regards
>> > JB
>> >
>> > On 10/22/2016 12:18 AM, Davor Bonaci wrote:
>> > > Hi everyone,
>> > > Please join me and the rest of Beam PPMC in welcoming the following
>> > > contributors as our newest committers. They have significantly
>> > contributed
>> > > to the project in different ways, and we look forward to many more
>> > > contributions in the future.
>> > >
>> > > * Thomas Weise
>> > > Thomas authored the Apache Apex runner for Beam [1]. This is an
>> exciting
>> > > new runner that opens a new user base. It is a large contribution,
>> which
>> > > starts the whole new component with a great potential.
>> > >
>> > > * Jesse Anderson
>> > > Jesse has contributed significantly by promoting Beam. He has
>> > co-developed
>> > > a Beam tutorial and delivered it at a top big data conference. He
>> > published
>> > > several blog posts positioning Beam, Q&A with the Apache Beam team,
>> and a
>> > > demo video how to run Beam on multiple runners [2]. On the side, he has
>> > > authored 7 pull requests and reported 6 JIRA issues.
>> > >
>> > > * Thomas Groh
>> > > Since starting incubation, Thomas has contributed the most commits to
>> the
>> > > project [3], a total of 226 commits, which is more than anybody else.
>> He
>> > > has contributed broadly to the project, most significantly by
>> developing
>> > > from scratch the DirectRunner that supports the full model semantics.
>> > > Additionally, he has contributed a new set of APIs for testing
>> unbounded
>> > > pipelines. He published a blog highlighting this work.
>> > >
>> > > Congratulations to all three! Welcome!
>> > >
>> > > Davor
>> > >
>> > > [1] https://github.com/apache/incubator-beam/tree/apex-runner
>> > > [2] http://www.smokinghand.com/
>> > > [3] https://github.com/apache/incubator-beam/graphs/contributors
>> > > ?from=2016-02-01&to=2016-10-14&type=c
>> > >
>> >
>> > --
>> > Jean-Baptiste Onofré
>> > jbono...@apache.org
>> > http://blog.nanthrax.net
>> > Talend - http://www.talend.com
>> >
>>


Re: [DISCUSS] Deferring (pre) combine for merging windows.

2016-10-22 Thread Robert Bradshaw
On Sat, Oct 22, 2016 at 2:38 AM, Amit Sela  wrote:
> I understand the semantics, but I feel like there might be a different
> point of view for open-source runners.

It seems we're losing a major promise of the runner interchangeability
story if different runners can give different results for a
well-defined transformation. I strongly feel we should avoid that path
whenever possible. Specifically in this case Combine.perKey should
mean the same thing on all runners (namely its composite definition),
and only be executed differently when it's safe to do so.

> Dataflow is a service, and it tries to do its best to optimize execution
> while users don't have to worry about internal implementation (they are not
> aware of it).
> I can assure
> <https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html>
> you that for Spark users, applying groupByKey instead of combinePerKey is
> an important note.

For sure. Dataflow calls this out too. See the second star at
https://cloud.google.com/dataflow/model/combine#using-combine-perkey
(though it's not called out as prominently as it is for Spark
users--likely should be more). Beam documentation should make this
point as well.

> @Aljoscha do Flink users (working on Flink native API) usually care about
> this difference of implementation ?
> Any other runners that can provide input ?

IIRC, Flink and Dataflow (and, trivially, the direct runner) all avoid
this unsafe optimization when merging windows are mixed with
non-global side inputs.

Note also that the user of the Combine.perKey transform may not know
the choice of windowing of the main or side inputs, so can't make this
determination of whether it's safe to use this optimization. (As a
concrete example, suppose I created a TopNPercent transform that did a
global count and passed that as a side input to the Top CombineFn.)

> On Sat, Oct 22, 2016 at 2:25 AM Robert Bradshaw 
> wrote:
>
> Combine.perKey() is defined as GroupByKey() | Combine.values().
>
> A runner is free, in fact encouraged, to take advantage of the
> associative properties of CombineFn to compute the result of
> GroupByKey() | Combine.values() as cheaply as possible, but it is
> incorrect to produce something that could not have been produced by
> this composite implementation. (In the case of deterministic trigger
> firing, (e.g. the default trigger), plus assuming of course a
> associative, deterministic CombineFn, there is exactly one correct
> output for every input no matter the WindowFns).
>
> A corollary to this is that we cannot apply combining operations that
> inspect the main input window (including side inputs where the mapping
> is anything but the constant map (like to GlobalWindow)) until the
> main input window is known.
>
>
> On Fri, Oct 21, 2016 at 3:50 PM, Amit Sela  wrote:
>> Please excuse my typos and apply "s/differ/defer/g" ;-).
>> Amit.
>>
>> On Fri, Oct 21, 2016 at 2:59 PM Amit Sela  wrote:
>>
>>> I'd like to raise an issue that was discussed in BEAM-696
>>> <https://issues.apache.org/jira/browse/BEAM-696>.
>>> I won't recap here because it would be extensive (and probably
>>> exhaustive), and I'd also like to restart the discussion here rather than
>>> summarize it.
>>>
>>> *The problem*
>>> In the case of (main) input in a merging window (e.g. Sessions) with
>>> sideInputs, pre-combining might lead to non-deterministic behaviour, for
>>> example:
>>> Main input: e1 (time: 3), e2 (time: 5)
>>> Session: gap duration of 3 -> e1 alone belongs to [3, 6), e2 alone [5,
> 8),
>>> combined together the merging of their windows yields [3, 8).
>>> Matching SideInputs with FixedWindows of size 2 should yield - e1
> matching
>>> sideInput window [4, 6), e2 [6, 8), merged [6, 8).
>>> Now, if the sideInput is used in a merging step of the combine, and both
>>> elements are a part of the same bundle, the sideInput accessed will
>>> correspond to [6, 8) which is the expected behaviour, but if e1 is
>>> pre-combined in a separate bundle, it will access sideInput for [4, 6)
>>> which is wrong.
>>> ** this can tend to be a bit confusing, so any
> clarifications/corrections
>>> are most welcomed.*
>>>
>>> *Solutions*
>>> The optimal solution would be to differ until trigger in case of merging
>>> windows with sideInputs that are not "agnostic" to such behaviour, but
> this
>>> is clearly not feasible since the nature and use of sideInputs in
>>> CombineFns are opaque.
>>> Second best would be

Re: [DISCUSS] Deferring (pre) combine for merging windows.

2016-10-21 Thread Robert Bradshaw
Combine.perKey() is defined as GroupByKey() | Combine.values().

A runner is free, in fact encouraged, to take advantage of the
associative properties of CombineFn to compute the result of
GroupByKey() | Combine.values() as cheaply as possible, but it is
incorrect to produce something that could not have been produced by
this composite implementation. (In the case of deterministic trigger
firing, (e.g. the default trigger), plus assuming of course a
associative, deterministic CombineFn, there is exactly one correct
output for every input no matter the WindowFns).

A corollary to this is that we cannot apply combining operations that
inspect the main input window (including side inputs where the mapping
is anything but the constant map (like to GlobalWindow)) until the
main input window is known.
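
For concreteness, a minimal sketch of that composite (Java; Sum.SumLongFn
is the SDK's built-in summing CombineFn -- if the name differs in your
version, any CombineFn<Long, ?, Long> will do):

// Combine.perKey(fn) is defined to be semantically equivalent to this
// two-step composite: group by key, then combine the grouped values.
PCollection<KV<String, Long>> input = ...;
PCollection<KV<String, Long>> sums =
    input
        .apply(GroupByKey.<String, Long>create())
        .apply(Combine.<String, Long, Long>groupedValues(new Sum.SumLongFn()));

Any runner-side "combiner lifting" has to produce output that this
composite could have produced.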


On Fri, Oct 21, 2016 at 3:50 PM, Amit Sela  wrote:
> Please excuse my typos and apply "s/differ/defer/g" ;-).
> Amit.
>
> On Fri, Oct 21, 2016 at 2:59 PM Amit Sela  wrote:
>
>> I'd like to raise an issue that was discussed in BEAM-696
>> .
>> I won't recap here because it would be extensive (and probably
>> exhaustive), and I'd also like to restart the discussion here rather than
>> summarize it.
>>
>> *The problem*
>> In the case of (main) input in a merging window (e.g. Sessions) with
>> sideInputs, pre-combining might lead to non-deterministic behaviour, for
>> example:
>> Main input: e1 (time: 3), e2 (time: 5)
>> Session: gap duration of 3 -> e1 alone belongs to [3, 6), e2 alone [5, 8),
>> combined together the merging of their windows yields [3, 8).
>> Matching SideInputs with FixedWindows of size 2 should yield - e1 matching
>> sideInput window [4, 6), e2 [6, 8), merged [6, 8).
>> Now, if the sideInput is used in a merging step of the combine, and both
>> elements are a part of the same bundle, the sideInput accessed will
>> correspond to [6, 8) which is the expected behaviour, but if e1 is
>> pre-combined in a separate bundle, it will access sideInput for [4, 6)
>> which is wrong.
>> ** this can tend to be a bit confusing, so any clarifications/corrections
>> are most welcomed.*
>>
>> *Solutions*
>> The optimal solution would be to differ until trigger in case of merging
>> windows with sideInputs that are not "agnostic" to such behaviour, but this
>> is clearly not feasible since the nature and use of sideInputs in
>> CombineFns are opaque.
>> Second best would be to differ until trigger *only* if sideInputs are
>> used for merging windows - pretty sure this is how Flink and Dataflow (soon
>> Spark) runners do that.
>>
>> *Tradeoffs*
>> This seems like a very user-friendly way to apply authored pipelines
>> correctly, but this also means that users who called for a Combine
>> transformation will get a Grouping transformation instead (sort of the
>> opposite of combiner lifting ? a combiner unwrapping ?).
>> For the SDK, Combine is simply a composite transform, but keep in mind
>> that this affects runner optimization.
>> The price to pay here is (1) shuffle all elements into a single bundle
>> (the cost varies according to a runner's typical bundle size) (2) state can
>> grow as processing is differed and not compacted until triggered.
>>
>> IMHO, the execution should remain faithful to what the pipeline states,
>> and if this results in errors, well... it happens.
>> There are many legitimate use cases where an actual GroupByKey should be
>> used (regardless of sideInputs), such as sequencing of events in a window,
>> and I don't see the difference here.
>>
>> As stated above, I'm (almost) not recapping anyones notes as they are
>> persisted in BEAM-696, so if you had something to say please provide you
>> input here.
>> I will note that Ben Chambers and Pei He mentioned that even with
>> differing, this could still run into some non-determinism if there are
>> triggers controlling when we extract output because non-merging windows'
>> trigger firing is non-deterministic.
>>
>> Thanks,
>> Amit
>>
>>


Re: [DISCUSS] Executing (Jenkins) RunnableOnService tests more efficiently.

2016-10-20 Thread Robert Bradshaw
+1 to making these cheaper.

Another idea I've had, though I'm not sure how hard it would be to
implement, is that many ROS tests are trivial pipelines and dominated
by the overhead of invoking the service itself. I wonder how hard it
would be to transparently "pack" many pipelines into single job
requests whose success would be a very strong (though not perfect)
indicator that each pipeline would have succeeded individually. Of
course not all tests are structured in this manner, but from my
experience most are, and this could be significant savings.
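
To illustrate the packing idea (a rough sketch only; attributing a failure
back to the individual test is the part that would need real harness
support):

// Sketch: several trivial "tests" built into one Pipeline and run as a
// single job submission, with a PAssert check per output.
Pipeline p = TestPipeline.create();

PCollection<Integer> sum =
    p.apply("CreateNums", Create.of(1, 2, 3))
     .apply("Sum", Sum.integersGlobally());
PAssert.thatSingleton(sum).isEqualTo(6);

PCollection<String> upper =
    p.apply("CreateWords", Create.of("a", "b"))
     .apply("Upper", MapElements.via(new SimpleFunction<String, String>() {
       @Override
       public String apply(String s) { return s.toUpperCase(); }
     }));
PAssert.that(upper).containsInAnyOrder("A", "B");

p.run();  // one service invocation covering both checks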

On Thu, Oct 20, 2016 at 12:24 PM, Amit Sela  wrote:
> Hi all,
>
> I'd like to discuss options to execute ROS tests (per runner) more
> efficiently, and explore the option of running them on PreCommit, as
> opposed to PostCommit as they run today.
>
> The SDK provides a set of tests called "RunnableOnService" (aka ROS) that
> can be applied to a runner and validate it (correctly) supports SDK
> features.
> It's 300+ tests in total (batch + streaming) and it clearly takes time, and
> that is why it runs on PostCommit.
> I think we should look for a configuration where this is executed
> more efficiently and if possible run on PreCommit since runners are
> encouraged to rely on those tests and it's better to know of breaking
> changes beforehand.
>
> This came up somewhere in this
>  conversation, and the
> highlights are basically:
>
> Kenneth Knowles suggested we might parallelize sub-builds in the following
> manner:
>
>1. Run unit tests.
>2. (sub tasks) Run ROS tests for each runner in parallel, skipping unit
>tests.
>
> I was wondering if we could set up Jenkins to run ROS per runner only if
> there was a code change for that runner - of course SDK changes will
> probably have to run ROS for all runners, but that might still be an
> optimization.
>
> I think one of Beam's best selling points is its extensive testing framework,
> and the fact that runners can be validated across capabilities, but it
> would be best to know of runner-breaking changes before merging to master.
>
> Thoughts ?
>
> Thanks,
> Amit


Re: Placement of temporary files by FileBasedSink

2016-10-20 Thread Robert Bradshaw
On Thu, Oct 20, 2016 at 9:58 AM, Kenneth Knowles  
wrote:
> I like the spirit of proposal #1 for addressing the critical duplication
> problem, though as Dan points out the logic to choose a related but
> collision-free name might be slightly more complex.
>
> It is a nice bonus that it addresses the less critical issues and improves
> usability for manual inspections and interventions.
>
> The term "sibling" is being slightly misused here. I'd say #1 as proposed
> is a "sibling of the parent" while today's behavior is "sibling". I'd say a
> root cause of multiple problems is that our sharded file format is "a bunch
> of files next to each other" and the sibling is "other files in the same
> directory" so it takes some care, and explicit file name tracking instead
> of globbing, to work with it correctly.
>
>  AFAIK (corrections welcome) there is nothing special about
> Write.to("s3://bucket/file") meaning write to
> "s3://bucket/file-$shardnum-of-$totalshards". An alternative that seems
> superior is to write to "s3://bucket/file/$shardnum-of-$totalshards" with
> the convention that this prefix is fully owned by this file. Now the prefix
> "s3://bucket/file/" _is_ the sharded file. It is conceptually simpler and
> more glob and UI friendly. (any non "-" character would work for GCS and
> S3, but the "/" convention is better, considering the broader world)
>
> And bringing it back to this thread, the "sibling" is no longer "more files
> in the same directory" now "s3://bucket/file-temp-$uid" which is on the
> same filesystem with the same ACLs. It is also more UI friendly, easier to
> clean up, and does more to explicitly indicate that this is really one
> sharded file. Perhaps there's a pitfall I am overlooking?

Using directories rather than prefixes is a big change, and introduces
complications: dealing with hidden dot files (some placed implicitly by
the system or applications), worrying about executable bits rather than
just the rw ones, and possibly more complicated permission inheritance.

> Also since you mentioned local file support, FWIW the cleanup glob "file-*"
> today breaks on Windows due to Java library vagaries, while "file/*" would
> succeed.
> On Thu, Oct 20, 2016, 09:14 Dan Halperin 
> wrote:
>
> This thread is conflating many issues.
>
> * Putting temp files where they will not match the glob for the desired
> output files
> * Dealing with eventually-consistent filesystems (s3, GCS, ...)
> * Properly cleaning up all temp files
>
> They all need to get solved, but for now I think we only need to solve  the
> first one.
>
> Siblings fundamentally will not work. Consider the following
> perfectly-valid output path: s3://bucket/file-SSS-NNN.txt . A sibling would
> be a new bucket, so not guaranteed to exist.
>
> On Thu, Oct 20, 2016 at 1:57 AM, Chamikara Jayalath 
> wrote:
>
>> Can this be prevented by moving temporary files (copy + delete
>> individually) at finalization instead of copying all of them and
> performing
>> a bulk delete ? You can support task failures by ignoring renames when the
>> destination exists. Python SDK currently does this (and puts temp files in
>> a sub-directory).
>>
>> Thanks,
>> Cham
>>
>> On Wed, Oct 19, 2016 at 6:25 PM Eugene Kirpichov
>>  wrote:
>>
>> Hello,
>>
>> This is a continuation of the discussion on PR
>> https://github.com/apache/incubator-beam/pull/1050 which turned out more
>> complex than expected.
>>
>> Short summary:
>> Currently FileBasedSink, when writing to /path/to/foo (in practice,
>> /path/to/foo-x-of-y where y is the total number of output
>> files), puts temporary files into /path/to/foo-temp-$uid, and when
>> finalizing the sink, it removes the temporary files by matching the
> pattern
>> /path/to/foo-temp-* and removing everything that matches.
>>
>> There are a couple of issues with this:
>> - FileBasedSink uses IOChannelFactory, which currently supports local
>> filesystems and Google Cloud Storage (GCS). GCS's match() operation is
>> currently eventually consistent. So, it may fail to return some of the
>> files, so we won't remove them.
>> - If the Beam job is cancelled or fails midway, then the temp files won't
>> be deleted at all (that's subject to a separate discussion on cleanup API
> -
>> AFAIK there's no JIRA for it yet, I believe peihe@ was thinking about this
>> and was going to file one).
>> - If a follow-up data processing job is reading /path/to/foo, then the way
>> temp files are named, they will likely match the same glob pattern (e.g.
>> "/path/to/foo*") as the one intending to match the final output in
>> /path/to/foo, so if some temp files are leftover, the follow-up job will
>> effectively read duplicate records (some from /path/to/foo, some from
>> /path/to/foo-temp-$blah).
>>
>> I think, in the absence of a way to guarantee that all temp files will be
>> deleted (I think it'd be very difficult or impossible to provide a hard
>> guarantee of this, considering various possible failure conditio

Re: Placement of temporary files by FileBasedSink

2016-10-20 Thread Robert Bradshaw
Another option would be to just use /path/to/temp-foo-$uid to avoid
matching /path/to/foo-* (hoping of course the temp- or whatever prefix
doesn't match anything).

I see #2 causing all sorts of issues, and #3 would be a significant
reduction in usability. I would lean towards doing
/path/to/temp-beam-foo-$uid/$another_uid when possible, and
/path/to/temp-beam-foo-$uid-$another_uid otherwise (note the dash
instead of the slash). The logic of determining "when possible" seems
like it belongs in IOChannelFactory not FileBasedSource.
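
For illustration, roughly the naming logic I have in mind (a sketch only;
this helper is hypothetical, not existing SDK code):

// Derive a temp prefix that is a "sibling of the parent" of the final
// output, so it cannot match a glob like /path/to/foo-* .
static String tempPrefixFor(String finalPrefix, String jobUid) {
  int lastSlash = finalPrefix.lastIndexOf('/');
  String dir = finalPrefix.substring(0, lastSlash + 1);  // "/path/to/"
  String name = finalPrefix.substring(lastSlash + 1);    // "foo"
  // e.g. "/path/to/temp-beam-foo-$uid"; append "/" + bundleUid per bundle
  // where subdirectories are possible, "-" + bundleUid otherwise.
  return dir + "temp-beam-" + name + "-" + jobUid;
}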


On Thu, Oct 20, 2016 at 9:21 AM, Lukasz Cwik  wrote:
> The issue manifests when a completely different pipeline uses the output of
> the last pipeline as input to the new pipeline and then these temporary
> files are matched in the glob expression.
>
> This happens because FileBasedSource is responsible for creating the
> temporary paths which occurs while processing a bundle. If that bundle
> processing fails, there is no way to guarantee for the runner to even know
> that it existed in our current execution model.
>
> I think there are other potential solutions which require support from the
> runner that aren't being considered since this would all fall under a
> general cleanup API which Eugene referred to. The question for now is the
> solution good enough?
>
> I'm in favor of #1 as well.
>
> I'm against #4 since FileBasedSource could do a pretty good job for all
> filesystems and once there is support for cleanup, FileBasedSource could
> migrate to use it without any changes to the various IOChannelFactory's.
> This prevents us from getting to the place where Hadoop filesystem
> implementation has many many methods.
>
>
> On Thu, Oct 20, 2016 at 1:57 AM, Chamikara Jayalath 
> wrote:
>
>> Can this be prevented by moving temporary files (copy + delete
>> individually) at finalization instead of copying all of them and performing
>> a bulk delete ? You can support task failures by ignoring renames when the
>> destination exists. Python SDK currently does this (and puts temp files in
>> a sub-directory).
>>
>> Thanks,
>> Cham
>>
>> On Wed, Oct 19, 2016 at 6:25 PM Eugene Kirpichov
>>  wrote:
>>
>> Hello,
>>
>> This is a continuation of the discussion on PR
>> https://github.com/apache/incubator-beam/pull/1050 which turned out more
>> complex than expected.
>>
>> Short summary:
>> Currently FileBasedSink, when writing to /path/to/foo (in practice,
>> /path/to/foo-x-of-y where y is the total number of output
>> files), puts temporary files into /path/to/foo-temp-$uid, and when
>> finalizing the sink, it removes the temporary files by matching the pattern
>> /path/to/foo-temp-* and removing everything that matches.
>>
>> There are a couple of issues with this:
>> - FileBasedSink uses IOChannelFactory, which currently supports local
>> filesystems and Google Cloud Storage (GCS). GCS's match() operation is
>> currently eventually consistent. So, it may fail to return some of the
>> files, so we won't remove them.
>> - If the Beam job is cancelled or fails midway, then the temp files won't
>> be deleted at all (that's subject to a separate discussion on cleanup API -
>> AFAIK there's no JIRA for it yet, I believe peihe@ was thinking about this
>> and was going to file one).
>> - If a follow-up data processing job is reading /path/to/foo, then the way
>> temp files are named, they will likely match the same glob pattern (e.g.
>> "/path/to/foo*") as the one intending to match the final output in
>> /path/to/foo, so if some temp files are leftover, the follow-up job will
>> effectively read duplicate records (some from /path/to/foo, some from
>> /path/to/foo-temp-$blah).
>>
>> I think, in the absence of a way to guarantee that all temp files will be
>> deleted (I think it'd be very difficult or impossible to provide a hard
>> guarantee of this, considering various possible failure conditions such as
>> zombie workers), the cleanest way to solve this is put temp files in a
>> location that's unlikely to match the same glob pattern as one that matches
>> the final output.
>>
>> Some options for what that could be:
>> 1. A subdirectory that is a sibling of the final path, sufficiently unique,
>> and unlikely to match the same glob -
>> /path/to/temp-beam-foo-$uid/$another_uid (that's the approach the PR
>> currently takes)
>> 2. A subdirectory under PipelineOptions.tempLocation - this might be flawed
>> because PipelineOptions.tempLocation might be on a different filesystem, or
>> have different ACLs, than the output of the FileBasedSink.
>> 3. A subdirectory that the user *must* explicitly provide on their
>> FileBasedSink. This is a reduction in usability, but there may be cases
>> when this is necessary - e.g. if the final location of the FileBasedSink is
>> such that we can't create siblings to it (e.g. the root path in a GCS
>> bucket - gs://some-bucket/)
>> 4. A subdirectory generated by a new IOChannelFactory call ("give me a temp
>> directory for the given final path")

Re: [DISCUSS] Sources and Runners

2016-10-18 Thread Robert Bradshaw
Eventually we'll be able to communicate intent with the runner much
more directly via the ProcessContinuation object:

https://github.com/apache/incubator-beam/blob/a0f649eaca8d8bd47d22db0ba7150fea1bf07975/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L658
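
(Rough sketch of the shape proposed there, to show the contrast; names are
taken from the proposal and may change, and the polling helper below is
hypothetical:)

// Instead of the runner guessing how long to poll advance(), the DoFn
// itself returns a ProcessContinuation: "I'm done" or "resume me later".
// (A real splittable DoFn also takes a RestrictionTracker parameter,
// omitted here for brevity.)
@ProcessElement
public ProcessContinuation processElement(ProcessContext c) {
  boolean moreExpected = pollSourceOnce(c);  // hypothetical helper
  return moreExpected
      ? ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(5))
      : ProcessContinuation.stop();
}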

On Tue, Oct 18, 2016 at 12:44 PM, Jean-Baptiste Onofré  
wrote:
> Thanks for the update and summary.
>
> Regards
> JB
>
> ⁣
>
> On Oct 18, 2016, 20:47, at 20:47, Amit Sela  wrote:
>>I wanted to summarize here a couple of important points raised in some
>>PRs
>>I was involved with, and while those PRs were about KafkaIO and related
>>to
>>the Spark/Direct runners, some important notes were made and I think
>>they
>>are worth this thread.
>>
>>*Background*:
>>The KafkaIO waits (5 seconds) before starting to read, and (10 millis)
>>between advancing the reader, which is problematic for the Spark runner
>>as
>>it might attempt to read (every microbatch) for a shorter period, and
>>so it
>>will never start.
>>Raghu Angadi mentioned in this conversation
>> that originally
>>the
>>wait period was set to 10 millis for both start() and advance(), but it
>>was
>>changed mostly due to DirectRunner issues.
>>PR#1125  originally
>>attempted to allow runners to configure those properties via
>>PipelineOptions, but Dan Halperin raised some interesting points:
>>
>>   - Readers should return as soon as they are able.
>> - Runners may poll advance() in a loop for a certain period of time if
>>   it returned too fast.
>>   - Runners must tolerate sources that take a long time to start or
>>   advance, because real systems operate that way.
>>
>>Dan suggested (and I clearly agreed) that this should be discussed
>>here.
>>
>>BTW, Thomas Groh mentioned that the DirectRunner is OK now with much
>>shorter wait periods, and PR#1125 now aims to re-set the 5 seconds
>>wait-on-start to 10 millis.
>>I think this is a good example of Dan's points here:  Kafka reader can
>>clearly return much faster (millis not seconds) and the DirectRunner
>>accommodates this now by reusing it's readers.
>>
>>I Hope I didn't forget/misunderstood anything (please correct me if I
>>did).
>>
>>Thanks,
>>Amit


Re: Specifying type arguments for generic PTransform builders

2016-10-14 Thread Robert Bradshaw
On Thu, Oct 13, 2016 at 10:36 PM, Eugene Kirpichov
 wrote:
> I think the choice between #1 or #3 is a red herring - the cases where #3
> is a better choice than #1 are few and far between, and probably not at all
> controversial (e.g. ParDo). So I suggest we drop this part of the
> discussion.

I decided to take a peek at the transforms we currently have, and
actually it seems that most of them fall into the category of having
zero or one "primary, required" arguments intrinsic to what the
transform is, and then possibly some optional ones. I'm becoming even
more a fan of #3--it makes it harder for the user to even construct an
invalid transform (and is better documenting too, both on docs and for
IDE autocompletion, about what's essential vs. the slew of optional
things).

We do lose the "database reader ready-to-go" bit, but I'm not sure
that's such a loss. One can instead have

class CompanyDefaults {
public static DatabaseIO.Read setup(DatabaseIO.Read reader)
}

which is actually superior if DatabaseIO.Read is a base class (or
interface) that may have several implementations.
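
For example (everything here is hypothetical, just to show the shape):

// The required, type-providing argument comes first (#3); company-wide
// defaults are layered on afterwards by a plain helper method.
DatabaseIO.Read<MyRow> read =
    CompanyDefaults.setup(DatabaseIO.read(new MyRowMapper()));
PCollection<MyRow> rows = p.apply(read.withQuery("SELECT ..."));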

> Looks like the main contenders for the complex case are #1 (Foo.blah())
> vs. #4 (Foo.Unbound and Foo.Bound).
>
> Dan, can you clarify again what you mean by this:
> "a utility function that gives you a database reader ready-to-go ... but
> independent of the type you want the result to end up as. You can't do
> that if you must bind the type first."
>
> I think this is quite doable with #1:
>
> class CompanyDefaults {
> public static <T> DatabaseIO.Read<T> defaultDatabaseIO() { return
> DatabaseIO.<T>create().withSettings(blah).withCredentials(blah); }
> }
>
> DatabaseIO.Read source =
> CompanyDefaults.defaultDatabaseIO().withQuery(blah);
>
> All in all, it seems to me that #1 (combined with potentially encapsulating
> parts of the configuration into separate objects, such as
> JdbcConnectionParameters in JdbcIO) is the only option that can do
> everything fairly well, its only downside is having to specify the type,

Having to repeat the type is a significant downside, especially when
your types get long. (Yes, I've faced types that get so verbose you
have to figure out where to put the line breaks.) This is why
inference of template arguments was added to the language, and the
whole reason for the existence of many of Guava's "constructors" like
Lists.newArrayList(), etc. (now obsolete due constructors allowing
inference).

> and it is very easy to implement when you're implementing your own
> transform - which, I agree with Kenn, matters a lot too.
>
> I think that coming up with an easy-to-implement, universally applicable
> pattern matters a lot also because the Beam ecosystem is open, and the set
> of connectors/transforms available to users will not always be as carefully
> curated and reviewed as it is currently - the argument "the implementation
> complexity doesn't matter because the user doesn't see it" will not apply.
> So, ultimately, "are there a lot of good-quality connectors available to
> Beam users" will be equivalent to "is it easy to develop a good-quality
> connector". And the consistency between APIs provided by different
> connectors will matter for the user experience, too.

+1

> On Thu, Oct 13, 2016 at 7:09 PM Kenneth Knowles 
> wrote:
>
>> On Thu, Oct 13, 2016 at 4:55 PM Dan Halperin 
>> wrote:
>> > These
>> > suggestions are motivated by making things easier on transform writers,
>> but
>> > IMO we need to be optimizing for transform users.
>>
>> To be fair to Eugene, he was actually analyzing real code patterns that
>> exists in Beam today, not suggesting new ones.
>>
>> Along those lines, your addition of the BigTableIO pattern is well-taken
>> and my own analysis of that code is #5: "when you don't have a type
>> variable to bind, leave every field blank and validate later. Also, have an
>> XYZOptions object". I believe in the presence of type parameters this
>> reduces to #4 Bound/Unbound classes but it is more palatable in the
>> no-type-variable case. It is also a good choice when varying subsets of
>> parameters might be optional - the Window transform matches this pattern
>> for good reason.
>>
>> The other major drawback of #3 is the inability to provide generic
>> > configuration. For example, a utility function that gives you a database
>> > reader ready-to-go with all the default credentials and options you need
>> --
>> > but independent of the type you want the result to end up as. You can't
>> do
>> > that if you must bind the type first.
>> >
>>
>> This is a compelling use case. It is valuable for configuration to be a
>> first-class object that can be passed around. BigTableOptions is a good
>> example. It isn't in contradiction with #3, but actually fits very nicely.
>>
>> By definition for this default configuration to be first-class it has to be
>> more than an invalid intermediate state of a PTransform's builder methods.
>> Concretely, it would be BigTableIO.defaultOptions(), which 

Re: Specifying type arguments for generic PTransform builders

2016-10-13 Thread Robert Bradshaw
On Thu, Oct 13, 2016 at 4:54 PM, Dan Halperin
 wrote:
> For #3 -- I think we should be VERY careful there. You need to be
> absolutely certain that there will never, ever be another alternative to
> your mandatory argument. For example, you build an option to read from a
> DB, so you supply a .from(String query). Then later, you want to add
> reading just a table directly, so you add fromTable(Table). In this case,
> it's much better to use .read().fromQuery() or .read().fromTable() --
> having ".read()" be the "standard builder a'la #1".

I'm not quite following your argument here. Wouldn't one just add a
.from(Table) overload? (I suppose one could argue "from" should be
"fromQuery", but one would have the same problem migrating
.read().from(String) to .read().fromQuery(String); the extra .read()
buys nothing here).

The point with #3 was that if a transform is typed, and the type comes
from an argument, it's almost certain that type-providing argument (in
one form or another) is required. Even without types, you have to read
from *something*, so requiring that you specify that something upfront
(though possibly not always via the same method) isn't too onerous.
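
In signature form, roughly (DatabaseIO and Table are hypothetical, in the
same spirit as the examples above):

class DatabaseIO {
  public static Read from(String query);      // original entry point
  public static Read from(Table table);       // later-added overload
  // or, if a bare "from" becomes ambiguous:
  public static Read fromQuery(String query);
  public static Read fromTable(Table table);
}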

> The other major drawback of #3 is the inability to provide generic
> configuration. For example, a utility function that gives you a database
> reader ready-to-go with all the default credentials and options you need --
> but independent of the type you want the result to end up as. You can't do
> that if you must bind the type first.

That's a fair point, though not insurmountable (one can ask for
credentials directly, or have a method that configures a given object
rather than constructing it, though I agree not quite as nice in some
cases).

> I think that in general all of these patterns are significantly worse in
> the long run than the existing standards, e.g., for BigtableIO. These
> suggestions are motivated by making things easier on transform writers, but
> IMO we need to be optimizing for transform users.

This discussion is optimizing for the user. It is painful to write things like

SomeTransform.create().withFn(typeProvidingFn)

and nested, private classes with names like Bound and Unbound leak into
user space with auto-completion, IDEs, javadocs, etc.

Bound and Unbound were introduced because we needed a pair of builders
(one with the types bound, the other with the types as-yet unbound).
BigtableIO.Read has no type parameters, so the name itself doesn't
even really make sense there. (I suppose we also have an aversion to
the "new" keyword, or one would just write apply("new
BigtableIO.Read().fromTable(...).withoutValidation()...)")

On the note of transform writers vs transform users, I'd really like
to lower the bar so that all transform users also become transform
authors--PTransforms are the tool one should use to structure a
pipeline (just like we break up ordinary code into functions) and we
should make it as easy as possible to construct and use them lest
people write pipelines that are simply inlines of primitives (like
putting everything in the main method). Extra layers of nesting, even
as convention, is off-putting. We haven't even managed to regularly
follow this in the core library.

> On Fri, Oct 7, 2016 at 4:48 PM, Eugene Kirpichov <
> kirpic...@google.com.invalid> wrote:
>
>> In my original email, all FooBuilder's should be simply Foo. Sorry for the
>> confusion.
>>
>> On Thu, Oct 6, 2016 at 3:08 PM Kenneth Knowles 
>> wrote:
>>
>> > Mostly my thoughts are the same as Robert's. Use #3 whenever possible,
>> > fallback to #1 otherwise, but please consider using informative names for
>> > your methods in all cases.
>> >
>> > #1 GBK.create(): IMO this pattern is best only for transforms where
>> > withBar is optional or there is no such method, as in GBK. If it is
>> > mandatory, it should just be required in the first method, eliding the
>> > issue, as in ParDo.of(DoFn), MapElements.via(...), etc, like you
>> say
>> > in your concluding remark.
>> >
>> > #2 FooBuilder FooBuilder.create(): this too - if you are going to fix
>> > the type, fix it first. If it is optional and Foo is usable as a
>> > transform, then sure. (it would have be something weird like Foo> > OutputT, ?> extends PTransform)
>> >
>> > #3 Foo.create(Bar): this is best. Do this whenever possible. From my
>> > perspective, instead of "move the param to create(...)" I would describe
>> > this as "delete create() then rename withBar to create". Just skip the
>> > second step and you are in an even better design, withBar being the
>> > starting point. Just like ParDo.of and MapElements.via.
>> >
>> > #4 Dislike this, too, for the same reasons as #2 plus code bloat plus
>> user
>> > confusion.
>> >
>> > Side note since you use this method in all your examples: This kind of
>> use
>> > of "create" is a bad method name. There may be no new object "created".
>> > Sometimes we have no better idea, but create() is a poor default. For GBK
>> > both ar

Re: Simplifying User-Defined Metrics in Beam

2016-10-12 Thread Robert Bradshaw
+1 to the new metrics design. I strongly favor B as well.
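
(For context, the user-facing shape proposed in the doc looks roughly like
this, assuming a DoFn<String, String> named MyDoFn -- a sketch based on
the design doc, not necessarily the final API:)

// Inside a DoFn: declare a named counter and bump it per element.
private final Counter emptyLines = Metrics.counter(MyDoFn.class, "emptyLines");

@ProcessElement
public void processElement(ProcessContext c) {
  if (c.element().isEmpty()) {
    emptyLines.inc();
  }
  c.output(c.element());
}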

On Wed, Oct 12, 2016 at 10:54 AM, Kenneth Knowles
 wrote:
> Correction: In my eagerness to see the end of aggregators, I mistook the
> intention. Both A and B leave aggregators in place until there is a
> replacement. In which case, I am strongly in favor of B. As soon as we can
> remove aggregators, I think we should.
>
> On Wed, Oct 12, 2016 at 10:48 AM Kenneth Knowles  wrote:
>
>> Huzzah! This is IMO a really great change. I agree that we can get
>> something in to allow work to continue, and improve the API as we learn.
>>
>> On Wed, Oct 12, 2016 at 10:20 AM Ben Chambers 
>> wrote:
>>
>> 3. One open question is what to do with Aggregators. In the doc I mentioned
>>
>> that long term I'd like to consider whether we can improve Aggregators to
>> be a better fit for the model by supporting windowing and allowing them to
>> serve as input for future steps. In the interim it's not clear what we
>> should do with them. The two obvious (and extreme) options seem to be:
>>
>>
>>
>>   Option A: Do nothing, leave aggregators as they are until we revisit.
>>
>>
>>   Option B: Remove aggregators from the SDK until we revisit.
>>
>> I'd like to suggest removing Aggregators once the existing runners have
>> reasonable support for Metrics. Doing so reduces the surface area we need
>> to maintain/support and simplifies other changes being made. It will also
>> allow us to revisit them from a clean slate.
>>
>>
>> +1 to removing aggregators, either of A or B. The new metrics design
>> addresses aggregator use cases as well or better.
>>
>> So A vs B is a choice of whether we have a gap with no aggregator or
>> metrics-like functionality. I think that is perhaps a bit of a bummer for
>> users, and we will likely port over the runner code for it, so we wouldn't
>> want to actually delete it, right? Can we do it in a week or two?
>>
>> One thing motivating me to do this quickly: Currently the new DoFn does
>> not have its own implementation of aggregators, but leverages that of
>> OldDoFn, so we cannot remove OldDoFn until either (1) new DoFn
>> re-implements the aggregator instantiation and worker-side delegation (not
>> hard, but it is throwaway code) or (2) aggregators are removed. This
>> dependency also makes running the new DoFn directly (required for the state
>> API) a bit more annoying.
>>


Re: Introducing a Redistribute transform

2016-10-11 Thread Robert Bradshaw
Actually, Redistribute.perKey seems a bit dangerous, as there's no
guarantee the partitioning is persisted to any subsequent steps, and
we don't have a concrete notion of key-partitioned elements outside of
GBK in the model. I suspect it was only introduced because that's what
Redistribute.arbitrarily() is built on.

On Tue, Oct 11, 2016 at 10:16 AM, Ben Chambers  wrote:
> As Kenn points out, I think the nature of the Redistribute operation is to
> act as a hint (or requirement) to the runner that a certain distribution
> the elements is desirable. In a perfect this wouldn't be necessary because
> every runner would be able to do exactly the right thing. Looking at the
> different use cases may be helpful:
>
> 1. Redistribute.arbitrarily being used in IO as a fusion break and
> checkpoint. We could express this as a hint saying that we'd like to
> persist the PCollection at this point.
> 2. Redistribute.perKey being used to checkpoint keys in a keyed
> PCollection. I think this could be the same as the previous hint or a
> variant thereof.
> 3. Redistribute.perKey to ensure that the elements are distributed across
> machines such that all elements with a specific key are on the same
> machine. This should only be necessary for per-key processing (such as
> state) and can be added by the runner when necessary (becomes easier once
> we have a notion of transforms that preserve key-partitioning, etc.)
>
> Of these 1 and 2 seem to be the most interesting. The hint can be
> implemented in various ways -- a transform that represents the hint (and
> the runner can then implement as it sees fit) or via a method that sets
> some property on the PCollection, to which the runner could choose to apply
> a transform. I lean towards the former (keeping this as a transform) since
> it fits more naturally into the codebase and doesn't require extending
> PCollection (something we avoid).
>
> What if this was something like: ".apply(Hints.checkpoint())" or
> ".apply(Hints.break())"? This makes it clearer that this is a hint to the
> runner and not part of the semantics?
>
> On Tue, Oct 11, 2016 at 10:09 AM Kenneth Knowles 
> wrote:
>
>> On Mon, Oct 10, 2016 at 1:38 PM Eugene Kirpichov
>>
>>  wrote:
>>
>>
>>
>> > The transform, the way it's implemented, actually does several things at
>>
>> > the same time and that's why it's tricky to document it.
>>
>> >
>>
>>
>>
>> This thread has actually made me less sure about my thoughts on this
>>
>> transform. I do know what the transform is about and I do think we need it.
>>
>> But I don't know that it can be explained "within the model". Look at our
>>
>> classic questions about Redistribute.arbitrarily() and
>> Redistribute.byKey():
>>
>>
>>
>>  - "what" is it computing? The identity on its input.
>>
>>  - "where" is the event time windowing? Same as its input.
>>
>>  - "when" is output produced? As fast as reasonable (runner-specific).
>>
>>  - "how" are refinements related? Same as its input (I think this might
>>
>> actually be incorrect if accumulating fired panes)
>>
>>
>>
>> These points don't describe any of the real goals of Redistribute. Hence
>>
>> describing it in terms of fusion and checkpointing, which are quite
>>
>> runner-specific in their (optional) manifestations.
>>
>>
>>
>> - Introduces a fusion barrier (in runners that have it), making sure that
>>
>> > the runner can fully parallelize processing the output PCollection with
>>
>> > DoFn's
>>
>> >
>>
>>
>>
>> Can a runner introduce other fusion barriers whenever it wants? Yes.
>>
>> Can a runner ignore a proposed fusion barrier? Yes. (or when can it not?
>>
>> why not?)
>>
>>
>>
>>
>>
>> > - Introduces a fault-tolerance barrier, effectively "checkpointing" the
>>
>> > input PCollection (again, in runners where it makes sense) and making
>> sure
>>
>> > that processing elements of the output PCollection with a DoFn, if the
>> DoFn
>>
>> > fails, will redo only that processing, but not need to recompute the
>> input
>>
>> > PCollection.
>>
>> >
>>
>>
>>
>> Can a runner introduce a checkpoint whenever appropriate? Yes.
>>
>> Can a runner ignore a hint to checkpoint? Yes (if it can still compute the
>>
>> same result - it may not even conceive of checkpointing in a compatible
>>
>> way).
>>
>>
>>
>> - All of the above and also makes the collection "key-partitioned", giving
>>
>> > access to per-key state to downstream key-preserving DoFns. However, this
>>
>> > is also runner-specific, because it's conceivable that a runner might not
>>
>> > need this "key-partitioned" property (in fact it's best if a runner
>>
>> > inserted such a "redistribute by key" automatically if it needs it...),
>> and
>>
>> > it currently isn't exposed anyway.
>>
>> >
>>
>>
>>
>> Agreed. The runner should insert the necessary keying wherever needed. One
>>
>> might say the same for other uses of Redistribute, but in practice hints
>>
>> are useful.
>>
>>
>>
>>
>>
>> > Still thinking about the best way to describe this in a way that's le

Re: Introducing a Redistribute transform

2016-10-10 Thread Robert Bradshaw
On Mon, Oct 10, 2016 at 12:57 PM, Amit Sela  wrote:

>> > So this is basically a "FanOut" transformation which will depend on the
>>
>> > available resources of the runner (and the uniqueness of the assigned
>> keys)
>>
>> > ?
>>
>> >
>>
>> > Would we want to Redistribute into a user-defined number of bundles (>
>>
>> > current) ?
>>
>>
>>
>> I don't think there's any advantage to letting the user specify a
>>
>> number here; the data is spread out among as many machines as are
>>
>> handling the shuffling (for N elements, there are ~N unique keys,
>>
>> which gets partitioned by the system to the M workers).
>>
>>
>>
>> > How about "FanIn" ?
>>
>>
>>
>> Could you clarify what you would hope to use this for?
>>
> Well, what if for some reason I would want to limit parallelism for a step
> in the Pipeline ? like calling an external service without "DDoS"ing it ?

I think this is something that is more difficult to enforce without
runner-specific support. For example, if one writes

input.apply(Redistribute(N)).apply(ParDo(...))

one is assuming that fusion takes place such that the subsequent ParDo
doesn't happen to get processed by more-than-expected shards. It's
also much simpler to spread the elements out among 2^64 keys than
spread them out to a small N keys, and choosing exactly N keys isn't
necessarily the best way to enforce parallelism constraints (as this
would likely introduce stragglers). One typically wants to reduce
parallelism over a portion (interval?) of a pipeline, whereas
redistribution operates at a point in your pipeline.

I agree that being able to limit parallelism (possibly dynamically
based on pushback from an external service, or noting that throughput
is no longer scaling linearly) would be a useful feature to have, but
that's a bit out of scope here.


Re: Introducing a Redistribute transform

2016-10-10 Thread Robert Bradshaw
On Sat, Oct 8, 2016 at 7:31 AM, Amit Sela  wrote:
> Hi Eugene,
>
> This is very interesting.
> Let me see if I get this right, the "Redistribute"  transformation assigns
> a "running id" key (per-bundle) , calls "Redistribute.byKey", and extracts
> back the values, correct ?

The keys are (pseudorandomly) unique per element.
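
Roughly, yes. A stripped-down sketch of the mechanism (the actual
transform in the PR also neutralizes windowing/triggering around the
GroupByKey and restores it afterwards):

PCollection<String> input = ...;
PCollection<String> redistributed =
    input
        // 1) pair each element with a pseudorandom key
        .apply(WithKeys.of(new SerializableFunction<String, Integer>() {
          @Override
          public Integer apply(String value) {
            return ThreadLocalRandom.current().nextInt();
          }
        }))
        // 2) group to force a redistribution across workers
        .apply(GroupByKey.<Integer, String>create())
        // 3) drop the keys and re-emit the values
        .apply(Values.<Iterable<String>>create())
        .apply(Flatten.<String>iterables());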

> As for "Redistribute.byKey" - it's made of a GroupByKey transformation that
> follows a Window transformation that neutralises the "resolution" of
> triggers and panes that usually occurs in GroupByKey, correct ?
>
> So this is basically a "FanOut" transformation which will depend on the
> available resources of the runner (and the uniqueness of the assigned keys)
> ?
>
> Would we want to Redistribute into a user-defined number of bundles (>
> current) ?

I don't think there's any advantage to letting the user specify a
number here; the data is spread out among as many machines as are
handling the shuffling (for N elements, there are ~N unique keys,
which gets partitioned by the system to the M workers).

> How about "FanIn" ?

Could you clarify what you would hope to use this for?

> On Fri, Oct 7, 2016 at 10:49 PM Eugene Kirpichov
>  wrote:
>
>> Hello,
>>
>> Heads up that https://github.com/apache/incubator-beam/pull/1036 will
>> introduce a transform called "Redistribute", encapsulating a relatively
>> common pattern - a "fusion break" [see
>>
>> https://cloud.google.com/dataflow/service/dataflow-service-desc#preventing-fusion
>> previously
>> providing advice on that] - useful e.g. when you write an IO as a sequence
>> of ParDo's: split a query into parts, read each part, and you want to
>> prevent fusing these ParDo's because that would make the whole thing
>> execute sequentially, and in other similar cases.
>>
>> The PR also uses it, as an example, in DatastoreIO and JdbcIO, both of
>> which used to have a hand-rolled implementation of the same. The Write
>> transform has something similar, but not quite identical, so I skipped it.
>>
>> This is not a model change - merely providing a common implementation of
>> something useful that already existed but was scattered across the
>> codebase.
>>
>> Redistribute also subsumes the old mostly-internal Reshuffle transform via
>> Redistribute.byKey().
>>
>> I tried finding more cases in the Beam codebase that have an ad-hoc
>> implementation of this; I did not find any, but I might have missed
>> something. I suppose the transform will need to be advertised in
>> documentation on best-practices for connector development; perhaps some
>> StackOverflow answers should be updated; any other places?
>>


Re: [DISCUSS] UnboundedSource and the KafkaIO.

2016-10-10 Thread Robert Bradshaw
Just looking to the future, have you given any thought on how well
this would work on https://s.apache.org/splittable-do-fn?

On Mon, Oct 10, 2016 at 6:35 AM, Amit Sela  wrote:
> Thanks Max!
>
> I'll try to explain Spark's stateful operators and how/why I used them with
> UnboundedSource.
>
> Spark has two stateful operators: *updateStateByKey* and *mapWithState*.
> Since updateStateByKey is bound to output the (updated) state itself - the
> CheckpointMark in our case - we're left with mapWithState.
> mapWithState provides a persistent, distributed "map-like", that is
> partitioned according to the stream. This is indeed how I manage state
> between micro-batches.
> However, mapWithState (like any map) will give you a value (state)
> corresponding to a specific key, so I use a running-id from the initial
> splitting to identify the appropriate state.
> I took a look at Flink's implementation ( I do that sometimes ;-) ) and I
> could do the same and save the split source with the CheckpointMark but
> it'll still have to correspond to the same id, and since I had to wrap the
> split Source to perform a sort of "BoundedReadFromUnboundedSource" I simply
> added an id field and I'm hashing by that id.
> I'll also add that the stateful operator can only be applied to a
> (Pair)Stream and not to input operators so I'm actually generating a stream
> of splits (the same ones for every micro-batch) and reading from within the
> mappingFunction of the mapWithState.
>
> It's not the simplest design, but given how Spark's persistent state and
> InputDStream are designed comparing to the Beam model, I don't see another
> way - though I'd be happy to hear one!
>
> Pretty sure I've added this here but no harm in adding the link again: design
> doc
> 
> and
> a work-in-progress branch
>  all
> mentioned in the ticket  as
> well.
> The design doc also relates to how "pure" Spark works with Kafka, which I
> think is interesting and very different from Flink/Dataflow.
>
> Hope this helped clear things up a little, please keep on asking if
> something is not clear yet.
>
> Thanks,
> Amit.
>
> On Mon, Oct 10, 2016 at 4:02 PM Maximilian Michels  wrote:
>
>> Just to add a comment from the Flink side and its
>> UnboundedSourceWrapper. We experienced the only way to guarantee
>> deterministic splitting of the source, was to generate the splits upon
>> creation of the source and then checkpoint the assignment during
>> runtime. When restoring from a checkpoint, the same reader
>> configuration is restored. It's not possible to change the splitting
>> after the initial splitting has taken place. However, Flink will soon
>> be able to repartition the operator state upon restart/rescaling of a
>> job.
>>
>> Does Spark have a way to pass state of a previous mini batch to the
>> current mini batch? If so, you could restore the last configuration
>> and continue reading from the checkpointed offset. You just have to
>> checkpoint before the mini batch ends.
>>
>> -Max
>>
>> On Mon, Oct 10, 2016 at 10:38 AM, Jean-Baptiste Onofré  wrote:
>>
>> > Hi Amit,
>> >
>> > thanks for the explanation.
>> >
>> > For 4, you are right, it's slightly different from DataXchange (related to
>> > the elements in the PCollection). I think storing the "starting point" for a
>> > reader makes sense.
>> >
>> > Regards
>> > JB
>> >
>> > On 10/10/2016 10:33 AM, Amit Sela wrote:
>> >>
>> >> Inline, thanks JB!
>> >>
>> >> On Mon, Oct 10, 2016 at 9:01 AM Jean-Baptiste Onofré  wrote:
>> >>
>> >>> Hi Amit,
>> >>>
>> >>> For 1., the runner is responsible of the checkpoint storage (associated
>> >>> with the source). It's the way for the runner to retry and know the
>> >>> failed bundles.
>> >>>
>> >> True, this was a recap/summary of another, not-so-clear, thread.
>> >>
>> >>> For 4, are you proposing that KafkaRecord store additional metadata for
>> >>> that ? It sounds like what I proposed in the "Technical Vision" appendix
>> >>> document: there I proposed to introduce a DataXchange object that store
>> >>> some additional metadata (like offset) used by the runner. It would be
>> >>> the same with SDF as the tracker state should be persistent as well.
>> >>>
>> >> I think I was more focused on persisting the "starting point" for a reader,
>> >> even if no records were read (yet), so that the next time the reader
>> >> attempts to read it will pick up there. This has more to do with how the
>> >> CheckpointMark handles this.
>> >> I have to say that I'm not familia

Re: Specifying type arguments for generic PTransform builders

2016-10-06 Thread Robert Bradshaw
Thanks for bringing this up. This inconsistency is something that has
bothered me as well.

Personally, I'm a fan of #3 when there's an obvious, required,
type-providing argument, and #1 otherwise. Note that these two are not
necessarily mutually exclusive.

Another thing to keep in mind is that Java 8 is a bit smarter about
inferring types, and can do so from the context, e.g. one can write
pc.apply(GroupByKey.create()) without explicit type arguments and the
types of pc will populate the types of GroupByKey.create(). It might be
able to infer from later arguments as well.
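
For concreteness, the explicit call-site binding looks like the sketch below
(a minimal example using GroupByKey's standard <K, V> arguments; `input` is
assumed to be a PCollection<KV<String, Integer>>). The point above is that
Java 8 can sometimes infer those arguments from context, letting them be
omitted.

    // Style #1 below: bind the type arguments explicitly at the call site.
    PCollection<KV<String, Iterable<Integer>>> grouped =
        input.apply(GroupByKey.<String, Integer>create());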

- Robert



On Thu, Oct 6, 2016 at 2:09 PM, Eugene Kirpichov
 wrote:
> Quite a few transforms in the SDK are generic (i.e. have type parameters),
> e.g. ParDo, GroupByKey, Keys / WithKeys, many connectors (TextIO, KafkaIO,
> JdbcIO, MongoDbGridFSIO etc - both read and write). They use different
> styles of binding the type parameters to concrete types in caller code.
>
> I would like us to make a decision which of those styles to recommend for
> new transform and connectors writers. This question is coming up rather
> frequently, e.g. it came up in JdbcIO and MongoDbGridFSIO.
>
> For the purpose of this discussion, imagine a hypothetical builder class
> that looks like this:
>
> class Foo<T> {
> private Bar<T> bar;
> private int blah;
>
> Foo<T> withBlah(int blah);
> }
>
> So far I've seen several styles of binding the type argument in a withBar()
> method vs. a creation method:
>
> 1. Binding at the creation method: e.g.:
>
> class Foo<T> {
> ...
> public static <T> Foo<T> create();
> public FooBuilder<T> withBar(Bar<T> bar);
> }
>
> Foo<String> foo = Foo.<String>create().withBlah(42).withBar(new StringBar());
>
> Example: GroupByKey does this. As well as other transforms that don't have
> a withBar()-like method, but still need a type argument, e.g. Keys.
>
> Pros: completely unambiguous, easy to code, interacts well with @AutoValue
> Cons: need to specify type once at call site.
>
> 2. Binding at a method that takes an argument of the given type (let us
> call it "a constraint argument"), e.g.:
>
> class Foo {
> ...
> public static FooBuilder create();
> public <T> FooBuilder<T> withBar(Bar<T> bar);
> }
>
> Foo foo = Foo.create().withBlah(42).withBar(new StringBar());
>
> Example: KafkaIO
> https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L280
>
> Pros: don't need to specify type at call site.
> Cons: doesn't interact well with @AutoValue (it doesn't support builder
> methods that change type) - requires unchecked conversions.
>
> 3. Forcing to provide a "constraint argument" in the creation method:
>
> class Foo {
> ...
> public static <T> FooBuilder<T> create(Bar<T> bar);
> // (do not provide withBar)
> }
>
> Foo foo = Foo.create(new StringBar()).withBlah(42);
>
> Example: WithKeys
> https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java,
> ParDo
>
> Pros: easy to code, interacts ok with @AutoValue, don't need to specify
> type at call site.
> Cons: need to supply all constraint arguments in the create method, so they
> are treated differently from other arguments.
>
> 4. Splitting the builder into a "bound" and "unbound" class:
>
> class Foo {
> Unbound create();
>
> class Unbound {
> public Unbound withBlah(int blah);
> public <T> Bound<T> withBar(Bar<T> bar);
> }
>
> class Bound<T> {
> public Bound<T> withBlah(int blah);
> }
> }
>
> Foo.Bound<String> foo = Foo.create().withBlah(42).withBar(new StringBar());
>
> Example: TextIO.Read
> https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
> Pros: even more type-safe at call site than the others (using an incomplete
> builder is a compile error)
> Cons: very cumbersome to implement, lots of confusion between "(un)bound"
> and "(un)bounded", tempting for clients to use ugly variable names such as
> "Foo.Bound bound = ..." (rather than "foo")
>
> 
>
> I'd like to argue in favor of #1, because:
> - It makes sense for transforms like Keys.create() which do not have a
> "constraint argument", so we can have consistency between such transforms
> and the others.
> - It is the simplest to implement, and causes the fewest amount of
> generics-related confusion when reading the implementation code.
> - It interacts well with @AutoValue builders.
>
> The only downside is that you have to specify the generic argument at call
> site, but I think this is acceptable given the benefits of consistency,
> unambiguity and providing a pattern that's easy to follow for future
> transform writers.
>
> Of course, there should be an exception for cases when there is a very
> small and fixed number of arguments, or when it's clear that the
> "constraint argument" is the most important one - e.g. ParDo.of(DoFn)
> should *not* be changed to ParDo.create().withFn(DoFn). Also,
> I'm not suggesting making changes to existing transforms, only deciding
> which pattern to recommend for new transforms.
>
> W

Re: Runtime Windows/Aggregation Computations in Beam

2016-09-21 Thread Robert Bradshaw
This may be possible with a custom WindowFn. Where is the configuration of
what aggregations to do coming from?

On Wed, Sep 21, 2016 at 11:27 PM, Chawla,Sumit 
wrote:

> Attaching the Image.
>
>
>
> Regards
> Sumit Chawla
>
>
> On Wed, Sep 21, 2016 at 11:24 PM, Chawla,Sumit 
> wrote:
>
>> Hi All
>>
>> I am trying to code a solution for following scenarios.
>>
>> 1.  I have a stream of Tuples with multiple numeric fields (e.g. A, B, C,
>> D, E ... etc )
>> 2.  I want the ability to do different Windowing and Aggregation on each
>> field or a group of fields in the Tuple.  e.g. Sum A over a Period of 2
>> minutes, Avg B over a period of 3 minutes,  Sum of C grouped by D over a
>> period of 15 minutes
>> 3.  *These window requirements can be added by user at runtime*.  My
>> pipeline should be able to compute a new aggregation at runtime.
>> 4.  Plan to support only simple aggregation windows like SUM, AVG, MAX,
>> MIN, COUNT etc.
>>
>>
>> As i understand in BEAM pipelines ( with Flink Runner), the DAG of
>> computations cannot be altered once the pipeline is deployed.  I am trying
>> to see how can i support above use case.  I would love to hear your
>> feedback on this, and suggestions on doing it in a completely different
>> way.
>>
>> *My Design:*
>>
>> 1.   Create 1 minute buckets per Field or Group of Fields and compute
>> basic aggregations for bucket.  e.g.  Buckets are highlighted in Yellow
>> here.  For each field i calculate [SUM, COUNT, MAX, MIN] in the bucket.  (
>> Bucket size of 1 minute is defined for simplicity, and would limit the
>> minimum window size to 1 minute)
>>
>> 2.  Downstream process these buckets, and compute the user defined
>> aggregations.  Following diagram depicts Tumbling Window computations.  The
>> Aggregation functions in GREEN are just NATIVE functions consuming
>> different buckets, and doing aggregations on top of these buckets.
>>
>>
>>
>>
>>
>> *P.S.*
>>
>> * Some of the design choices that i have decided not to go for are:*
>>
>> 1.  Multiple Pipelines for doing computation.  One master pipeline does
>> grouping, and sends to a different topic based on user configured window
>> size. (e.g. topic_window_by_5_min, topic_window_by_15_min), and have a
>> different pipeline consume each topic.
>>
>> 2.  Single pipeline doing all the business with predefined Windows
>> defined for Downstream processing. e.g. 5, 15, 30, 60 minute windows will
>> be defined which will consume from different Side Inputs.  User is only
>> allowed only to select these Window sizes.  Upstream Group By operator will
>> route to the data to different Window Function based on user configuration.
>>
>>
>>
>> Regards
>> Sumit Chawla
>>
>>
>


Re: FYI: All Runners Tested In Precommit

2016-09-15 Thread Robert Bradshaw
Woo hoo! Thanks to everyone who made this happen.

On Thu, Sep 15, 2016 at 12:24 PM, Jesse Anderson  wrote:
> Excellent!
>
> On Thu, Sep 15, 2016 at 12:18 PM Frances Perry 
> wrote:
>
>> Awesome! Strong tests are hugely important in a project with so many
>> diverse components.
>>
>> On Thu, Sep 15, 2016 at 12:16 PM, Jason Kuster > invalid> wrote:
>>
>> > Hi all,
>> >
>> > Just a quick update -- as of yesterday all new PRs now run the WordCount
>> > end-to-end test against every runner in master (Flink, Spark, Dataflow,
>> and
>> > Direct). This is a great milestone for testing and will help us ensure
>> > continued high-quality Beam development; thanks to Dan, Amit, Aljoscha,
>> and
>> > Aviem for their help making this happen!
>> >
>> > Best,
>> >
>> > Jason
>> >
>> > --
>> > ---
>> > Jason Kuster
>> > Apache Beam (Incubating) / Google Cloud Dataflow
>> >
>>


Re: About Finishing Triggers

2016-09-14 Thread Robert Bradshaw
On Wed, Sep 14, 2016 at 5:14 AM, Aljoscha Krettek  wrote:
> Hi,
> I had a chat with Kenn at Flink Forward and he did an off-hand remark about
> how it might be better if triggers were not allowed to mark a window as
> finished and instead always be "Repeatedly" (if I understood correctly).
>
> Maybe you (Kenn) could go a bit more in depth about what you meant by this
> and if we should actually change this in Beam. Would this mean that we then
> have the opposite of Repeatedly, i.e. Once, or Only.once(T)?

Once triggers are significantly easier to reason about from a
user/data perspective, especially in the context of further downstream
aggregation.

> I also noticed some inconsistencies in when triggers behave as repeated
> triggers and once triggers. For example, AfterPane.elementCountAtLeast(5)
> only fires once if used alone but it it fires repeatedly if used as the
> speculative trigger in
> AfterWatermark.pastEndOfWindow().withEarlyFirings(...). (This is true for
> all "once" triggers.)

Yes, a repeatedly is applied to the early/late firing specifications.
The watermark itself is once (marks the window as finished, unless
there is a late trigger specified).

https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/transforms/windowing/AfterWatermark.FromEndOfWindow
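
For anyone following along, a rough sketch of the shape being described,
using the standard windowing/trigger builders (the element type, durations,
and accumulation mode here are arbitrary choices, not anything prescribed in
this thread):

    input.apply(
        Window.<MyEvent>into(FixedWindows.of(Duration.standardMinutes(10)))
            .triggering(
                AfterWatermark.pastEndOfWindow()
                    // speculative firings before the watermark; effectively repeated
                    .withEarlyFirings(AfterPane.elementCountAtLeast(5))
                    // firings for data arriving after the watermark passes
                    .withLateFirings(AfterProcessingTime.pastFirstElementInPane()))
            .withAllowedLateness(Duration.standardHours(1))
            .accumulatingFiredPanes());

MyEvent is just a placeholder element type.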


Re: Proposal: Dynamic PIpelineOptions

2016-08-02 Thread Robert Bradshaw
Being able to "late-bind" parameters like input paths to a
pre-constructed program would be a very useful feature, and I think is
worth adding to Beam.

Of the four API proposals, I have a strong preference for (4).
Further, it seems that these need not be bound to the PipelineOptions
object itself (i.e. a named RuntimeValueSupplier could be constructed
off of a pipeline object), which the Python API makes less heavy use
of (encouraging the user to use familiar, standard libraries for
argument parsing), though of course such integration is useful to
provide for convenience.
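
To make the idea concrete, here is a sketch of what late binding can look
like from user code, written against the options-based flavor. It is
illustrative only, not the proposal itself; ValueProvider is roughly the
shape the Java API later took, and MyOptions/getInputFile are made-up names.

    public interface MyOptions extends PipelineOptions {
      @Description("Input path, resolvable at runtime")
      ValueProvider<String> getInputFile();
      void setInputFile(ValueProvider<String> value);
    }

    // The value may still be unbound at graph construction time; the runner
    // resolves it when the pipeline actually executes.
    MyOptions options = PipelineOptionsFactory.fromArgs(args).as(MyOptions.class);
    Pipeline p = Pipeline.create(options);
    p.apply(TextIO.read().from(options.getInputFile()));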

- Robert

On Fri, Jul 29, 2016 at 12:14 PM, Sam McVeety  wrote:
> During the graph construction phase, the given SDK generates an initial
> execution graph for the program.  At execution time, this graph is
> executed, either locally or by a service.  Currently, Beam only supports
> parameterization at graph construction time.  Both Flink and Spark supply
> functionality that allows a pre-compiled job to be run without SDK
> interaction with updated runtime parameters.
>
> In its current incarnation, Dataflow can read values of PipelineOptions at
> job submission time, but this requires the presence of an SDK to properly
> encode these values into the job.  We would like to build a common layer
> into the Beam model so that these dynamic options can be properly provided
> to jobs.
>
> Please see
> https://docs.google.com/document/d/1I-iIgWDYasb7ZmXbGBHdok_IK1r1YAJ90JG5Fz0_28o/edit
> for the high-level model, and
> https://docs.google.com/document/d/17I7HeNQmiIfOJi0aI70tgGMMkOSgGi8ZUH-MOnFatZ8/edit
> for
> the specific API proposal.
>
> Cheers,
> Sam


Re: [RESULT] Release version 0.2.0-incubating

2016-08-01 Thread Robert Bradshaw
+1 (binding)

On Sun, Jul 31, 2016 at 10:08 PM, Jean-Baptiste Onofré  
wrote:
> Ni Aljoscha,
>
> it's not required, but easier when creating the result e-mail ;)
>
> Binding vote is vote from the PPMC, and you are PPMC.
>
> Just a reminder for all:
> - to be considered valid, you clearly have to send an e-mail (in reply to
> the vote e-mail) with your vote (+1, -1, 0), INCLUDING the person who sent
> the vote e-mail. So, in this case, Dan should have sent a reply to his own
> e-mail with his vote.
> - anybody can vote
> - to pass, a vote has to get minimum of 3 binding +1 (vote from PPMC,
> Podling Project Management Committee member) and no -1
> - if we don't have the 3 binding votes, it's possible to send a reminder and
> to extend the vote for new 72 hours.
> - if the vote pass, it's forwarded to the incubator general mailing list,
> where the IPMC (Incubator Project Management Committee member) will review
> and vote
> - if we have at least 3 binding IPMC votes, and no -1, the version can be
> released
> - when it passes, the artefacts are promoted to central, Jira is updated,
> distributions are published to dist.apache.org, the website is updated and an
> announcement e-mail is sent to the mailing lists. For TLP (Top Level
> Project, not yet the case for Beam), we also update reporter.apache.org
>
> Thanks,
> Regards
> JB
>
>
> On 08/01/2016 04:44 AM, Aljoscha Krettek wrote:
>>
>> Ah, it seems I always have to mention it? I would also make mine "+1
>> (binding)"
>>
>> On Sun, 31 Jul 2016 at 12:47 Dan Halperin 
>> wrote:
>>
>>> My apologies: a slight revision. We have 4 approving votes, including 3
>>> binding votes.
>>>
>>> On Sun, Jul 31, 2016 at 12:29 PM, Dan Halperin 
>>> wrote:
>>>
 I'm happy to announce that we have unanimously approved this release.

 There are 3 binding approving votes:
 * Dan Halperin
 * Jean-Baptiste Onofré
 * Amit Sela

 There is a fourth approving vote:
>>>
>>>
 * Aljoscha Krettek (not binding)

>>>
>>>
 There are no disapproving votes.

 At this point, this proposal will be presented to the Apache Incubator
>>>
>>> for

 their review.

 Thanks,
 Dan

>>>
>>
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com


Re: [Proposal] Add waitToFinish(), cancel(), waitToRunning() to PipelineResult.

2016-07-25 Thread Robert Bradshaw
+1, sounds great. Thanks Pei.

On Mon, Jul 25, 2016 at 3:28 PM, Lukasz Cwik  wrote:
> +1 for your proposal Pei
>
> On Mon, Jul 25, 2016 at 5:54 PM, Pei He  wrote:
>
>> Looks to me that followings are agreed:
>> (1). adding cancel() and waitUntilFinish() to PipelineResult.
>> (In streaming mode, "all data watermarks reach to infinity" is
>> considered as finished.)
>> (2). PipelineRunner.run() should return relatively quick as soon as
>> the pipeline/job is started/running. The blocking logic should be left
>> to users' code to handle with PipelineResult.waitUntilFinish(). (Test
>> runners that finish quickly can block run() until the execution is
>> done. So, it is cleaner to verify test results after run())
>>
>> I will send out PR for (1), and create jira issues to improve runners for
>> (2).
>>
>> waitToRunning() is controversial, and we have several half way agreed
>> proposals.
>> I will pull them out from this thread, so we can close this proposal
>> with cancel() and waitUntilFinish(). And, i will create a jira issue
>> to track how to support ''waiting until other states".
>>
>> Does that sound good with anyone?
>>
>> Thanks
>> --
>> Pei
>>
>> On Thu, Jul 21, 2016 at 4:32 PM, Robert Bradshaw
>>  wrote:
>> > On Thu, Jul 21, 2016 at 4:18 PM, Ben Chambers 
>> wrote:
>> >> This health check seems redundant with just waiting a while and then
>> >> checking on the status, other than returning earlier in the case of
>> >> reaching a terminal state. What about adding:
>> >>
>> >> /**
>> >>  * Returns the state after waiting the specified duration. Will return
>> >> earlier if the pipeline
>> >>  * reaches a terminal state.
>> >>  */
>> >> State getStateAfter(Duration duration);
>> >>
>> >> This seems to be a useful building block, both for the user's pipeline
>> (in
>> >> case they wanted to build something like wait and then check health) and
>> >> also for the SDK (to implement waitUntilFinished, etc.)
>> >
>> > A generic waitFor(Duration) which may return early if a terminal state
>> > is entered seems useful. I don't know that we need a return value
>> > here, given that we can then query the PipelineResult however we want
>> > once this returns. waitUntilFinished is simply
>> > waitFor(InfiniteDuration).
>> >
>> >> On Thu, Jul 21, 2016 at 4:11 PM Pei He 
>> wrote:
>> >>
>> >>> I am not in favor of supporting wait for every states or
>> >>> waitUntilState(...).
>> >>> One reason is PipelineResult.State is not well defined and is not
>> >>> agreed upon runners.
>> >>> Another reason is users might not want to wait for a particular state.
>> >>> For example,
>> >>> waitUntilFinish() is to wait for a terminal state.
>> >>> So, even runners have different states, we still can define shared
>> >>> properties, such as finished/terminal.
>> >
>> > +1. Running is an intermediate state that doesn't have an obvious
>> > mapping onto all runners, which is another reason it's odd to wait
>> > until then. All runners have terminal states.
>> >
>> >>> I think when users call waitUntilRunning(), they want to make sure the
>> >>> pipeline is up running and is healthy.
>> >> > Maybe we want to wait for at
>> >>> least one element went through the pipeline.
>> >
>> > -1, That might be a while... Also, you may not start generating data
>> > until your pipeline is up.
>> >
>> >>> What about changing the waitUntilRunning() to the following?
>> >>>
>> >>> /**
>> >>> * Check if the pipeline is health for the duration.
>> >>> *
>> >>> * Return true if the pipeline is healthy at the end of duration.
>> >>> * Return false if the pipeline is not healthy at the end of duration.
>> >>> * It may return early if the pipeline is in an unrecoverable failure
>> >>> state.
>> >>> */
>> >>> boolean PipelineResult.healthCheck(Duration duration)
>> >>>
>> >>> (I think this also addressed Robert's comment about waitToRunning())
>> >>>
>> >>> On Thu, Jul 21, 2016 at 1:08 PM, Kenneth Knowles
>> 
>> >>> wrote:
>> >>>

Re: [Proposal] Add waitToFinish(), cancel(), waitToRunning() to PipelineResult.

2016-07-21 Thread Robert Bradshaw
On Thu, Jul 21, 2016 at 4:18 PM, Ben Chambers  wrote:
> This health check seems redundant with just waiting a while and then
> checking on the status, other than returning earlier in the case of
> reaching a terminal state. What about adding:
>
> /**
>  * Returns the state after waiting the specified duration. Will return
> earlier if the pipeline
>  * reaches a terminal state.
>  */
> State getStateAfter(Duration duration);
>
> This seems to be a useful building block, both for the user's pipeline (in
> case they wanted to build something like wait and then check health) and
> also for the SDK (to implement waitUntilFinished, etc.)

A generic waitFor(Duration) which may return early if a terminal state
is entered seems useful. I don't know that we need a return value
here, given that we can then query the PipelineResult however we want
once this returns. waitUntilFinished is simply
waitFor(InfiniteDuration).

> On Thu, Jul 21, 2016 at 4:11 PM Pei He  wrote:
>
>> I am not in favor of supporting wait for every states or
>> waitUntilState(...).
>> One reason is PipelineResult.State is not well defined and is not
>> agreed upon runners.
>> Another reason is users might not want to wait for a particular state.
>> For example,
>> waitUntilFinish() is to wait for a terminal state.
>> So, even runners have different states, we still can define shared
>> properties, such as finished/terminal.

+1. Running is an intermediate state that doesn't have an obvious
mapping onto all runners, which is another reason it's odd to wait
until then. All runners have terminal states.

>> I think when users call waitUntilRunning(), they want to make sure the
>> pipeline is up running and is healthy.
> > Maybe we want to wait for at
>> least one element went through the pipeline.

-1, That might be a while... Also, you may not start generating data
until your pipeline is up.

>> What about changing the waitUntilRunning() to the following?
>>
>> /**
>> * Check if the pipeline is health for the duration.
>> *
>> * Return true if the pipeline is healthy at the end of duration.
>> * Return false if the pipeline is not healthy at the end of duration.
>> * It may return early if the pipeline is in an unrecoverable failure
>> state.
>> */
>> boolean PipelineResult.healthCheck(Duration duration)
>>
>> (I think this also addressed Robert's comment about waitToRunning())
>>
>> On Thu, Jul 21, 2016 at 1:08 PM, Kenneth Knowles 
>> wrote:
>> > Some more comments:
>> >
>> >  - What are the allowed/expected state transitions prior to RUNNING?
>> Today,
>> > I presume it is any nonterminal state, so it can be UNKNOWN or STOPPED
>> > (which really means "not yet started") prior to RUNNING. Is this what we
>> > want?
>> >
>> >  - If a job can be paused, a transition from RUNNING to STOPPED, then
>> > waitUntilPaused(Duration) makes sense.
>> >
>> >  - Assuming there is some polling under the hood, are runners required to
>> > send back a full history of transitions? Or can transitions be missed,
>> with
>> > only the latest state retrieved?
>> >
>> >  - If the latter, then does waitUntilRunning() only wait until RUNNING or
>> > does it also return when it sees STOPPED, which could certainly indicate
>> > that the job transitioned to RUNNING then STOPPED in between polls. In
>> that
>> > case it is, today, the same as waitUntilStateIsKnown().
>> >
>> >  - The obvious limit of this discussion is waitUntilState(Duration,
>> > Set), which is the same amount of work to implement. Am I correct
>> > that everyone in this thread thinks this generality is just not the right
>> > thing for a user API?
>> >
>> >  - This enum could probably use revision. I'd chose some combination of
>> > tightening the enum, making it extensible, and make some aspect of it
>> > free-form. Not sure where the best balance lies.
>> >
>> >
>> >
>> > On Thu, Jul 21, 2016 at 12:47 PM, Ben Chambers
>> > >> wrote:
>> >
>> >> (Minor Issue: I'd propose waitUntilDone and waitUntilRunning rather than
>> >> waitToRunning which reads oddly)
>> >>
>> >> The only reason to separate submission from waitUntilRunning would be if
>> >> you wanted to kick off several pipelines in quick succession, then wait
>> for
>> >> them all to be running. For instance:
>> >>
>> >> PipelineResult p1Future = p1.run();
>> >

Re: [Proposal] Add waitToFinish(), cancel(), waitToRunning() to PipelineResult.

2016-07-21 Thread Robert Bradshaw
On Thu, Jul 21, 2016 at 3:24 PM, Pei He  wrote:
> I think the two streaming use cases can be done in users code by:
> 1. sleeping to block for a Duration.
> 2. catch the interrupt signal (such as CTRL-C), and then call
> PipelineResult.cancel().

I think it's risky to kill the launched pipeline when the local
program terminates (unexpectedly or otherwise). There's also the
usecase of starting your streaming pipeline and having it periodically
print out counters, which runs until you are happy with the results,
your screen session terminates, you restart your computer, or
whatever.

> But, it brought up the question what WaitUntilFinish() should do in
> streaming mode:
> 1. Can we define finish in streaming mode as "all data watermarks
> reach to infinity"? This is when a streaming pipeline finished
> processing bounded sources.

Yes.

> 2. If there are unbounded sources, WaitUntilFinish() will throw.
> (PCollections have the information whether it is bounded or unbounded)

I don't think sources can always know. (E.g. you may have a
technically unbounded source that decides at some point it's actually
done.)

> On Thu, Jul 21, 2016 at 6:45 AM, Amit Sela  wrote:
>> Generally this makes sense, though I'm more comfortable thinking of it more
>> in the sense of:
>>
>>1. batch
>>   1. blocking - wait (for results)
>>   2. non-blocking.
>>2. streaming
>>   1. blocking - wait(Duration)
>>   2. blocking - waitForInterruption() - some signal that terminates the
>>   job.
>>   3. non-blocking.
>>
>> My 2¢,
>> Amit
>>
>> On Thu, Jul 21, 2016 at 1:39 AM Pei He  wrote:
>>
>>> Hi everyone,
>>> Here is a proposal to address the following issue:
>>> JIRA issue https://issues.apache.org/jira/browse/BEAM-443
>>>
>>> Currently, users doesn’t have a consistent way to wait for the
>>> pipeline to finish. Different runners have different implementations.
>>> For example:
>>> 1. DirectRunner have a isBlockOnRun in DirectOptions, and users can
>>> configure it by setting this options.
>>> 2. Dataflow have a separate BlockingDataflowRunner, and users can
>>> switch runners to control blocking v.s non-blocking.
>>> 3. Flink and Spark runners might or might not block depends on their
>>> implementations of run().
>>>
>>> Proposal:
>>> Users control whether to wait for the pipeline to finish through
>>> PipelineResult, and be able to cancel a running pipeline.
>>> 1. Add PipelineResult.waitToFinish(Duration)
>>> 2. Add PipelineResult.cancel()
>>> 3. Add PipelineResult.waitToRunning(Duration)
>>> 4. PipelineRunner.run() should (but not required) do non-blocking runs
>>>
>>> UserCode Scenarios:
>>> // Case 1: don't care whether to block
>>> PipelineResult result = pipeline.run();
>>>
>>> // Case 2: wait to finish, and inspect the result.
>>> PipelineResult result = pipeline.run();
>>> result.waitToFinish(Duration);
>>> result.getX(...)
>>>
>>> // Case 3: run multiple pipelines, and inspect results.
>>> for (int i = 0; i < 10; ++i ) {
>>>  pipeline[i].run();
>>> }
>>> … poll statuses and inspect results …
>>>
>>> // Case 4: test streaming pipeline
>>> PipelineResult result = pipeline.run();
>>> result.waitToRunning(Duration);
>>> result.getAggregatorValues();
>>> ... check aggregator ...
>>> result.cancel();
>>>
>>> What does everyone think?
>>>
>>> Thanks
>>> --
>>> Pei
>>>


Re: [Proposal] Add waitToFinish(), cancel(), waitToRunning() to PipelineResult.

2016-07-21 Thread Robert Bradshaw
On Thu, Jul 21, 2016 at 12:47 PM, Ben Chambers
 wrote:
> (Minor Issue: I'd propose waitUntilDone and waitUntilRunning rather than
> waitToRunning which reads oddly)

+1 to Until. Finish is nice, because it implies both success and
failure. (Alternatively, waitUntilDone throws an exception on failure,
so when it returns normally it is always Done).

> The only reason to separate submission from waitUntilRunning would be if
> you wanted to kick off several pipelines in quick succession, then wait for
> them all to be running. For instance:
>
> PipelineResult p1Future = p1.run();
> PipelineResult p2Future = p2.run();
> ...
>
> p1Future.waitUntilRunning();
> p2Future.waitUntilRunning();
> ...
>
> In this setup, you can more quickly start several pipelines, but your main
> program would wait and report any errors before exiting.

I would argue in this case you should be using standard Java
threading, which isn't so bad

ExecutorService executor = Executors.newWhatever();
Future<PipelineResult> p1Future = executor.submit(() -> p1.run());
Future<PipelineResult> p2Future = executor.submit(() -> p2.run());
...

p1Future.get(); // to block for one
executor.awaitTermination(...); // to wait for all

The difference with waitUntilFinish() is that one often wants to
interact with the PipelineResult itself in the meantime. It's also
much simpler to have one wait method, rather than two (or, as
mentioned, arbitrary status sets (which gets messy if some of them are
not terminal)). If there is high demand, we could add the second one
later.

(Totally backwards incompatible, we could call this p.launch() for
clarity, and maybe keep run() as run() { return
p.launch().waitUntilFinish(); }.)


> On Thu, Jul 21, 2016 at 12:41 PM Robert Bradshaw
>  wrote:
>
>> I'm in favor of the proposal. My only question is whether we need
>> PipelineResult.waitToRunning(), instead I'd propose that run() block
>> until the pipeline's running/successfully submitted (or failed). This
>> would simplify the API--we'd only have one kind of wait that makes
>> sense in all cases.
>>
>> What kinds of interactions would one want to have with the
>> PipelineResults before it's running?
>>
>> On Thu, Jul 21, 2016 at 12:24 PM, Thomas Groh 
>> wrote:
>> > TestPipeline is probably the one runner that can be expected to block, as
>> > certainly JUnit tests and likely other tests will run the Pipeline, and
>> > succeed, even if the PipelineRunner throws an exception. Luckily, this
>> can
>> > be added to TestPipeline.run(), which already has additional behavior
>> > associated with it (currently regarding the unwrapping of
>> AssertionErrors)
>> >
>> > On Thu, Jul 21, 2016 at 11:40 AM, Kenneth Knowles > >
>> > wrote:
>> >
>> >> I like this proposal. It makes pipeline.run() seem like a pretty normal
>> >> async request, and easy to program with. It removes the implicit
>> assumption
>> >> in the prior design that main() is pretty much just "build and run a
>> >> pipeline".
>> >>
>> >> The part of this that I care about most is being able to write a program
>> >> (not the pipeline, but the program that launches one or more pipelines)
>> >> that has reasonable cross-runner behavior.
>> >>
>> >> One comment:
>> >>
>> >> On Wed, Jul 20, 2016 at 3:39 PM, Pei He 
>> wrote:
>> >> >
>> >> > 4. PipelineRunner.run() should (but not required) do non-blocking runs
>> >> >
>> >>
>> >> I think we can elaborate on this a little bit. Obviously there might be
>> >> "blocking" in terms of, say, an HTTP round-trip to submit the job, but
>> >> run() should never be non-terminating.
>> >>
>> >> For a test runner that finishes the pipeline quickly, I would be fine
>> with
>> >> run() just executing the pipeline, but the PipelineResult should still
>> >> emulate the usual - just always returning a terminal status. It would be
>> >> annoying to add waitToFinish() to the end of all our tests, but leaving
>> a
>> >> run() makes the tests only work with special blocking runner wrappers
>> (and
>> >> make them poor examples). A JUnit @Rule for test pipeline would hide all
>> >> that, perhaps.
>> >>
>> >>
>> >> Kenn
>> >>
>>


Re: [Proposal] Add waitToFinish(), cancel(), waitToRunning() to PipelineResult.

2016-07-21 Thread Robert Bradshaw
I'm in favor of the proposal. My only question is whether we need
PipelineResult.waitToRunning(), instead I'd propose that run() block
until the pipeline's running/successfully submitted (or failed). This
would simplify the API--we'd only have one kind of wait that makes
sense in all cases.

What kinds of interactions would one want to have with the
PipelineResults before it's running?

On Thu, Jul 21, 2016 at 12:24 PM, Thomas Groh  wrote:
> TestPipeline is probably the one runner that can be expected to block, as
> certainly JUnit tests and likely other tests will run the Pipeline, and
> succeed, even if the PipelineRunner throws an exception. Luckily, this can
> be added to TestPipeline.run(), which already has additional behavior
> associated with it (currently regarding the unwrapping of AssertionErrors)
>
> On Thu, Jul 21, 2016 at 11:40 AM, Kenneth Knowles 
> wrote:
>
>> I like this proposal. It makes pipeline.run() seem like a pretty normal
>> async request, and easy to program with. It removes the implicit assumption
>> in the prior design that main() is pretty much just "build and run a
>> pipeline".
>>
>> The part of this that I care about most is being able to write a program
>> (not the pipeline, but the program that launches one or more pipelines)
>> that has reasonable cross-runner behavior.
>>
>> One comment:
>>
>> On Wed, Jul 20, 2016 at 3:39 PM, Pei He  wrote:
>> >
>> > 4. PipelineRunner.run() should (but not required) do non-blocking runs
>> >
>>
>> I think we can elaborate on this a little bit. Obviously there might be
>> "blocking" in terms of, say, an HTTP round-trip to submit the job, but
>> run() should never be non-terminating.
>>
>> For a test runner that finishes the pipeline quickly, I would be fine with
>> run() just executing the pipeline, but the PipelineResult should still
>> emulate the usual - just always returning a terminal status. It would be
>> annoying to add waitToFinish() to the end of all our tests, but leaving a
>> run() makes the tests only work with special blocking runner wrappers (and
>> make them poor examples). A JUnit @Rule for test pipeline would hide all
>> that, perhaps.
>>
>>
>> Kenn
>>


Re: [DISCUSS] Spark runner packaging

2016-07-07 Thread Robert Bradshaw
I don't think the proposal is to put this into the source release, rather
to have a separate binary artifact that's Beam+Spark.

On Thu, Jul 7, 2016 at 11:54 AM, Vlad Rozov  wrote:

> I am not sure if I read the proposal correctly, but note that it will be
> against Apache policy to include compiled binaries into the source release.
> On the other side, each runner may include necessary run-time binaries as
> test only dependencies into the runner's maven pom.xml
>
>
> On 7/7/16 11:01, Lukasz Cwik wrote:
>
>> That makes a lot of sense. I can see other runners following suit where
>> there is a packaged up version for different scenarios / backend cluster
>> runtimes.
>>
>> Should this be part of Apache Beam as a separate maven module or another
>> sub-module inside of Apache Beam, or something else?
>>
>> On Thu, Jul 7, 2016 at 1:49 PM, Amit Sela  wrote:
>>
>> Hi everyone,
>>>
>>> Lately I've encountered a number of issues concerning the fact that the
>>> Spark runner does not package Spark along with it and forcing people to
>>> do
>>> this on their own.
>>> In addition, this seems to get in the way of having beam-examples
>>> executed
>>> against the Spark runner, again because it would have to add Spark
>>> dependencies.
>>>
>>> When running on a cluster (which I guess was the original goal here), it
>>> is
>>> recommended to have Spark provided by the cluster - this makes sense for
>>> Spark clusters and more so for Spark + YARN clusters where you might have
>>> your Spark built against a specific Hadoop version or using a vendor
>>> distribution.
>>>
>>> In order to make the runner more accessible to new adopters, I suggest to
>>> consider releasing a "spark-included" artifact as well.
>>>
>>> Thoughts ?
>>>
>>> Thanks,
>>> Amit
>>>
>>>
>


Re: Window Strategy for KeyedPCollectionTuples and CoGroupByKey

2016-07-06 Thread Robert Bradshaw
It is an error at pipeline construction time to use CoGroupByKey with
differing windowing strategies. If you want to do such joins, you may
want to look into using side inputs which are more flexible.
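
A minimal sketch of the happy path, for reference (names like ordersTag are
made up; the essential point is that both inputs get the same windowing
before the join):

    // Apply identical windowing to both inputs.
    PCollection<KV<String, Integer>> orders = rawOrders.apply(
        Window.<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(5))));
    PCollection<KV<String, String>> users = rawUsers.apply(
        Window.<KV<String, String>>into(FixedWindows.of(Duration.standardMinutes(5))));

    final TupleTag<Integer> ordersTag = new TupleTag<>();
    final TupleTag<String> usersTag = new TupleTag<>();

    PCollection<KV<String, CoGbkResult>> joined =
        KeyedPCollectionTuple.of(ordersTag, orders)
            .and(usersTag, users)
            .apply(CoGroupByKey.<String>create());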

On Wed, Jul 6, 2016 at 8:01 AM, Shen Li  wrote:
> Hi,
>
> If the PCollections in a KeyedPCollectionTuple have different window
> strategies (WindowFn, Trigger, etc.), how does a CoGroupByKey work? When
> will it be triggered? How does it determine which kvs from each PCollection
> to co-group?
>
> Thanks,
>
> Shen


Re: [DISCUSS] PTransform.named vs. named apply

2016-06-22 Thread Robert Bradshaw
+1, I think it makes more sense to name the application of a transform
rather than the transform itself. (Still mulling on how best to do
this with Python...)
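
For anyone skimming, the two spellings under discussion look roughly like
this (CountWords is a stand-in composite transform; which concrete
transforms actually expose a named() method varies):

    // Name the transform itself:
    PCollection<KV<String, Long>> counts1 =
        lines.apply(new CountWords().named("CountWords"));

    // Name the application of the transform:
    PCollection<KV<String, Long>> counts2 =
        lines.apply("CountWords", new CountWords());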

On Wed, Jun 22, 2016 at 9:27 PM, Jean-Baptiste Onofré  wrote:
> +1
>
> Regards
> JB
>
>
> On 06/23/2016 12:17 AM, Ben Chambers wrote:
>>
>> Based on a recent PR (https://github.com/apache/incubator-beam/pull/468) I
>> was reminded of the confusion around the use of
>> .apply(transform.named(someName)) and .apply(someName, transform). This is
>> one of things I’ve wanted to cleanup for a while. I’d like to propose a
>> path towards removing this redundancy.
>>
>> First, some background -- why are there two ways to name things? When we
>> added support for updating existing pipelines, we needed all applications
>> to have unique user-provided names to allow diff’ing the pipelines. We
>> found a few problems with the first approach -- using .named() to create a
>> new transform -- which led to the introduction of the named apply:
>>
>> 1. When receiving an error about an application not having a name, it is
>> not obvious that a name should be given to the *transform*
>> 2. When using .named() to construct a new transform either the type
>> information is lost or the composite transform has to override .named()
>>
>> We now generally suggest the use of .apply(someName, transform). It is
>> easier to use and doesn’t lead to as much confusion around PTransform
>> names
>> and PTransform application names.
>>
>> To that end, I'd like to propose the following changes to the code and
>> documentation:
>> 1. Replace the usage of .named(name) in all examples and composites with
>> the named-apply syntax.
>> 2. Replace .named(name) with a protected PTransform constructor which
>> takes
>> a default name. If not provided, the default name will be derived from the
>> class of the PTransform.
>> 3. Use the protected constructor in composites (where appropriate) to
>> ensure that the default application has a reasonable name.
>>
>> Users will benefit from having a single way of naming applications while
>> building a pipeline. Any breakages due to the removal of .named should be
>> easily fixed by either using the named application or by passing the name
>> to the constructor of a composite.
>>
>> I’d like to hear any comments or opinions on this topic from the wider
>> community. Please let me know what you think!
>>
>> -- Ben
>>
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com


Re: Apache Beam for Python

2016-06-14 Thread Robert Bradshaw
> Silviu,
>
> thanks for detailed update and great work !
>
> I would advice to create a:
>
> sdks/python
>
> Maven module to store the Python SDK.
>
> WDYT ?
>
> By the way, welcome aboard and great to have you all guys in the team !
>
> Regards
> JB
>
> On 06/03/2016 03:13 PM, Silviu Calinoiu wrote:
>> Hi all,
>>
>> My name is Silviu Calinoiu and I am a member of the Cloud Dataflow team
>> working on the Python SDK. As the original Beam proposal (
>> https://wiki.apache.org/incubator/BeamProposal) mentioned, we have been
>> planning to merge the Python SDK into Beam. The Python SDK is in an early
>> stage of development (alpha milestone) and so this is a good time to move
>> the code without causing too much disruption to our customers.
>> Additionally, this enables the Beam community to contribute as soon as
>> possible.
>>
>> The current state of the SDK is as follows:
>>
>>   - Open-sourced at https://github.com/GoogleCloudPlatform/DataflowPythonSDK/
>>
>>   - Model: All main concepts are present.
>>
>>   - I/O: SDK supports text (Google Cloud Storage) and BigQuery connectors
>>     and has a framework for adding additional sources and sinks.
>>
>>   - Runners: SDK has two pipeline runners: direct runner (in process,
>>     local execution) and Cloud Dataflow runner for batch pipelines (submit
>>     job to Google Dataflow service). The current direct runner is bounded
>>     only (batch execution) but there is work in progress to support
>>     unbounded (as in Java).
>>
>>   - Testing: The code base has unit test coverage for all the modules and
>>     several integration and end to end tests (similar in coverage to the
>>     Java SDK). Streaming is not well tested end to end yet since Cloud
>>     Dataflow focused first on batch.
>>
>>   - Docs: We have matching Python documentation for the features currently
>>     supported by Cloud Dataflow. The docs are on cloud.google.com (access
>>     only by whitelist due to the alpha stage of the project). Devin is
>>     working on the transition of all docs to Apache.
>>
>> In the next days/weeks we would like to prepare and start migrating the
>> code and you should start seeing some pull requests. We also hope that the
>> Beam community will shape the SDK going forward. In particular, all the
>> model improvements implemented for Java (Runner API, etc.) will have
>> equivalents in Python once they stabilize. If you have any advice before
>> we start the journey please let us know.
>>
>> The team that will join the Beam effort consists of me (Silviu Calinoiu),
>> Charles Chen, Ahmet Altay, Chamikara Jayalath, and last but not least
>> Robert Bradshaw (who is already an Apache Beam committer).
>>
>> So let us know what you think!
>>
>> Best regards,
>>
>> Silviu
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com


Re: [VOTE] Release version 0.1.0-incubating

2016-06-09 Thread Robert Bradshaw
+1 (binding)

I also spot-checked the signatures, they look good.

On Thu, Jun 9, 2016 at 10:32 AM, James Malone
 wrote:
> +1 (binding)
>
> On Thu, Jun 9, 2016 at 10:15 AM, Aljoscha Krettek 
> wrote:
>
>> +1 (binding)
>>
>> I ran "mvn clean verify" on the source package, executed WordCount using
>> the FlinkPipelineRunner. NOTICE, LICENSE and DISCLAIMER also look good
>>
>> On Thu, 9 Jun 2016 at 18:50 Dan Halperin 
>> wrote:
>>
>> > +1 (binding)
>> >
>> > per checklist 2.1, I decompressed the source-release zip and ensured that
>> > `mvn clean verify` passed. per 3.6, I confirmed that there are no binary
>> > files. I also did a few other miscellaneous checks.
>> >
>> > On Thu, Jun 9, 2016 at 8:48 AM, Kenneth Knowles 
>> > wrote:
>> >
>> > > +1 (binding)
>> > >
>> > > Confirmed that we can run pipelines on Dataflow.
>> > >
>> > > Looks good. Very exciting!
>> > >
>> > >
>> > > On Thu, Jun 9, 2016 at 8:16 AM, Jean-Baptiste Onofré 
>> > > wrote:
>> > >
>> > > > Team work ! Special thanks to Davor and Dan ! And thanks to the
>> entire
>> > > > team: it's a major step forward (the first release is always the
>> > hardest
>> > > > one ;)). Let's see how the release will be taken by the IPMC :)
>> > > >
>> > > > Regards
>> > > > JB
>> > > >
>> > > >
>> > > > On 06/09/2016 04:32 PM, Scott Wegner wrote:
>> > > >
>> > > >> +1
>> > > >>
>> > > >> Thanks JB and Davor for all your hard work putting together this
>> > > release!
>> > > >>
>> > > >> On Wed, Jun 8, 2016, 11:02 PM Jean-Baptiste Onofré > >
>> > > >> wrote:
>> > > >>
>> > > >> By the way, I forgot to mention that we will create a
>> 0.1.0-incubating
>> > > >>> tag (kind of alias to RC3) when the vote passed.
>> > > >>>
>> > > >>> Regards
>> > > >>> JB
>> > > >>>
>> > > >>> On 06/09/2016 01:20 AM, Davor Bonaci wrote:
>> > > >>>
>> > >  Hi everyone,
>> > >  Here's the first vote for the first release of Apache Beam --
>> > version
>> > >  0.1.0-incubating!
>> > > 
>> > >  As a reminder, we aren't looking for any specific new
>> functionality,
>> > > but
>> > >  would like to release the existing code, get something to our
>> users'
>> > > 
>> > > >>> hands,
>> > > >>>
>> > >  and test the processes. Previous discussions and iterations on
>> this
>> > > 
>> > > >>> release
>> > > >>>
>> > >  have been archived on the dev@ mailing list.
>> > > 
>> > >  The complete staging area is available for your review, which
>> > > includes:
>> > >  * the official Apache source release to be deployed to
>> > > dist.apache.org
>> > > 
>> > > >>> [1],
>> > > >>>
>> > >  and
>> > >  * all artifacts to be deployed to the Maven Central Repository
>> [2].
>> > > 
>> > >  This corresponds to the tag "v0.1.0-incubating-RC3" in source
>> > control,
>> > > 
>> > > >>> [3].
>> > > >>>
>> > > 
>> > >  Please vote as follows:
>> > >  [ ] +1, Approve the release
>> > >  [ ] -1, Do not approve the release (please provide specific
>> > comments)
>> > > 
>> > >  For those of us enjoying our first voting experience -- the
>> release
>> > >  checklist is here [4]. This is a "package release"-type of the
>> > Apache
>> > >  voting process [5]. As customary, the vote will be open for 72
>> > hours.
>> > > It
>> > > 
>> > > >>> is
>> > > >>>
>> > >  adopted by majority approval with at least 3 PPMC affirmative
>> votes.
>> > > If
>> > >  approved, the proposal will be presented to the Apache Incubator
>> for
>> > > 
>> > > >>> their
>> > > >>>
>> > >  review.
>> > > 
>> > >  Thanks,
>> > >  Davor
>> > > 
>> > >  [1]
>> > > 
>> > > 
>> > > >>>
>> > >
>> >
>> https://repository.apache.org/content/repositories/orgapachebeam-1002/org/apache/beam/beam-parent/0.1.0-incubating/beam-parent-0.1.0-incubating-source-release.zip
>> > > >>>
>> > >  [2]
>> > > 
>> > > >>>
>> > https://repository.apache.org/content/repositories/orgapachebeam-1002/
>> > > >>>
>> > >  [3]
>> > > https://github.com/apache/incubator-beam/tree/v0.1.0-incubating-RC3
>> > >  [4]
>> > > 
>> > http://incubator.apache.org/guides/releasemanagement.html#check-list
>> > >  [5] http://www.apache.org/foundation/voting.html
>> > > 
>> > > 
>> > > >>> --
>> > > >>> Jean-Baptiste Onofré
>> > > >>> jbono...@apache.org
>> > > >>> http://blog.nanthrax.net
>> > > >>> Talend - http://www.talend.com
>> > > >>>
>> > > >>>
>> > > >>
>> > > > --
>> > > > Jean-Baptiste Onofré
>> > > > jbono...@apache.org
>> > > > http://blog.nanthrax.net
>> > > > Talend - http://www.talend.com
>> > > >
>> > >
>> >
>>


Re: DoFn Reuse

2016-06-08 Thread Robert Bradshaw
The unit of commit is the bundle.

Consider a DoFn that does batching (e.g. to interact with some
external service less frequently). Items may be buffered during
process() but these buffered items must be processed and the results
emitted in finishBundle(). If inputs are committed as being consumed
before finishBundle is called (and its outputs committed) this
buffered state would be lost but the inputs not replayed.
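
As a rough illustration (a sketch only, using the pre-annotation DoFn
methods referenced in this thread; the external service call is a
placeholder):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.beam.sdk.transforms.DoFn;

    // Buffers elements in processElement() and flushes the remainder in
    // finishBundle(), relying on the bundle being the unit of commit.
    class BatchingDoFn extends DoFn<String, String> {
      private transient List<String> buffer;

      @Override
      public void startBundle(Context c) {
        buffer = new ArrayList<>();
      }

      @Override
      public void processElement(ProcessContext c) {
        buffer.add(c.element());
        if (buffer.size() >= 500) {
          flush(c);
        }
      }

      @Override
      public void finishBundle(Context c) {
        // If inputs were committed before this ran, the buffered elements
        // (and their outputs) would be lost on a worker failure.
        flush(c);
      }

      private void flush(Context c) {
        for (String result : callExternalService(buffer)) {
          c.output(result);
        }
        buffer.clear();
      }

      private List<String> callExternalService(List<String> batch) {
        return batch; // placeholder for a real batched call
      }
    }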

Put another way, the elements are partitioned into bundles, and the
exactly once guarantee states that the output is the union of exactly
one processing of each bundle. (Bundles may be retried and/or
partially processed; such outputs are discarded.)

On Wed, Jun 8, 2016 at 10:05 AM, Raghu Angadi
 wrote:
> Such data loss can still occur if the worker dies after finishBundle()
> returns, but before the consumption is committed. I thought finishBundle()
> exists simply as best-effort indication from the runner to user some chunk
> of records have been processed.. not part of processing guarantees. Also
> the term "bundle" itself is fairly loosely defined (may be intentionally).
>
> On Wed, Jun 8, 2016 at 8:47 AM, Thomas Groh 
> wrote:
>
>> finishBundle() **must** be called before any input consumption is committed
>> (i.e. marking inputs as completed, which incldues committing any elements
>> they produced). Doing otherwise can cause data loss, as the state of the
>> DoFn is lost if a worker dies, but the input elements will never be
>> reprocessed to recreate the DoFn state. If this occurs, any buffered
>> outputs are lost.
>>
>> On Wed, Jun 8, 2016 at 8:21 AM, Bobby Evans 
>> wrote:
>>
>> > The local java runner does arbitrary batching of 10 elements.
>> >
>> > I'm not sure if flink exposes this or not, but couldn't you use the
>> > checkpoint triggers to also start/finish a bundle?
>> >  - Bobby
>> >
>> > On Wednesday, June 8, 2016 10:17 AM, Aljoscha Krettek <
>> > aljos...@apache.org> wrote:
>> >
>> >
>> >  Ahh, what we could do is artificially induce bundles using either count
>> or
>> > processing time or both. Just so that finishBundle() is called once in a
>> > while.
>> >
>> > On Wed, 8 Jun 2016 at 17:12 Aljoscha Krettek 
>> wrote:
>> >
>> > > Pretty sure, yes. The Iterable in a MapPartitionFunction should give
>> you
>> > > all the values in a given partition.
>> > >
>> > > I checked again for streaming execution. We're doing the opposite,
>> right
>> > > now: every element is a bundle in itself, startBundle()/finishBundle()
>> > are
>> > > called for every element which seems a bit wasteful. The only other
>> > option
>> > > is to see all elements as one bundle, because Flink does not
>> bundle/micro
>> > > batch elements in streaming execution.
>> > >
>> > > On Wed, 8 Jun 2016 at 16:38 Bobby Evans 
>> > > wrote:
>> > >
>> > >> Are you sure about that for Flink?  I thought the iterable finished
>> when
>> > >> you processed a maximum number of elements or the input queue was
>> empty
>> > so
>> > >> that it could returned control back to akka for better sharing of the
>> > >> thread pool.
>> > >>
>> > >>
>> > >>
>> >
>> https://github.com/apache/incubator-beam/blob/af8f5935ca1866012ceb102b9472c8b1ef102d73/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java#L99
>> > >> Also in the javadocs for DoFn.Context it explicitly states that you
>> can
>> > >> emit from the finishBundle method.
>> > >>
>> > >>
>> > >>
>> >
>> https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L104-L110
>> > >> I thought I had seen some example of this being used for batching
>> output
>> > >> to something downstream, like HDFS or Kafka, but I'm not sure on that.
>> > If
>> > >> you can emit from finsihBundle and an new instance of the DoFn will be
>> > >> created around each bundle then I can see some people trying to do
>> > >> aggregations inside a DoFn and then emitting them at the end of the
>> > bundle
>> > >> knowing that if a batch fails or is rolled back the system will handle
>> > it.
>> > >> If that is not allowed we should really update the javadocs around it
>> to
>> > >> explain the pitfalls of doing this.
>> > >>  - Bobby
>> > >>
>> > >>On Wednesday, June 8, 2016 4:24 AM, Aljoscha Krettek <
>> > >> aljos...@apache.org> wrote:
>> > >>
>> > >>
>> > >>  Hi,
>> > >> a quick related question: In the Flink runner we basically see
>> > everything
>> > >> as one big bundle, i.e. we call startBundle() once at the beginning
>> and
>> > >> then keep processing indefinitely, never calling finishBundle(). Is
>> this
>> > >> also correct behavior?
>> > >>
>> > >> Best,
>> > >> Aljoscha
>> > >>
>> > >> On Tue, 7 Jun 2016 at 20:44 Thomas Groh 
>> > wrote:
>> > >>
>> > >> > Hey everyone;
>> > >> >
>> > >> > I'm starting to work on BEAM-38 (
>> > >> > https://issues.apache.org/jira/browse/BEAM-38), which enables an
>> > >> > optimization for runners with many small bundles. BEAM-3

Where's my PCollection.map()?

2016-05-27 Thread Robert Bradshaw
Hi all!

One of the questions that often gets asked is why Beam has PTransforms
for everything instead of having methods on PCollection. This morning
I published a blog post explaining some of the design considerations
and history that went into designing the Beam SDK.

http://beam.incubator.apache.org/blog/2016/05/27/where-is-my-pcollection-dot-map.html
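
For a one-line taste of the style the post is about, where everything is a
PTransform applied via apply() rather than a map() method on PCollection
(the word-length example here is mine, not taken from the post):

    PCollection<Integer> wordLengths =
        words.apply(MapElements.via(new SimpleFunction<String, Integer>() {
          @Override
          public Integer apply(String word) {
            return word.length();
          }
        }));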

Happy reading,
Robert


Re: Add Sorting Class?

2016-05-27 Thread Robert Bradshaw
Totally agree that orderings of values within a
key[-window[-pane]]-grouping are quite useful, and they make total
sense in the model (primarily because elements themselves are never
partitioned).
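
For the intra-key case, a minimal sketch of what that looks like with a
plain GroupByKey (this assumes the values for any single key/window fit in
memory on one worker; Event and its getTimestamp() are hypothetical):

    PCollection<KV<String, List<Event>>> sortedPerKey =
        events  // PCollection<KV<String, Event>>
            .apply(GroupByKey.<String, Event>create())
            .apply(ParDo.of(
                new DoFn<KV<String, Iterable<Event>>, KV<String, List<Event>>>() {
                  @Override
                  public void processElement(ProcessContext c) {
                    List<Event> values = new ArrayList<>();
                    for (Event e : c.element().getValue()) {
                      values.add(e);
                    }
                    // Sort this key's values locally, e.g. by event timestamp.
                    values.sort(Comparator.comparingLong(Event::getTimestamp));
                    c.output(KV.of(c.element().getKey(), values));
                  }
                }));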

On Fri, May 27, 2016 at 11:31 AM, Bobby Evans
 wrote:
> If you have local ordering within a group-by like what Francis said, you can 
> build global total ordering off of it, by partitioning the data in ranges.  
> Most terrasort implementations do this.  They subsample the data to get a 
> fairly evenly distributed range of keys, and then do individual smaller sorts 
> that are written out to ordered files.
>
> Having ordering within a window/key seems to make sense to me from a
> streaming perspective too.  So for batch where the default window is 
> everything, you would get ordering for all of the data with the same 
> key/partition.  For streaming you could get ordering within a given window 
> for a key/partition.  I can see a number of uses for this, and combining 
> different panes/windows together and still preserving ordering is not that 
> hard using a streaming merge sort.
>  Yes there is a metadata problem about which transforms preserve ordering of 
> a PCollection and which ones do not, but for most things except sinks that 
> can come as an optimization that transforms can adapt to over time.  If we 
> assume the default is that they do not preserve order (which is what pig 
> does) then you would write your DAG with an order by transform in front of 
> the sink.  The only issue here would be for sinks that might not preserve 
> ordering (like with bigquery that does a dedupe prior to writing out data).
>
> - Bobby
>
> On Thursday, May 26, 2016 3:40 PM, Jesse Anderson  
> wrote:
>
>
>  Another perspective is to look at other projects in the Hadoop ecosystem.
>
> Impala had to have a LIMIT any time you did an ORDER BY. They're since
> removed this limitation.
>
> Hive has two sorting options. ORDER BY does a global order. SORT BY orders
> everything in that partition.
>
> On Thu, May 26, 2016 at 12:35 PM Jesse Anderson 
> wrote:
>
>> I had a similar thought, but wasn't sure if that violated a tenet of Beam.
>>
>> I'm thinking an ordered sink could wrap around another sink. I could see
>> something like:
>> collection.apply(OrderedSink.Timestamp.write(TextIO.Write.To(...)));
>>
>> On Thu, May 26, 2016 at 12:26 PM Robert Bradshaw
>>  wrote:
>>
>>> As Frances alluded to, it's also really hard to reconcile the notion
>>> of a globally ordered PCollection in the context of a streaming
>>> pipeline. Sorting also imposes conditions on partitioning, which we
>>> intentionally leave unspecified for maximum flexibility in the
>>> runtime. One also gets into the question of whether particular
>>> operations are order-creating, order-preserving, or order-destroying
>>> and how much extra overhead is required to maintain these properties
>>> for intermediate collections.
>>>
>>> Your mention of sorting by time is interesting, as this is the
>>> inherent sort dimension is streaming (and we use features like
>>> windowing and triggering to do correct time-based grouping despite
>>> real-time skew). Other than that, all the uses of sorting I've seen
>>> have been limited to portions of data small enough to be produced by
>>> (and consumed by) a single machine (so tops GBs, not TBs or PBs).
>>>
>>> All that aside, I could see a more tractable case being made for
>>> ordering (partitioning, etc.) a particular materialization of a
>>> PCollection, i.e. being sorted would not be a property of a
>>> PCollection itself, but could be provided by a sink (e.g. one could
>>> have a sink that promises to write its records in a particular order
>>> within and across shards). It's not inconceivable that this could be
>>> done in a way that is composable with (a large class of) existing
>>> sinks, e.g. given a FileBasedSink and intra/inter-shard-sorting
>>> specifications, one could produce a bounded sink that writes "sorted"
>>> files. Lots of design work TBD...
>>>
>>> - Robert
>>>
>>>
>>>
>>>
>>> On Thu, May 26, 2016 at 11:32 AM, Jesse Anderson 
>>> wrote:
>>> > @frances great analysis. I'm hoping this serves as the starting point
>>> for
>>> > the discussion.
>>> >
>>> > It really comes down to: is this a nice to have or a show stopping
>>> > requirement? As you mention, it comes down to the use case. I've taught
>>> at

Re: Add Sorting Class?

2016-05-26 Thread Robert Bradshaw
As Frances alluded to, it's also really hard to reconcile the notion
of a globally ordered PCollection in the context of a streaming
pipeline. Sorting also imposes conditions on partitioning, which we
intentionally leave unspecified for maximum flexibility in the
runtime. One also gets into the question of whether particular
operations are order-creating, order-preserving, or order-destroying
and how much extra overhead is required to maintain these properties
for intermediate collections.

Your mention of sorting by time is interesting, as this is the
inherent sort dimension in streaming (and we use features like
windowing and triggering to do correct time-based grouping despite
real-time skew). Other than that, all the uses of sorting I've seen
have been limited to portions of data small enough to be produced by
(and consumed by) a single machine (so tops GBs, not TBs or PBs).

All that aside, I could see a more tractable case being made for
ordering (partitioning, etc.) a particular materialization of a
PCollection, i.e. being sorted would not be a property of a
PCollection itself, but could be provided by a sink (e.g. one could
have a sink that promises to write its records in a particular order
within and across shards). It's not inconceivable that this could be
done in a way that is composable with (a large class of) existing
sinks, e.g. given a FileBasedSink and intra/inter-shard-sorting
specifications, one could produce a bounded sink that writes "sorted"
files. Lots of design work TBD...

- Robert
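
To make the intra/inter-shard-sorting idea slightly more concrete, a purely hypothetical sketch follows; every name in it is invented for illustration and nothing like it exists in the SDK:

    import java.io.Serializable;
    import java.util.Comparator;

    // Hypothetical, invented interface: a sink wrapper could consult this to
    // (a) order records within each shard and (b) range-partition records
    // across shards so that shard N's records all precede shard N+1's.
    public interface SortedMaterializationSpec<T> extends Serializable {
      /** Order of records within a single output shard. */
      Comparator<T> withinShardOrder();

      /** Shard assignment that respects the global order (e.g. range partitioning). */
      int assignShard(T record, int numShards);
    }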




On Thu, May 26, 2016 at 11:32 AM, Jesse Anderson  wrote:
> @frances great analysis. I'm hoping this serves as the starting point for
> the discussion.
>
> It really comes down to: is this a nice to have or a show stopping
> requirement? As you mention, it comes down to the use case. I've taught at
> large financial companies where (global) sorting was a real and show
> stopping use case. Theirs was for a large end of day report that had to be
> globally sorted and consumed by many other groups. Sorry, I can't be more
> specific.
>
> Thanks,
>
> Jesse
>
> On Thu, May 26, 2016 at 10:19 AM Frances Perry 
> wrote:
>
>> Currently the Beam model doesn't provide the functionality to do sorting,
>> so this is a pretty deep feature request. Let's separate the discussion
>> into value sorting and global sorting.
>>
>> For value sorting, you need to be able to specify some property of the
>> value (often called a secondary key) and have the GroupByKey/shuffle
>> implementation sort values for a given key by the secondary key. This is a
>> pretty common use case, and I think exposing this in Beam would make a lot
>> of sense. The Hadoop and Cloud Dataflow shuffle implementations support
>> this, for example. So it may just be a matter of figuring out how best to
>> expose it to users. In FlumeJava we had you explicitly ParDo to pair values
>> with a string "sort key" so you'd GroupByKey on a PCollection<KV<K, KV<String, V>>> and get back the values sorted lexicographically by
>> String. It's a bit gross for users to think about a way to order things
>> that sorts lexicographically. Looks like Crunch[1] uses a general sort key
>> -- but that likely won't interact cleanly with Beam's use of encoded keys
>> for comparisons. Would be nice to think about if there's a cleaner way.
>>
>> For global sorting, you need to be able to generate and maintain
>> orderedness across the elements in a PCollection and have a way to know how
>> to partition the PCollection into balanced, sorted subchunks. This would
>> have a pretty large impact on the Beam model and potentially on many of the
>> runners. Looking at the Crunch sort [1], it requires users to provide the
>> partitioning function if they want it to scale beyond a single reduce. I'd
>> love to see if there's a way to do better. It also can have a pretty big
>> impact on the ability to efficiently parallelize execution (things like
>> dynamic work rebalancing [2] become trickier). Within Google [3], we've
>> found that this tends to be something that users ask for, but don't really
>> have a strong use case for. It's usually the case that Top suffices or that
>> they would rather redo the algorithm into something that can parallelize
>> more efficiently without relying on a global sort. Though of course, without
>> this, we can't actually do the TeraSort benchmark in Beam. ;-)
>>
>> And of course there's the impact of the unified model on all this ;-) I
>> think these ideas would translate to windowed PCollections ok, but would
>> want to think carefully about it.
>>
>> [1] https://crunch.apache.org/user-guide.html#sorting
>> [2]
>>
>> https://cloud.google.com/blog/big-data/2016/05/no-shard-left-behind-dynamic-work-rebalancing-in-google-cloud-dataflow
>>
>> [3]
>>
>> https://cloud.google.com/blog/big-data/2016/02/history-of-massive-scale-sorting-experiments-at-google
>>
>>
>> On Thu, May 26, 2016 at 8:56 AM, Jesse Anderson 
>> wrote:
>>
>> > This is somewhat the continuat

Re: [PROPOSAL] IRC or slack channel for Apache Beam

2016-05-18 Thread Robert Bradshaw
The value in such a channel is highly dependent on people regularly
being there--do we have a critical mass of developers that would hang
out there? If so, I'd say go for it.

On Wed, May 18, 2016 at 12:51 AM, Amit Sela  wrote:
> +1 for Slack
>
> On Wed, May 18, 2016 at 10:47 AM Jean-Baptiste Onofré 
> wrote:
>
>> Hi all,
>>
>> What do you think about creating a #apache-beam IRC channel on freenode
>> ? Or if it's more convenient a channel on Slack ?
>>
>> Regards
>> JB
>> --
>> Jean-Baptiste Onofré
>> jbono...@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>>


Re: [DISCUSS] Beam IO & runners native IO

2016-04-28 Thread Robert Bradshaw
On Thu, Apr 28, 2016 at 5:41 AM, Jean-Baptiste Onofré 
wrote:

> Hi all,
>
> regarding the recent threads on the mailing list, I would like to start a
> format discussion around the IO.
> As we can expect the first contributions on this area (I already have some
> work in progress around this ;)), I think it's a fair discussion to have.
>
> Now, we have two kinds of IO: the one "generic" to Beam, the one "local"
> to the runners.
>
> For example, let's take Kafka: we have the KafkaIO (in IO), and for
> instance, we have the spark-streaming kafka connector (in Spark Runner).
>
> Right now, we have two approaches for the user:
> 1. In the pipeline, we use KafkaIO from Beam: it's the preferred approach
> for sure. However, the user may want to use the runner specific IO for two
> reasons:
> * Beam doesn't provide the IO yet (for instance, spark cassandra
> connector is available whereas we don't have yet any CassandraIO (I'm
> working on it anyway ;))
> * The runner native IO is optimized or contain more features that
> the Beam native IO
> 2. So, for the previous reasons, the user could want to use the native
> runner IO. The drawback of this approach is that the pipeline will be tight
> to a specific runner, which is completely against the Beam design.
>
> I wonder if it wouldn't make sense to add flag on the IO API (and related
> on Runner API) like .useNative().
>
> For instance, the user would be able to do:
>
>
> pipeline.apply(KafkaIO.read().withBootstrapServers("...").withTopics("...").useNative(true);
>
> then, if the runner has a "native" IO, it will use it, else, if
> useNative(false) (the default), it won't use any runner native IO.
>

I think the runner should substitute its native IO whenever it can
(assuming it's actually an improvement). The user should not have to (or,
IMHO, even be able to) request this.

> The point there is for the configuration: assuming the Beam IO and the
> runner IO can differ, it means that the "Beam IO" would have to populate
> all runner specific IO configuration.
>

None of this configuration should be a semantic change. There may be other
options that the runner specialization could provide (or require?) but I
think this should be handled by a generic mechanism that allows the user to
pass arbitrary, opaque configuration options to a runner, but importantly
*any* runner is free to ignore these configuration options if it does not
understand them without impacting the semantics of the pipeline.


> Of course, it's always possible to use a PTransform to wrap the runner
> native IO, but we are back on the same concern: the pipeline will be coupled
> to a specific runner.
>
> The purpose of the useNative() flag is to "automatically" inform the
> runner to use a specific IO if it has one: the pipeline stays decoupled
> from the runners. 


It's a bit unclear why the runner would not want to use the specific IO if
it has one.
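
One concrete (editor-supplied) way to get that kind of opaque, ignorable configuration is a custom PipelineOptions interface, which any runner that does not recognize it simply ignores; NativeIoHintOptions and its property below are invented names, not an existing API:

    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.Description;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    // A runner that understands these options may honor them; every other
    // runner ignores them, so the pipeline means the same thing everywhere.
    public interface NativeIoHintOptions extends PipelineOptions {
      @Description("Hint: prefer the runner's native Kafka source when one exists.")
      @Default.Boolean(true)
      boolean isPreferNativeKafkaSource();
      void setPreferNativeKafkaSource(boolean value);
    }

    // At pipeline-construction time:
    //   PipelineOptionsFactory.register(NativeIoHintOptions.class);
    //   NativeIoHintOptions options =
    //       PipelineOptionsFactory.fromArgs(args).as(NativeIoHintOptions.class);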


Re: PROPOSAL: Apache Beam (virtual) meeting: 05/11/2016 08:00 - 11:00 Pacific time

2016-04-13 Thread Robert Bradshaw
Either works for me.

On Wed, Apr 13, 2016 at 9:21 AM, Milindu Sanoj Kumarage <
agentmili...@gmail.com> wrote:

> Hi,
>
> 5/4/2016 works for me
>
> Regards,
> Milindu
> On 13 Apr 2016 1:43 p.m., "Aljoscha Krettek"  wrote:
>
> > Either works for me.
> >
> > On Tue, 12 Apr 2016 at 22:29 Kenneth Knowles 
> > wrote:
> >
> > > Either works for me. Thanks James!
> > >
> > > On Tue, Apr 12, 2016 at 11:31 AM, Amit Sela 
> > wrote:
> > >
> > > > Anytime works for me.
> > > >
> > > > On Tue, Apr 12, 2016, 21:24 Jean-Baptiste Onofré 
> > > wrote:
> > > >
> > > > > Hi James,
> > > > >
> > > > > 5/4 works for me !
> > > > >
> > > > > Thanks,
> > > > > Regards
> > > > > JB
> > > > >
> > > > > On 04/12/2016 05:05 PM, James Malone wrote:
> > > > > > Hey JB,
> > > > > >
> > > > > > Sorry for the late reply! That is a good point; apologies I
> missed
> > > > > noticing
> > > > > > that conflict. For everyone in the community, how would one of
> the
> > > > > > following alternatives work?
> > > > > >
> > > > > > 5/4/2016 - 8:00 - 11:00 AM Pacific time
> > > > > > -or-
> > > > > > 5/18/2016 - 8:00 - 11:00 AM Pacific time
> > > > > >
> > > > > > Best,
> > > > > >
> > > > > > James
> > > > > >
> > > > > > On Mon, Apr 11, 2016 at 11:17 AM, Lukasz Cwik
> > >  > > > >
> > > > > > wrote:
> > > > > >
> > > > > >> That works for me.
> > > > > >> But it would be best if people just posted when they are
> available
> > > > > >> depending on the goal/scope of the meeting and then a date is
> > > chosen.
> > > > > >>
> > > > > >> On Sun, Apr 10, 2016 at 9:40 PM, Jean-Baptiste Onofré <
> > > > j...@nanthrax.net>
> > > > > >> wrote:
> > > > > >>
> > > > > >>> OK, what about the week before ApacheCon ?
> > > > > >>>
> > > > > >>> Regards
> > > > > >>> JB
> > > > > >>>
> > > > > >>>
> > > > > >>> On 04/11/2016 04:22 AM, Lukasz Cwik wrote:
> > > > > >>>
> > > > >  I will be gone May 14th - 31st so would prefer a date before
> > that.
> > > > > 
> > > > >  On Fri, Apr 8, 2016 at 10:23 PM, Jean-Baptiste Onofré <
> > > > > j...@nanthrax.net>
> > > > >  wrote:
> > > > > 
> > > > >  Hi James,
> > > > > >
> > > > > > May 11th is during the ApacheCon Vancouver.
> > > > > >
> > > > > > As some Beam current and potential contributors could be busy
> > at
> > > > > > ApacheCon, maybe it's better to postpone to May 18th.
> > > > > >
> > > > > > WDYT ?
> > > > > >
> > > > > > Regards
> > > > > > JB
> > > > > >
> > > > > >
> > > > > > On 04/08/2016 10:37 PM, James Malone wrote:
> > > > > >
> > > > > > Hello everyone,
> > > > > >>
> > > > > >> I'd like to propose holding a meeting in May to discuss a
> few
> > > > Apache
> > > > > >> Beam
> > > > > >> topics. This could be a good venue to discuss design
> > proposals,
> > > > > gather
> > > > > >> technical feedback, and the state of the Beam community. My
> > > > thinking
> > > > > >> is
> > > > > >> we
> > > > > >> will be able to cover two or three Apache Beam topics in
> depth
> > > > over
> > > > > >> the
> > > > > >> course of a few hours.
> > > > > >>
> > > > > >> To make the meeting accessible to the community, I propose a
> > > > virtual
> > > > > >> meeting on:
> > > > > >>
> > > > > >> Wednesday May 11th (2016/05/11)
> > > > > >> 8:00 AM - 11:00 AM Pacific
> > > > > >>
> > > > > >> Since time may be limited, I propose agenda items
> recommended
> > by
> > > > the
> > > > > >> PPMC
> > > > > >> are given preferences. Before the meeting we can finalize
> the
> > > > method
> > > > > >> used
> > > > > >> for the virtual meeting (like Google hangouts) and the
> > finalized
> > > > > >> agenda.
> > > > > >> I'm also happy to volunteer myself for taking notes and
> > > > coordinating
> > > > > >> the
> > > > > >> event.
> > > > > >>
> > > > > >> Best,
> > > > > >>
> > > > > >> James
> > > > > >>
> > > > > >>
> > > > > >> --
> > > > > > Jean-Baptiste Onofré
> > > > > > jbono...@apache.org
> > > > > > http://blog.nanthrax.net
> > > > > > Talend - http://www.talend.com
> > > > > >
> > > > > >
> > > > > 
> > > > > >>> --
> > > > > >>> Jean-Baptiste Onofré
> > > > > >>> jbono...@apache.org
> > > > > >>> http://blog.nanthrax.net
> > > > > >>> Talend - http://www.talend.com
> > > > > >>>
> > > > > >>
> > > > > >
> > > > >
> > > > > --
> > > > > Jean-Baptiste Onofré
> > > > > jbono...@apache.org
> > > > > http://blog.nanthrax.net
> > > > > Talend - http://www.talend.com
> > > > >
> > > >
> > >
> >
>


Re: A question about windowed values

2016-04-13 Thread Robert Bradshaw
As Thomas says, the fact that we ever produce values in "no window" is
an implementation quirk that should probably be fixed. (IIRC, it's
used for the output of a GBK before we've done the
group-also-by-windows to figure out what window it really should be
in, so "value in unknown windows" would be a better choice).

If a WindowFn doesn't assign a value to any windows, the system is
free to drop it. There are pros and cons to supporting this degenerate
case vs. making it an error. However, this should almost certainly not
be in the public API...

- Robert
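
For reference, the user-facing side of that invariant in the current Beam Java API (events is an assumed PCollection<String>): every element starts out in the GlobalWindow, and Window.into only ever reassigns it to at least one window.

    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    // Before this apply, every element is already in the GlobalWindow; afterwards,
    // FixedWindows assigns each element to exactly one 1-minute window based on
    // its timestamp. A WindowFn never leaves an element windowless.
    PCollection<String> windowed =
        events.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));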


On Wed, Apr 13, 2016 at 9:06 AM, Thomas Groh  wrote:
> Actually, my above claim isn't as strong as it can be.
>
> A value in no windows is considered to not exist. Values that are not
> assigned to any window can be dropped by a runner at *any time*. A WindowFn
> *must* assign all elements to at least one window. All elements that are
> produced by any PTransform (including Sources) must be in a window,
> potentially the GlobalWindow.
>
> On Wed, Apr 13, 2016 at 8:52 AM, Thomas Groh  wrote:
>
>> Values should almost always be part of at least one window. WindowFns
>> should place all elements in at least one window, as values that are in no
>> windows will be dropped when they reach a GroupByKey.
>>
>> Elements in no windows, for example those created by
>> WindowedValue.valueInEmptyWindows(T) are generally an implementation
>> detail of a transform; for example, in the InProcessPipelineRunner, the KV<K, Iterable<WindowedValue<V>>> elements output by a GroupByKeyOnly are in
>> empty windows - but by the time the element reaches the boundary of the
>> GroupByKey, the elements are reassigned to the appropriate window(s).
>>
>> On Tue, Apr 12, 2016 at 11:44 PM, Amit Sela  wrote:
>>
>>> My instinct tells me that if a value does not belong to a specific window
>>> (in time) it's a part of a global window, but if so, what's the role of
>>> the
>>> "empty window". When should an element be a "value in an empty window" ?
>>>
>>
>>


Re: [PROPOSAL] Nightly builds by Jenkins

2016-04-05 Thread Robert Bradshaw
On Tue, Apr 5, 2016 at 12:57 AM, Jason Kuster
 wrote:
> Hey JB,
>
> Just want to clarify - do you mean that beam_nightly would continue to run
> on the schedule it currently has (SCM poll/hourly), plus one run at
> midnight?
>
> I think Dan's question centers around whether beam_nightly build would just
> run once every 24h. We want our postsubmit coverage to run more often than
> that is my impression. Doing a deploy every time the SCM poll returns some
> changes seems like an aggressive schedule to me, but I welcome your
> thoughts there. Otherwise we could keep beam_mavenverify running on the scm
> poll/hourly schedule and add the beam_nightly target which just does a
> single deploy every 24h.

Unless it's too expensive, I would suggest we do a mvn verify every
time the branch changes (plus at some regular interval). The nightly
target would deploy just at midnight (and I don't see much value,
actually it's probably counter-productive, to deploy more often than
that).

> On Tue, Apr 5, 2016 at 12:41 AM, Jean-Baptiste Onofré 
> wrote:
>
>> Hi Dan,
>>
>> you can have both mvn clean deploy and mvn clean verify, but IMHO, the
>> first already covers the second ;)
>>
>> So, I think mvn clean deploy is fine instead of clean verify.
>>
>> WDYT ?
>>
>> Regards
>> JB
>>
>>
>> On 04/05/2016 08:05 AM, Dan Halperin wrote:
>>
>>> I am completely behind producing nightly jars.
>>>
>>> But, I don't think that `beam_MavenVerify` is completely redundant -- I
>>> was
>>> under the impression it was our main post-submit coverage. Is that wrong?
>>>
>>> If I'm not wrong, then I think this should simply be a fourth Jenkins target
>>> that runs every 24h.
>>>
>>> On Mon, Apr 4, 2016 at 10:50 PM, Jean-Baptiste Onofré 
>>> wrote:
>>>
>>> Hi beamers,

 Now, on Jenkins, we have three jobs:

 - beam_PreCommit does a mvn clean verify for each opened PR
 - beam_MavenVerify does a mvn clean verify on master branch
 - beam_RunnableOnService_GoogleCloudDataflow does a mvn clean verify
 -PDataflowPipelineTests on master branch

 As discussed last week, Davor and I are working on renaming (especially
 package).

 Once this renaming done (it should take a week or so), I propose to
 change
 beam_MavenVerify as beam_Nightly: it will do a mvn clean deploy deploying
 SNAPSHOTs on the Apache SNAPSHOT repo (deploy phase includes verify and
 test of course) with a schedule every night and SCM change.

 It will allow people to test and try beam without building.

 Thoughts ?

 Regards
 JB
 --
 Jean-Baptiste Onofré
 jbono...@apache.org
 http://blog.nanthrax.net
 Talend - http://www.talend.com


>>>
>> --
>> Jean-Baptiste Onofré
>> jbono...@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>>
>
>
>
> --
> ---
> Jason Kuster
> Apache Beam (Incubating) / Google Cloud Dataflow


Re: [PROPOSAL] Writing More Expressive Beam Tests

2016-03-28 Thread Robert Bradshaw
On Fri, Mar 25, 2016 at 4:28 PM, Ben Chambers
 wrote:
> My only concern is that in the example, you first need to declare all the
> inputs, then the pipeline to be tested, then all the outputs. This can lead
> to tests that are hard to follow, since what you're really testing is an
> interleaving more like "When these inputs arrive, I get this output. Then
> when this happens, I get that output. Etc.".

+1 to pursuing this direction.

> What if instead of returning a PTransform<PBegin, PCollection<T>> we had
> a "TestSource".

I think TestSource is a PTransform<PBegin, PCollection<T>>.

> so we did something like:
>
> TestPipeline p = TestPipeline.create();
> TestSource source = p.testSource();
>
> // Set up pipeline reading from source.
> PCollection sum = ...;

I'm really curious what the "..." looks like. How are we using the source?

> BeamAssert sumAssert = BeamAssert.sum();

Did you mean BeamAssert.that(sum)?

> // Test for the Speculative Pane
> source.addElements(...);
> source.advanceWatermark(...);
> sumAssert.thatWindowPane(...);
>
> // Test for the On Time Pane
> source.addElements(...)
> source.advanceWatermark(...);
> sumAssert.thatWindowPane(...);
>
> etc.

Is there a p.run() at the end?

> We could also allow TestSource to work with multiple input pipelines like
> this:
>
> TestSource<Integer> intSource = p.testSource(new TypeDescriptor<Integer>());
> TestSource<Long> longSource = p.testSource(new TypeDescriptor<Long>());
> ...
> intSource.addElements(...);
> longSource.addElements(...);
> etc.

Would we get a total ordering on the addition of elements/advancement
of watermarks across sources by the temporal ordering of these
operations in the user's program (e.g. by incrementing some global
counter)?
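
For readers arriving later: Beam's TestStream (added in later releases) supports exactly this interleaved add-elements / advance-watermark style, with PAssert able to inspect individual panes. A minimal sketch against that later API, where p is an assumed TestPipeline (normally a JUnit @Rule) running on the direct runner:

    import org.apache.beam.sdk.coders.VarIntCoder;
    import org.apache.beam.sdk.testing.PAssert;
    import org.apache.beam.sdk.testing.TestStream;
    import org.apache.beam.sdk.transforms.Sum;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TimestampedValue;
    import org.joda.time.Duration;
    import org.joda.time.Instant;

    Instant start = new Instant(0);

    // Elements and watermark movements are specified in program order.
    TestStream<Integer> source = TestStream.create(VarIntCoder.of())
        .addElements(TimestampedValue.of(1, start), TimestampedValue.of(2, start))
        .advanceWatermarkToInfinity();   // closes the on-time pane of every window

    PCollection<Integer> sum = p
        .apply(source)
        .apply(Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(1))))
        .apply(Sum.integersGlobally().withoutDefaults());

    PAssert.that(sum)
        .inOnTimePane(new IntervalWindow(start, start.plus(Duration.standardMinutes(1))))
        .containsInAnyOrder(3);

    p.run();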

> On Fri, Mar 25, 2016 at 4:08 PM Thomas Groh 
> wrote:
>
>> Hey everyone;
>>
>> I'd still be happy to get feedback. I'm going to start working on this
>> early next week
>>
>> Thanks,
>>
>> Thomas
>>
>> On Mon, Mar 21, 2016 at 5:38 PM, Thomas Groh  wrote:
>>
>> > Hey everyone,
>> >
>> > I've been working on a proposal to expand the capabilities of our testing
>> > API, mostly around writing deterministic tests for pipelines that have
>> > interesting triggering behavior, especially speculative and late
>> triggers.
>> >
>> > I've shared a doc here
>> > <
>> https://docs.google.com/document/d/1fZUUbG2LxBtqCVabQshldXIhkMcXepsbv2vuuny8Ix4/edit?usp=sharing>
>> containing
>> > the proposal and some examples, with world comment access + explicit
>> > committer edit access. I'd welcome any feedback you all have.
>> >
>> > Thanks,
>> >
>> > Thomas
>> >
>>


Re: Capability matrix question

2016-03-23 Thread Robert Bradshaw
+1 to Metric too.

Sounds like there's consensus on renaming to something, likely
[P]Metric. I created https://issues.apache.org/jira/browse/BEAM-147 to
track the actual work.

On Wed, Mar 23, 2016 at 1:56 PM, Dan Halperin
 wrote:
> +1 @Amit =>  -1 to Counter but +1 to Metric.
>
> On Wed, Mar 23, 2016 at 1:43 PM, Amit Sela  wrote:
>
>> IMHO Counters just count..  Metrics measure things, so I think metrics
>> sounds better. Accumulators and Aggregators would have been good as well if
>> they weren't so overloaded.
>> That's just my thoughts here though..
>>
>> On Wed, Mar 23, 2016 at 10:38 PM Robert Bradshaw
>>  wrote:
>>
>> > +1 to renaming this. [P]Counter is another option.
>> >
>> > On Wed, Mar 23, 2016 at 9:12 AM, Kenneth Knowles > >
>> > wrote:
>> > > +1 to considering "metric" / PMetric / etc.
>> > >
>> > > On Wed, Mar 23, 2016 at 8:09 AM, Amit Sela 
>> wrote:
>> > >
>> > >> How about "PMetric" ?
>> > >>
>> > >> On Wed, Mar 23, 2016, 16:53 Frances Perry  wrote:
>> > >>
>> > >>>
>> > >>>>> Perhaps I'm unclear on what an “Aggregator” is. I assumed that a
>> line
>> > >>>>> such as the following:
>> > >>>>>
>> > >>>>> PCollection<KV<String, Double>> meanByName =
>> > >>>>> dataPoints.apply(Mean.perKey());
>> > >>>>>
>> > >>>>> …would be considered an Aggregator, since it applies a mean
>> > aggregation
>> > >>>>> over a window. Is that correct, with respect to the Beam
>> > terminology? If
>> > >>>>> not, what would an example of an Aggregator be?
>> > >>>>>
>> > >>>>
>> > >>> Ah, we may have some slightly confusing terminology here.
>> > >>>
>> > >>> In that code snippet you are using a PTransform (Mean.perKey) to
>> > combine
>> > >>> a PCollection using the Mean CombineFn
>> > >>> <
>> >
>> https://github.com/apache/incubator-beam/blob/c199f085473cfcd79014d0a022b5ce3fdd4863ec/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Combine.java#L359
>> > >.
>> > >>> An Aggregator
>> > >>> <
>> >
>> https://github.com/apache/incubator-beam/blob/211e76abf9ba34c35ef13cca279cbeefdad7c406/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/Aggregator.java#L54
>> > >
>> > >>> takes a CombineFn and applies it continuously within a DoFn. So it's
>> > more
>> > >>> analogous to a 'counter'. You can see an example of aggregators in
>> > >>> DebuggingWordCount
>> > >>> <
>> >
>> https://github.com/apache/incubator-beam/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/DebuggingWordCount.java#L129
>> > >
>> > >>> .
>> > >>>
>> > >>> We never really used the term *aggregation *to refer to a general set
>> > of
>> > >>> PTransforms until we started describing things to the community. But
>> > it is
>> > >>> a useful word, so we've ended up in a bit of confusing state. Maybe
>> we
>> > >>> should consider renaming Aggregator? Something like "metric" might be
>> > >>> clearer.
>> > >>>
>> > >>>
>> >
>>


Re: Capability matrix question

2016-03-23 Thread Robert Bradshaw
+1 to renaming this. [P]Counter is another option.

On Wed, Mar 23, 2016 at 9:12 AM, Kenneth Knowles  
wrote:
> +1 to considering "metric" / PMetric / etc.
>
> On Wed, Mar 23, 2016 at 8:09 AM, Amit Sela  wrote:
>
>> How about "PMetric" ?
>>
>> On Wed, Mar 23, 2016, 16:53 Frances Perry  wrote:
>>
>>>
> Perhaps I'm unclear on what an “Aggregator” is. I assumed that a line
> such as the following:
>
> PCollection<KV<String, Double>> meanByName =
> dataPoints.apply(Mean.perKey());
>
> …would be considered an Aggregator, since it applies a mean aggregation
> over a window. Is that correct, with respect to the Beam terminology? If
> not, what would an example of an Aggregator be?
>

>>> Ah, we may have some slightly confusing terminology here.
>>>
>>> In that code snippet you are using a PTransform (Mean.perKey) to combine
>>> a PCollection using the Mean CombineFn
>>> .
>>> An Aggregator
>>> 
>>> takes a CombineFn and applies it continuously within a DoFn. So it's more
>>> analogous to a 'counter'. You can see an example of aggregators in
>>> DebuggingWordCount
>>> 
>>> .
>>>
>>> We never really used the term *aggregation *to refer to a general set of
>>> PTransforms until we started describing things to the community. But it is
>>> a useful word, so we've ended up in a bit of confusing state. Maybe we
>>> should consider renaming Aggregator? Something like "metric" might be
>>> clearer.
>>>
>>>
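
For what it's worth, later Beam releases ship a Metrics API that fills this counter-inside-a-DoFn role. A minimal sketch of that usage, analogous to the Aggregator in DebuggingWordCount referenced above (class and counter names are illustrative):

    import org.apache.beam.sdk.metrics.Counter;
    import org.apache.beam.sdk.metrics.Metrics;
    import org.apache.beam.sdk.transforms.DoFn;

    // A counter reported from inside a DoFn, incremented as elements flow through.
    class CountShortWordsFn extends DoFn<String, String> {
      private final Counter shortWords = Metrics.counter(CountShortWordsFn.class, "shortWords");

      @ProcessElement
      public void processElement(ProcessContext c) {
        if (c.element().length() < 4) {
          shortWords.inc();
        }
        c.output(c.element());
      }
    }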


Re: Renaming process: first step Maven coordinates

2016-03-22 Thread Robert Bradshaw
On Mon, Mar 21, 2016 at 10:24 AM, Jean-Baptiste Onofré  
wrote:
> Hi Ben,
>
> 1. True for Python, but it can go in a folder in sdk (sdk/python) anyway. I
> think the DSLs (Java based) and other languages that we might introduce
> (Scala, ...) can be the same.

Python would certainly want to be on PyPi. I don't think most other
(currently hypothetical) languages such as Go, Ruby, (node)js, R, ...
would be using Maven either.

The current directory structure is very java-centric, e.g.

- sdk
  - pom.xml
  - src
    - main
    - test
- runners
  - pom.xml
  - src
    - main
    - test
- examples
  ...
- java8examples
- contrib
  - lib1

I'm wondering if it makes more sense to partition into languages at a
higher level (e.g. a top-level src) instead.

https://docs.google.com/document/d/1mTeZED33Famq25XedbKeDlGIJRvtzCXjSfwH9NKQYUE/edit#heading=h.oxy7eveq2260


Re: status update

2016-02-18 Thread Robert Bradshaw
Yes, (b) would be a so-called feature branch. (The proposal here is to
discard the idea of having a separate long-lived "develop" branch.)

On Thu, Feb 18, 2016 at 10:08 AM, Frances Perry 
wrote:

> Our developers are going to be a varied group -- so "main development" will
> look quite different to different developers. In particular, look at:
> (a) a developer writing a java sdk extension for a new IO connector
> (b) a developer changing the beam model
>
> I think it's fine for work like (a) to occur on master, but I think things
> like (b) should  happen on a development branch so that we can keep the
> master branch in a working state. There are going to be a number of large,
> backwards incompatible, churn-y changes to Runner APIs in the near future.
> I'd like us to be able to do those in a way that doesn't affect folks who
> are attempting more surface level contributions.
>
> Frances
>
> On Thu, Feb 18, 2016 at 8:07 AM, Robert Bradshaw <
> rober...@google.com.invalid> wrote:
>
> > +1 to using master for main development (and most non-ASF projects use
> > master like this too). Not having master (the default when one clones,
> > etc.) be at HEAD is often surprising. Tags are easy enough to use when
> one
> > wants a stable version.
> >
> > - Robert
> >
> >
> > On Wed, Feb 17, 2016 at 11:38 PM, Jean-Baptiste Onofré 
> > wrote:
> >
> > > Thanks Henry, I remember now, and Frances posted the link.
> > >
> > > I agree: we should use the master branch as dev branch as all other ASF
> > > projects do.
> > >
> > > Regards
> > > JB
> > >
> > >
> > > On 02/18/2016 08:04 AM, Henry Saputra wrote:
> > >
> > >> Actually no, it is a bit different.
> > >> The concept of develop branch is following the "successful git
> branching
> > >> model" blog post [1] that introduce using develop branch as active
> > branch
> > >> for development and use master as stable branch.
> > >>
> > >> I would recommend using master branch instead as default branch to do
> > >> active development to match other ASF projects.
> > >>
> > >> Some projects using develop from origin company, like Twill [2], had
> > also
> > >> moved to using master as default active branch.
> > >>
> > >> Just my 2 cents.
> > >>
> > >> Thx.
> > >>
> > >> Henry
> > >>
> > >>
> > >> [1] http://nvie.com/posts/a-successful-git-branching-model/
> > >> [2] http://twill.incubator.apache.org/HowToContribute.html
> > >>
> > >> On Wed, Feb 17, 2016 at 10:52 PM, Jean-Baptiste Onofré <
> j...@nanthrax.net
> > >
> > >> wrote:
> > >>
> > >> Hi,
> > >>>
> > >>> Correct me if I'm wrong, but I'm assuming that develop == master
> (from
> > a
> > >>> git perspective).
> > >>>
> > >>> I configured Jenkins this way as it's the "regular" naming ;)
> > >>>
> > >>> I think Frances said "develop" from a dev perspective. All projects
> use
> > >>> master (it's what I'm doing in Falcon, Lens, Karaf, Camel, etc, etc).
> > >>>
> > >>> Maybe I'm wrong ;)
> > >>>
> > >>> Regards
> > >>> JB
> > >>>
> > >>>
> > >>> On 02/18/2016 06:46 AM, Sandeep Deshmukh wrote:
> > >>>
> > >>> Hi All,
> > >>>>
> > >>>> I have some comments on the repository structure and most of them
> are
> > >>>> wrt
> > >>>> my experience in another Apache incubating project.
> > >>>>
> > >>>>
> > >>>>  1. Most active projects use *master* as default development
> > branch
>>>> rather than
> > >>>>  *develop*.  For example, Flink, Spark, Storm, Samza, Pig, Hive,
> > and
> > >>>>  Hadoop use master branch.
>>>>  2. Released artifacts are always hosted on the downloads page. Master need
>>>>  not be the one with production-ready state.
> > >>>>  3. It is quite intuitive to use *master* otherwise new
> > contributors
> > >>>>  needs to go through documentatio

Re: Confusion about the bounded naming of PubsubIO

2016-02-18 Thread Robert Bradshaw
+1, maybe call them "typed." (And for many transforms, the unbound versions
need not exist, so we wouldn't even need the distinction...)
On Feb 18, 2016 8:38 AM, "Dan Halperin"  wrote:

> What Ben said is true  but I'd be in favor of renaming or removing
> these constructs once we can make backwards-incompatible changes ;)
>
> On Thu, Feb 18, 2016 at 8:31 AM, Ben Chambers  >
> wrote:
>
> > Classes named "Bound" are used throughout the sdk to describe builders
> that
> > are specified enough to be applied. It indicates that the required
> > parameters have been bound. It is not related to whether the output
> > PCollection is bounded or unbounded.
> >
> > On Thu, Feb 18, 2016, 7:42 AM bakey pan  wrote:
> >
> > > Hi all:
> > > I notice that in the PubSubIO class there exists only one Bound
> > > static class inheriting from PTransform.
> > > But in the apply method of Bound, whether a bounded or unbounded
> > > PCollection is returned depends on the variables maxNumRecords and
> > > maxReadTime. So why not name this class "MixBound" or something else?
> > > I think it is a little bit confusing to name it "Bound" when it can
> > > actually be an unbounded data stream.
> > >
> > > --
> > >  Best Regards
> > >BakeyPan
> > >
> >
>
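
To make the distinction concrete, a sketch against the SDK roughly as it looked at the time of this thread; the class and method names are recalled from the pre-rename SDK and may differ in detail, imports are elided, and p is an assumed Pipeline:

    // "Bound" = a builder whose required parameters have been bound, not "bounded".
    // Without a record/time limit the result is an unbounded PCollection:
    PCollection<String> stream =
        p.apply(PubsubIO.Read.topic("projects/my-project/topics/my-topic"));

    // With maxNumRecords (or maxReadTime) the very same Bound builder produces a
    // bounded PCollection that stops once the limit is reached:
    PCollection<String> sample =
        p.apply(PubsubIO.Read.topic("projects/my-project/topics/my-topic")
            .maxNumRecords(1000));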


Re: status update

2016-02-18 Thread Robert Bradshaw
+1 to using master for main development (and most non-ASF projects use
master like this too). Not having master (the default when one clones,
etc.) be at HEAD is often surprising. Tags are easy enough to use when one
wants a stable version.

- Robert


On Wed, Feb 17, 2016 at 11:38 PM, Jean-Baptiste Onofré 
wrote:

> Thanks Henry, I remember now, and Frances posted the link.
>
> I agree: we should use the master branch as dev branch as all other ASF
> projects do.
>
> Regards
> JB
>
>
> On 02/18/2016 08:04 AM, Henry Saputra wrote:
>
>> Actually no, it is a bit different.
>> The concept of develop branch is following the "successful git branching
>> model" blog post [1] that introduce using develop branch as active branch
>> for development and use master as stable branch.
>>
>> I would recommend using master branch instead as default branch to do
>> active development to match other ASF projects.
>>
>> Some projects using develop from origin company, like Twill [2], had also
>> moved to using master as default active branch.
>>
>> Just my 2 cents.
>>
>> Thx.
>>
>> Henry
>>
>>
>> [1] http://nvie.com/posts/a-successful-git-branching-model/
>> [2] http://twill.incubator.apache.org/HowToContribute.html
>>
>> On Wed, Feb 17, 2016 at 10:52 PM, Jean-Baptiste Onofré 
>> wrote:
>>
>> Hi,
>>>
>>> Correct me if I'm wrong, but I'm assuming that develop == master (from a
>>> git perspective).
>>>
>>> I configured Jenkins this way as it's the "regular" naming ;)
>>>
>>> I think Frances said "develop" from a dev perspective. All projects use
>>> master (it's what I'm doing in Falcon, Lens, Karaf, Camel, etc, etc).
>>>
>>> Maybe I'm wrong ;)
>>>
>>> Regards
>>> JB
>>>
>>>
>>> On 02/18/2016 06:46 AM, Sandeep Deshmukh wrote:
>>>
>>> Hi All,

 I have some comments on the repository structure and most of them are
 wrt
 my experience in another Apache incubating project.


  1. Most active projects use *master* as default development branch
 rather than
  *develop*.  For example, Flink, Spark, Storm, Samza, Pig, Hive, and
  Hadoop use master branch.
  2. Released artifacts are always hosted on the downloads page. Master need
  not be the one with production-ready state.
  3. It is quite intuitive to use *master*; otherwise new contributors
  need to go through documentation to understand the process of each
 project.
  4. Overall, the process becomes simple if *master* is the default
 branch.


 Another suggestion is related to release with major version change.
 Major
 release twice a year is a lot of burden on the end user if they want to
 upgrade to a newer version. To address this issue, newly added APIs can
 be
 marked as @evolving so that users are aware of possible change in the
 upcoming release but the stable one should be carefully changed.

 Regards,
 Sandeep

 On Sat, Feb 13, 2016 at 2:34 AM, Frances Perry 
 wrote:

 Thanks for all the feedback! Please keep it coming as needed.

>
> We've gone ahead and created components matching this structure:
>
>
>
> https://issues.apache.org/jira/browse/BEAM/?selectedTab=com.atlassian.jira.jira-projects-plugin:components-panel
>
> We'll work on transition existing state from Google-internal tools into
> this over the next few weeks.
>
>
> On Fri, Feb 12, 2016 at 7:47 AM, Kenneth Knowles
> 
>>
>> wrote:
>
> On Thu, Feb 11, 2016 at 8:53 AM, Maximilian Michels 
>
>> wrote:
>>
>> As for the /develop branch, I would suggest to
>>
>>> make it mandatory to have it in a usable state at all times.
>>>
>>>
>>> +1
>>
>> If breakage is accidentally committed (as will happen) then a CTR
>>
>> rollback
>
> is a encouraged.
>>
>> Kenn
>>
>>
>>
>
 --
>>> Jean-Baptiste Onofré
>>> jbono...@apache.org
>>> http://blog.nanthrax.net
>>> Talend - http://www.talend.com
>>>
>>>
>>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>