Re: [DISCUSS] Beam data plane serialization tech

2016-06-27 Thread Aljoscha Krettek
Thanks Kenn for expanding on your previous mail. I now have a better idea
of why we need this.

Out of the systems you suggested, Thrift and ProtoBuf3 + gRPC are probably
best suited for the task. Both of them provide a way to generate
serializers as well as to specify an RPC interface. Avro and FlatBuffers
only deal with serialization, so we would have to roll our own RPC system
on top of them. (It seems the gRPC folks have some work in progress on
integrating FlatBuffers support, but it is not clear when this will be
done: https://github.com/grpc/grpc/issues/5438).

From the description and benchmarks, FlatBuffers looks very nice. If it
really comes with a huge performance increase, we might have to consider
using it with our own RPC layer instead of Thrift or gRPC + ProtoBuf3. That
would mean some extra work on our side, however.

I would suggest doing proof-of-concept implementations with both Thrift and
gRPC + ProtoBuf3 and seeing how they compare to the baseline (the current
implementation, where we directly call methods on the DoFn and the DoFn in
turn calls methods on the outside). This wouldn't have to be a full stack,
just enough to see how the interaction with the DoFn would work for the
different systems.
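
To make the comparison concrete, here is a rough, purely hypothetical sketch
in plain Java (ignoring the stubs that Thrift or gRPC would generate) of the
kind of per-bundle interface such a proof of concept would need to exercise.
All names here (FnHarness, EncodedElement, processBundle) are invented for
illustration and are not part of any existing API:

    import java.util.Iterator;

    /** Hypothetical per-bundle RPC surface between a runner and an SDK harness. */
    interface FnHarness {
      /**
       * Process one bundle for a fused stage. Elements arrive coder-encoded and
       * results come back coder-encoded, so the element bytes are streamed as
       * opaque payloads rather than modeled as structured messages.
       */
      Iterator<EncodedElement> processBundle(String stageId, Iterator<EncodedElement> input);
    }

    /** A coder-encoded element plus the id of the PTransform it belongs to. */
    class EncodedElement {
      final String pTransformId;
      final byte[] bytes;

      EncodedElement(String pTransformId, byte[] bytes) {
        this.pTransformId = pTransformId;
        this.bytes = bytes;
      }
    }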

On Wed, 22 Jun 2016 at 23:00 Kenneth Knowles  wrote:

> I wanted to say a bit more to clarify and enliven this discussion. My use
> of the term "data plane" may have been confusing. I didn't mean to focus it
> quite so much on the encoded elements. What I meant to discuss was the
> entirety of performance-sensitive interactions between the runner and
> user-defined functions. So let's drop the implied control/data distinction
> and just talk about the whole interface.
>
> At the risk of writing at length about something everyone knows... the
> motivation for the Fn API is this: we have a few types of user-definable
> functions (UDFs) that occur in pipelines, and we need to invoke them in a
> language-independent manner. These are DoFn, CombineFn, WindowFn,
> BoundedSource, UnboundedSource, ViewFn/PCollectionView, and Coder.
>
> I will show a bad idea: Take the interfaces of the above functions (minus
> Coder, which is special) and just turn them into RPC interfaces, and the
> SDK's job is just to be a trivial or near-trivial bridge from RPC to
> language-specific method calls. This is a bad proposal, but hopefully helps
> to show issues such as:
>
>  - How and when do we deserialize user code / launch a container? (my bad
> idea above doesn't answer; probably too often!)
>  - How and when do we encode/decode elements? (my bad idea above would
> require it between every UDF)
>  - How do we manage calls that are more than simply a stream of elements in
> a bundle? (example: side inputs)
>
> Any Fn API is required to have the same semantics as this simple proposal,
> but should achieve it with superior performance. I'll leave off the details
> since I am not authoring them personally. But let's assume as a baseline
> the approach of executing a fused stage of same-language UDFs in a row
> without any encoding/decoding or RPC, and making a single RPC call per
> bundle (ignoring amortized round trips for streaming bytes).
>
> I gather from this thread these questions (which I may be interpreting
> wrong; apologies if so) and I would like to answer them relative to this
> design sketch:
>
> Q: Since we have one RPC per bundle and it goes through the whole fused
> stage, and we have a whole stream of elements per call, doesn't the data
> dominate the envelope?
> A: In streaming executions, bundles can be very small, so the data will not
> necessarily dominate.
>
> Q: Do we really need structured messages? Perhaps byte streams with fairly
> trivial metadata suffice and we can just hand roll it?
> A: I think that schematized tech is well-proven for adaptability and it is
> also handy for code gen, regardless of performance. So to me the question
> is whether or not we need structured messages at all, or if we can model
> every high throughput communication as coder-encoded streams. I think that
> things like commits to state, acknowledgements of timer firings, pull-based
> requests like side inputs are probably best expressed via a schema. But
> maybe I am overlooking some design ideas.
>
> Q: How will side inputs arrive?
> A: This API is really designed to be pull-based, so it sort of implies a
> great many small RPCs (and caching).
>
> I'm sure I've left off some discussion points, and maybe oversimplified
> some things, but does this answer the questions somewhat? Does this clarify
> the suggested choices of tech? Do you still think we don't need them?
>
> Kenn
>
> On Mon, Jun 20, 2016 at 7:48 AM, Bobby Evans 
> wrote:
>
> > In storm we use JSON as the default communication between shell bolts and
> > shell spouts, which allows for APIs in non JVM languages. It works rather
> > well.  That being said it is also slow, and we made it a plugin so others

Improvements to issue/version tracking

2016-06-27 Thread Davor Bonaci
Hi everyone,
I'd like to propose a simple change in Beam JIRA that will hopefully
improve our issue and version tracking -- to actually use the "Fix
Versions" field as intended [1].

The goal would be to simplify issue tracking, streamline generation of
release notes, add a view of outstanding work towards a release, and
clearly communicate which Beam version contains fixes for each issue.

The standard usage of the field is:
* For open (or in-progress/reopened) issues, the "Fix Versions" field is
optional and indicates an unreleased version that the issue is targeting.
The release is not expected to proceed unless the issue is fixed or the
field is changed.
* For closed (or resolved) issues, the "Fix Versions" field indicates the
released or unreleased version that contains the fix.

I think the field should be mandatory once an issue is resolved/closed [4],
so that we make a deliberate choice about it. I propose we use "Not
applicable" for all issues that aren't resolved as Fixed (e.g., duplicates,
working as intended, invalid, etc.) and for those that don't ship as part of
a release (e.g., website, build system, etc.).

We can then trivially view outstanding work for the next release [2], or
generate release notes [3].

I'd love to hear any comments! I know that at least JB agrees, as he was the
one convincing me of this -- thanks ;).

Thanks,
Davor

[1]
https://confluence.atlassian.com/adminjiraserver071/managing-versions-802592484.html
[2]
https://issues.apache.org/jira/browse/BEAM/fixforversion/12335766/?selectedTab=com.atlassian.jira.jira-projects-plugin:version-summary-panel
[3]
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12319527&version=12335764
[4] https://issues.apache.org/jira/browse/INFRA-12120


Beam Talks at Flink Forward

2016-06-27 Thread Tyler Akidau
Hello Beamers,

Flink Forward, the Apache Flink conference organized by data Artisans, is
happening again this year in Berlin, September 12-14. If you have a
Beam-related talk you'd like to give there, please submit a proposal at
http://flink-forward.org/submit-your-talk/ in the next day or two. The
Flink community is well positioned to take advantage of Beam, and we should
have space for a number of Beam-related talks in the roster (I'm on the PC,
trying to make sure Beam has a solid presence there :-). Note that anything
Beam-related is a reasonable candidate; doesn't have to be specifically
Flink focused, though that's of course a bonus.

-Tyler


Re: Beam Talks at Flink Forward

2016-06-27 Thread Jean-Baptiste Onofré

Hi Tyler,

Thanks for the news!

I will submit something ;)

Regards
JB

On 06/27/2016 08:22 PM, Tyler Akidau wrote:

Hello Beamers,

Flink Forward, the Apache Flink conference organized by data Artisans, is
happening again this year in Berlin, September 12-14. If you have a
Beam-related talk you'd like to give there, please submit a proposal at
http://flink-forward.org/submit-your-talk/ in the next day or two. The
Flink community is well positioned to take advantage of Beam, and we should
have space for a number of Beam-related talks in the roster (I'm on the PC,
trying to make sure Beam has a solid presence there :-). Note that anything
Beam-related is a reasonable candidate; doesn't have to be specifically
Flink focused, though that's of course a bonus.

-Tyler



--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


Re: Sliding-Windowed PCollectionView as SideInput

2016-06-27 Thread Shen Li
Hi Aljoscha,

Thanks for the explanation.

Shen

On Mon, Jun 27, 2016 at 4:38 AM, Aljoscha Krettek 
wrote:

> Hi,
> the WindowFn is responsible for mapping from main-input window to
> side-input window. Have a look at WindowFn.getSideInputWindow(). For
> SlidingWindows this takes the last possible sliding window as the
> side-input window.
>
> Cheers,
> Aljoscha
>
> On Sun, 26 Jun 2016 at 22:30 Shen Li  wrote:
>
> > Hi,
> >
> > I am a little confused about how the runner should handle a side input
> > if it comes from a sliding-windowed PCollection.
> >
> > Say we have two PCollections, A and B. Apply
> > Window.into(SlidingWindows.of...) on B, and create a View from it (call
> > it VB).
> >
> > Then, a ParDo takes the PCollection A as the main input and VB as a side
> > input: A.apply(ParDo.withSideInputs(VB).of(new DoFn() {...})).
> >
> > In DoFn.processElement(), when the user code calls
> > ProcessContext.sideInput(VB), which window's view of VB should be
> > returned if the event time of the current element in A falls into
> > multiple sliding windows of B?
> >
> >
> > Thanks,
> >
> > Shen
> >
>


Re: Scala DSL

2016-06-27 Thread Ismaël Mejía
Just to summarize, at this point:

- Everybody agrees that scio is not an SDK.
- Almost everybody agrees that, given the current choice, they would prefer
‘dsls/scio’.
- Some of us are not particularly married to the DSL classification.

I have a proposal: we can define two concepts, each with its own structure
in the Beam repository:

1. Beam API: A set of abstractions to program the complete Beam Model in a
given programming language.

These are idiomatic versions of the Beam Model and ideally should cover the
complete Beam Model; scio is one example. The directory structure for Beam
APIs could be:

apis/scala
apis/clojure
apis/groovy
...

2. Beam DSL: A domain-specific set of abstractions that run on Beam, e.g.
graphs, machine learning, etc.

These represent domain-specific idioms; e.g., a graph DSL would represent
graph concepts such as edges and vertices as first-class citizens. The
directory structure for Beam DSLs could be:

dsls/graph
dsls/ml
dsls/cep
...

Given these definitions, for the concrete scio case I think the most
accurate directory would be:
apis/scala
or
apis/scala/scio

I personally prefer the first one (apis/scala), because we don't have any
other Scala API for the moment and because I think we shouldn't have more
than one API per language, to avoid confusion: imagine that someone creates
apis/java/bcollections to represent Beam pipelines as distributed
collections; that would be confusing. However, I understand the arguments
for the second layout, e.g. supporting different APIs per language and
preserving the original names (scio). Anyway, I would be OK with either of
the two.

I apologize for this long message, and for not choosing either of the two
structures proposed in this thread, but I think it is important to be clear
about the difference in scope between Beam APIs and DSLs, in particular if
we think about new users.

What do you think? Does my proposal make sense? Any suggestions?

Regards,
Ismaël

PS: One last thing, I found this text that in part corroborates my feeling
about scio being an API and not a DSL:

“… a Scala Dataflow API (a nascent open-source version of which already
exists, and which seems likely to flower into maturity in due time given
Dataflow's move to join the ASF).”
https://cloud.google.com/dataflow/blog/dataflow-beam-and-spark-comparison


On Mon, Jun 27, 2016 at 4:52 AM, Raghu Angadi 
wrote:

> On Fri, Jun 24, 2016 at 7:05 PM, Dan Halperin  >
> wrote:
>
> > > I love the
> > > name scio. But I think sdks/scala might be most appropriate and would
> > make
> > > it a first class citizen for Beam.
> > >
> >
> > I am strongly against it being in the 'sdks/' top-level module -- it's
> not
> > a Beam SDK. Unlike DSL, SDK is a very specific term in Beam.
> >
>
> +1. I agree, it is not Beam SDK in that sense.
>
> Raghu.
>
>
> >
> > > Where would a future python sdk reside?
> > >
> >
> > The Python SDK is in the python-sdk branch on Apache already, and it
> lives
> > in `sdks/python`. (And it is aiming to become a proper Beam SDK. ;)
>


Re: Sliding-Windowed PCollectionView as SideInput

2016-06-27 Thread Aljoscha Krettek
Hi,
the WindowFn is responsible for mapping from main-input window to
side-input window. Have a look at WindowFn.getSideInputWindow(). For
SlidingWindows this takes the last possible sliding window as the
side-input window.
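
For anyone who wants to see the mapping concretely, here is a rough,
self-contained sketch (plain Java with Joda-Time, not the actual SDK code;
the class and method names are made up for illustration) of how the "last
possible sliding window" for a given element timestamp can be computed,
assuming windows of the given size and period with offset 0:

    import org.joda.time.Duration;
    import org.joda.time.Instant;

    public class SideInputWindowSketch {
      // Start of the last sliding window that still contains the timestamp.
      // Since size >= period, the timestamp also falls before start + size,
      // so [start, start + size) is the last window containing it.
      static Instant lastWindowStart(Instant timestamp, Duration period) {
        long t = timestamp.getMillis();
        long p = period.getMillis();
        return new Instant(t - Math.floorMod(t, p));
      }

      public static void main(String[] args) {
        Duration size = Duration.standardSeconds(60);
        Duration period = Duration.standardSeconds(30);
        Instant ts = new Instant(65_000L); // element timestamp: 65s
        Instant start = lastWindowStart(ts, period);
        // Windows containing 65s are [30s, 90s) and [60s, 120s);
        // the side-input window is the last one: [60s, 120s).
        System.out.println("Side-input window: [" + start + ", " + start.plus(size) + ")");
      }
    }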

Cheers,
Aljoscha

On Sun, 26 Jun 2016 at 22:30 Shen Li  wrote:

> Hi,
>
> I am a little confused about how the runner should handle a side input if
> it comes from a sliding-windowed PCollection.
>
> Say we have two PCollections, A and B. Apply
> Window.into(SlidingWindows.of...) on B, and create a View from it (call it
> VB).
>
> Then, a ParDo takes the PCollection A as the main input and VB as a side
> input: A.apply(ParDo.withSideInputs(VB).of(new DoFn() {...})).
>
> In DoFn.processElement(), when the user code calls
> ProcessContext.sideInput(VB), which window's view of VB should be
> returned if the event time of the current element in A falls into
> multiple sliding windows of B?
>
>
> Thanks,
>
> Shen
>