Re: New DoFn and WindowedValue/WindowingInternals

2016-12-11 Thread Eugene Kirpichov
Hi Amit,

Yes, this is correct. Part of the motivation is that the DoFn API is
user-facing. The compressed representation of windowed elements (e.g.
access to all windows of an element) and the ability to emit directly
into a specified window are implementation details of the runner that are
dangerous to expose to SDK users (even I got burnt by them while working on
SplittableParDo). So we would like to move WindowedValue into runners-core
and keep the semantically clean API in the SDK: access to the current
window, and assigning windows via Window.into().
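
To make the distinction concrete, here is a toy sketch in plain Java. It is
not Beam code: every class and method name below is hypothetical, and it only
assumes that a runner can expand one compressed element into one element per
window before handing it to user code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: these are NOT Beam's classes. All names are hypothetical.
class WindowedValueSketch {

    // Runner-side "compressed" form: one value tagged with all of its windows.
    static final class Compressed<T> {
        final T value;
        final List<String> windows;

        Compressed(T value, List<String> windows) {
            this.value = value;
            this.windows = windows;
        }
    }

    // What user code would see: one value in exactly one current window.
    static final class InWindow<T> {
        final T value;
        final String window;

        InWindow(T value, String window) {
            this.value = value;
            this.window = window;
        }
    }

    // Runner-side step: expand the compressed form into one element per window.
    static <T> List<InWindow<T>> explode(Compressed<T> in) {
        List<InWindow<T>> out = new ArrayList<>();
        for (String w : in.windows) {
            out.add(new InWindow<>(in.value, w));
        }
        return out;
    }

    public static void main(String[] args) {
        Compressed<String> c =
            new Compressed<>("click", Arrays.asList("[0,10)", "[5,15)"));
        for (InWindow<String> e : explode(c)) {
            System.out.println(e.value + " @ " + e.window);
        }
    }
}
```

In this model, user code only ever receives an InWindow, so it cannot depend
on how many windows the runner happened to pack into a single Compressed
element.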

On Sun, Dec 11, 2016 at 11:59 AM Kenneth Knowles wrote:

> You've got it right. My recommendation is to just directly implement it
> for the Spark runner. It will often actually clean things up a bit. Here's
> the analogous change for the Flink runner:
> https://github.com/apache/incubator-beam/pull/1435/files.
>
> With GABW, I tried going through the process of keeping some utility
> expansion in runners-core, making StateInternalsFactory, refactoring
> GroupAlsoByWindowsDoFn, then GroupByKeyViaGroupByKeyOnly,
> GroupAlsoByWindow. But it ended up simpler for each runner to just not use
> most of that and do it directly. (they all still share GABW but none of the
> surrounding bits, IIRC)
>
> On Sun, Dec 11, 2016 at 10:33 AM, Amit Sela wrote:
>
> > So basically using new DoFn with *SplittableParDo*, *Window.Bound*, and
> > *GroupAlsoByWindow* requires a custom implementation per runner, as
> > they are not handled by DoFn anymore, right?
> >
> > On Sun, Dec 11, 2016 at 3:42 PM Eugene Kirpichov wrote:
> >
> > > Hi Amit, I'll comment in more detail later, but meanwhile please take a
> > > look at https://github.com/apache/incubator-beam/pull/1565
> > > There are a small number of relevant changes to the Spark runner.
> > > Take a look at the implementation of SplittableParDo (already
> > > committed), in particular ProcessFn and its usage in the direct runner.
> > > This is exactly what you're looking for: a new DoFn that, with
> > > per-runner support, is able to emit multi-windowed values.
> > > On Sun, Dec 11, 2016 at 4:28 AM Amit Sela wrote:
> > >
> > > > Hi all,
> > > >
> > > > I've been working on migrating the Spark runner to new DoFn and I've
> > > > stumbled upon a couple of cases where OldDoFn is used in a way that
> > > > accesses windowInternals (outputWindowedValue), such as
> > > > AssignWindowsDoFn.
> > > >
> > > > Since changing windows is no longer the responsibility of DoFn, I was
> > > > wondering who does this now, and how.
> > > >
> > > > Thanks,
> > > > Amit
> > > >
> > >
> >
>


Re: New DoFn and WindowedValue/WindowingInternals

2016-12-11 Thread Kenneth Knowles
You've got it right. My recommendation is to just directly implement it
for the Spark runner. It will often actually clean things up a bit. Here's
the analogous change for the Flink runner:
https://github.com/apache/incubator-beam/pull/1435/files.

With GABW, I tried going through the process of keeping some utility
expansion in runners-core, making StateInternalsFactory, refactoring
GroupAlsoByWindowsDoFn, then GroupByKeyViaGroupByKeyOnly,
GroupAlsoByWindow. But it ended up simpler for each runner to just not use
most of that and do it directly. (they all still share GABW but none of the
surrounding bits, IIRC)

On Sun, Dec 11, 2016 at 10:33 AM, Amit Sela wrote:

> So basically using new DoFn with *SplittableParDo*, *Window.Bound*, and
> *GroupAlsoByWindow* requires a custom implementation per runner, as they
> are not handled by DoFn anymore, right?
>
> On Sun, Dec 11, 2016 at 3:42 PM Eugene Kirpichov wrote:
>
> > Hi Amit, I'll comment in more detail later, but meanwhile please take a
> > look at https://github.com/apache/incubator-beam/pull/1565
> > There are a small number of relevant changes to the Spark runner.
> > Take a look at the implementation of SplittableParDo (already committed),
> > in particular ProcessFn and its usage in the direct runner. This is
> > exactly what you're looking for: a new DoFn that, with per-runner
> > support, is able to emit multi-windowed values.
> > On Sun, Dec 11, 2016 at 4:28 AM Amit Sela wrote:
> >
> > > Hi all,
> > >
> > > I've been working on migrating the Spark runner to new DoFn and I've
> > > stumbled upon a couple of cases where OldDoFn is used in a way that
> > > accesses windowInternals (outputWindowedValue), such as
> > > AssignWindowsDoFn.
> > >
> > > Since changing windows is no longer the responsibility of DoFn, I was
> > > wondering who does this now, and how.
> > >
> > > Thanks,
> > > Amit
> > >
> >
>


Re: New DoFn and WindowedValue/WindowingInternals

2016-12-11 Thread Amit Sela
So basically using new DoFn with *SplittableParDo*, *Window.Bound*, and
*GroupAlsoByWindow* requires a custom implementation per runner, as they
are not handled by DoFn anymore, right?

On Sun, Dec 11, 2016 at 3:42 PM Eugene Kirpichov wrote:

> Hi Amit, I'll comment in more detail later, but meanwhile please take a
> look at https://github.com/apache/incubator-beam/pull/1565
> There are a small number of relevant changes to the Spark runner.
> Take a look at the implementation of SplittableParDo (already committed),
> in particular ProcessFn and its usage in the direct runner. This is exactly
> what you're looking for: a new DoFn that, with per-runner support, is able
> to emit multi-windowed values.
> On Sun, Dec 11, 2016 at 4:28 AM Amit Sela wrote:
>
> > Hi all,
> >
> > I've been working on migrating the Spark runner to new DoFn and I've
> > stumbled upon a couple of cases where OldDoFn is used in a way that
> > accesses windowInternals (outputWindowedValue), such as AssignWindowsDoFn.
> >
> > Since changing windows is no longer the responsibility of DoFn, I was
> > wondering who does this now, and how.
> >
> > Thanks,
> > Amit
> >
>


Re: New DoFn and WindowedValue/WindowingInternals

2016-12-11 Thread Eugene Kirpichov
Hi Amit, I'll comment in more detail later, but meanwhile please take a
look at https://github.com/apache/incubator-beam/pull/1565
There are a small number of relevant changes to the Spark runner.
Take a look at the implementation of SplittableParDo (already committed),
in particular ProcessFn and its usage in the direct runner. This is exactly
what you're looking for: a new DoFn that, with per-runner support, is able
to emit multi-windowed values.
On Sun, Dec 11, 2016 at 4:28 AM Amit Sela wrote:

> Hi all,
>
> I've been working on migrating the Spark runner to new DoFn and I've
> stumbled upon a couple of cases where OldDoFn is used in a way that
> accesses windowInternals (outputWindowedValue), such as AssignWindowsDoFn.
>
> Since changing windows is no longer the responsibility of DoFn, I was
> wondering who does this now, and how.
>
> Thanks,
> Amit
>