Re: New testSideInputsWithMultipleWindows and should DoFnRunner explode if DoFn contains a side input ?

2016-12-14 Thread Lukasz Cwik
Yes, it should explode if it has side inputs as well.

On Wed, Dec 14, 2016 at 11:03 AM, Amit Sela  wrote:

> Hi all,
>
> Yesterday a new test was added to ParDoTest suite:
> "testSideInputsWithMultipleWindows".
> To the best of my understanding, it's meant to test sideInputs for elements
> in multiple windows (unexploded).
>
> The Spark runner uses the DoFnRunner (Simple) to process DoFns, and it will
> explode compressed elements only if it's "tagged" as "Observes Window".
>
> Should it also explode if it has sideInputs ?
>
> Thanks,
> Amit
>
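For context, the behavior under discussion amounts to a check like the following inside a runner's DoFn executor. This is only a rough sketch: observesWindow(fn), hasSideInputs(fn) and invokeProcessElement(...) stand in for whatever signature/spec lookups the real DoFnRunner performs, while WindowedValue#explodeWindows() is the SDK helper that splits a value sitting in several windows into one copy per window.

  import org.apache.beam.sdk.util.WindowedValue;

  // Sketch: a "compressed" value (one value in several windows) must be exploded
  // whenever per-window information can change the result, which includes side
  // inputs, because side input lookups are resolved per window.
  <T> void processElement(WindowedValue<T> compressed) {
    if (observesWindow(fn) || hasSideInputs(fn)) {
      for (WindowedValue<T> value : compressed.explodeWindows()) {
        invokeProcessElement(value);    // each invocation sees exactly one window
      }
    } else {
      invokeProcessElement(compressed); // window-agnostic: process once
    }
  }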


Re: [DISCUSS] Change "RunnableOnService" To A More Intuitive Name

2016-11-10 Thread Lukasz Cwik
The default is a crashing runner which throws an exception if it's executed.
This makes SDK core/examples/... not depend on any implemented runners.

On Thu, Nov 10, 2016 at 12:37 PM, Robert Bradshaw <
rober...@google.com.invalid> wrote:

> +1 to ValidatesRunner. It'd be nice if it were (optionally?)
> parameterized by which feature it validates.
>
> @NeedsRunner is odd, as using a runner is the most natural way to
> write many (most) tests, but an annotation should be used to mark the
> exception, not the norm. (I'd just assume a runner is available for
> all tests, e.g. CoreTests depends on DirectRunner depends on Core).
>
> On Thu, Nov 10, 2016 at 10:14 AM, Mark Liu 
> wrote:
> > +1 ValidatesRunner
> >
> > On Thu, Nov 10, 2016 at 8:40 AM, Kenneth Knowles  >
> > wrote:
> >
> >> Nice. I like ValidatesRunner.
> >>
> >> On Nov 10, 2016 03:39, "Amit Sela"  wrote:
> >>
> >> > How about @ValidatesRunner ?
> >> > Seems to complement @NeedsRunner as well.
> >> >
> >> > On Thu, Nov 10, 2016 at 9:47 AM Aljoscha Krettek  >
> >> > wrote:
> >> >
> >> > > +1
> >> > >
> >> > > What I would really like to see is automatic derivation of the
> >> capability
> >> > > matrix from an extended Runner Test Suite. (As outlined in Thomas'
> >> doc).
> >> > >
> >> > > On Wed, 9 Nov 2016 at 21:42 Kenneth Knowles  >
> >> > > wrote:
> >> > >
> >> > > > Huge +1 to this.
> >> > > >
> >> > > > The two categories I care most about are:
> >> > > >
> >> > > > 1. Tests that need a runner, but are testing the other "thing
> under
> >> > > test";
> >> > > > today this is NeedsRunner.
> >> > > > 2. Tests that are intended to test a runner; today this is
> >> > > > RunnableOnService.
> >> > > >
> >> > > > Actually the lines are not necessarily clear between them, but I
> think
> >> we
> >> > > can
> >> > > > make good choices, like we already do.
> >> > > >
> >> > > > The idea of two categories with a common superclass actually has a
> >> > > pitfall:
> >> > > > what if a test is put in the superclass category, when it does not
> >> > have a
> >> > > > clear meaning? And also, I don't have any good ideas for names.
> >> > > >
> >> > > > So I think just replacing RunnableOnService with RunnerTest to
> make
> >> > clear
> >> > > > that it is there just to test the runner is good. We might also
> want
> >> > > > RunnerIntegrationTest extends NeedsRunner to use in the IO
> modules.
> >> > > >
> >> > > > See also Thomas's doc on capability matrix testing* which is
> aimed at
> >> > > case
> >> > > > 2. Those tests should all have a category from the doc, or a new
> one
> >> > > added.
> >> > > >
> >> > > > *
> >> > > >
> >> > > >
> >> > > https://docs.google.com/document/d/1fICxq32t9yWn9qXhmT07xpclHeHX2VlUyVtpi2WzzGM/edit
> >> > > >
> >> > > > Kenn
> >> > > >
> >> > > > On Wed, Nov 9, 2016 at 12:20 PM, Jean-Baptiste Onofré <
> >> j...@nanthrax.net
> >> > >
> >> > > > wrote:
> >> > > >
> >> > > > > Hi Mark,
> >> > > > >
> >> > > > > Generally speaking, I agree.
> >> > > > >
> >> > > > > As RunnableOnService extends NeedsRunner, @TestsWithRunner or
> >> > > > @RunOnRunner
> >> > > > > sound clearer.
> >> > > > >
> >> > > > > Regards
> >> > > > > JB
> >> > > > >
> >> > > > >
> >> > > > > On 11/09/2016 09:00 PM, Mark Liu wrote:
> >> > > > >
> >> > > > >> Hi all,
> >> > > > >>
> >> > > > >> I'm working on building RunnableOnService in Python SDK. After
> >> > having
> >> > > > >> discussions with folks, "RunnableOnService" doesn't look like a
> very
> >> > > > >> intuitive
> >> > > > >> name for those unit tests that require runners and build
> >> lightweight
> >> > > > >> pipelines to test specific components. Especially, they don't
> have
> >> > to
> >> > > > run
> >> > > > >> on a service.
> >> > > > >>
> >> > > > >> So I want to raise this idea to the community and see if anyone
> >> has
> >> > > > >> similar thoughts. Maybe we can come up with a name that is
> tied
> >> to the
> >> > > > >> runner.
> >> > > > >> Currently, I have two names in my head:
> >> > > > >>
> >> > > > >> - TestsWithRunners
> >> > > > >> - RunnerExecutable
> >> > > > >>
> >> > > > >> Any thoughts?
> >> > > > >>
> >> > > > >> Thanks,
> >> > > > >> Mark
> >> > > > >>
> >> > > > >>
> >> > > > > --
> >> > > > > Jean-Baptiste Onofré
> >> > > > > jbono...@apache.org
> >> > > > > http://blog.nanthrax.net
> >> > > > > Talend - http://www.talend.com
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
>
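For readers unfamiliar with the mechanics: RunnableOnService/NeedsRunner are plain JUnit category marker interfaces, so the rename discussed here is mostly a matter of tagging. A rough sketch of what a test tagged with the proposed name would look like; ValidatesRunner did not exist yet at the time of this thread, and NeedsRunner refers to the existing org.apache.beam.sdk.testing.NeedsRunner.

  import org.junit.Test;
  import org.junit.experimental.categories.Category;
  import org.apache.beam.sdk.testing.NeedsRunner;

  // Sketch: the category is just an empty marker interface...
  interface ValidatesRunner extends NeedsRunner {}

  public class MyTransformTest {
    // ...and tests opt in via JUnit's @Category; runner modules then select the
    // group through the surefire/failsafe configuration in their pom.
    @Test
    @Category(ValidatesRunner.class)
    public void testRunsCorrectlyOnRunner() {
      // build a small pipeline with TestPipeline and run it on the runner under test
    }
  }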


Re: PCollection to PCollection Conversion

2016-11-08 Thread Lukasz Cwik
The suggestions you give seem good, except for the XML cases.

You might want to have the XML be a document per line, similar to the JSON
examples you have been giving.

On Tue, Nov 8, 2016 at 12:00 PM, Jesse Anderson <je...@smokinghand.com>
wrote:

> @lukasz Agreed there would have to be KV handling. I was more thinking that
> whatever the addition, it shouldn't just handle KV. It should handle
> Iterables, Lists, Sets, and KVs.
>
> For JSON and XML, I wonder if we'd be able to give someone something
> general purpose enough, or whether you would just end up writing your own code to
> handle it anyway.
>
> Here are some ideas on what it could look like with a method and the
> resulting string output:
> *Stringify.toJSON()*
>
> With KV:
> {"key": "value"}
>
> With Iterables:
> ["one", "two", "three"]
>
> *Stringify.toXML("rootelement")*
>
> With KV:
> 
>
> With Iterables:
> 
>   one
>   two
>   three
> 
>
> *Stringify.toDelimited(",")*
>
> With KV:
> key,value
>
> With Iterables:
> one,two,three
>
> Do you think that would strike a good balance between reusable code and
> writing your own for more difficult formatting?
>
> Thanks,
>
> Jesse
>
> On Tue, Nov 8, 2016 at 11:01 AM Lukasz Cwik <lc...@google.com.invalid>
> wrote:
>
> Jesse, I believe if one format gets special treatment in TextIO, people
> will then ask why JSON, XML, ... aren't also supported.
>
> Also, the example that you provide is using the fact that the input format
> is an Iterable. You had posted a question about using KV with
> TextIO.Write which wouldn't align with the proposed input format and still
> would require writing a type conversion function, this time from KV to
> Iterable instead of KV to string.
>
> On Tue, Nov 8, 2016 at 9:50 AM, Jesse Anderson <je...@smokinghand.com>
> wrote:
>
> > Lukasz,
> >
> > I don't think you'd need complicated logic for TextIO.Write. For CSV the
> > call would look like:
> > Stringify.to("", ",", "\n");
> >
> > Where the arguments would be Stringify.to(prefix, delimiter, suffix).
> >
> > The code would be something like:
> > StringBuffer buffer = new StringBuffer(prefix);
> >
> > for (Item item : list) {
> >   buffer.append(item.toString());
> >
> >   if(notLast) {
> > buffer.append(delimiter);
> >   }
> > }
> >
> > buffer.append(suffix);
> >
> > c.output(buffer.toString());
> >
> > That would allow you to do the basic CSV, TSV, and other formats without
> > complicated logic. The same sort of thing could be done for TextIO.Write.
> >
> > Thanks,
> >
> > Jesse
> >
> > On Tue, Nov 8, 2016 at 10:30 AM Lukasz Cwik <lc...@google.com.invalid>
> > wrote:
> >
> > > The conversion from object to string will have uses outside of just
> > > TextIO.Write so it seems logical that we would want to have a ParDo do
> > the
> > > conversion.
> > >
> > > Text file formats have a lot of variance, even if you consider the
> subset
> > > of CSV like formats where it could have fixed width fields, or escaping
> > and
> > > quoting around other fields, or headers that should be placed at the
> top.
> > >
> > > Having all these format conversions within TextIO.Write seems like a
> lot
> > of
> > > logic to contain in that transform which should just focus on writing
> to
> > > files.
> > >
> > > On Tue, Nov 8, 2016 at 8:15 AM, Jesse Anderson <je...@smokinghand.com>
> > > wrote:
> > >
> > > > This is a thread moved over from the user mailing list.
> > > >
> > > > I think there needs to be a way to convert a PCollection to a
> > > > PCollection of Strings.
> > > >
> > > > To do a minimal WordCount, you have to manually convert the KV to a
> > > String:
> > > > p
> > > > .apply(TextIO.Read.from("playing_cards.tsv"))
> > > > .apply(Regex.split("\\W+"))
> > > > .apply(Count.perElement())
> > > > *.apply(MapElements.via((KV<String, Long> count) ->*
> > > > *count.getKey() + ":" + count.getValue()*
> > > > *).withOutputType(
> TypeDescriptors.strings()))*
> > > > .apply(TextIO.Write.to("output/stringcount

Re: PCollection to PCollection Conversion

2016-11-08 Thread Lukasz Cwik
Jesse, I believe if one format gets special treatment in TextIO, people
will then ask why JSON, XML, ... aren't also supported.

Also, the example that you provide is using the fact that the input format
is an Iterable. You had posted a question about using KV with
TextIO.Write which wouldn't align with the proposed input format and still
would require writing a type conversion function, this time from KV to
Iterable instead of KV to string.

On Tue, Nov 8, 2016 at 9:50 AM, Jesse Anderson <je...@smokinghand.com>
wrote:

> Lukasz,
>
> I don't think you'd need complicated logic for TextIO.Write. For CSV the
> call would look like:
> Stringify.to("", ",", "\n");
>
> Where the arguments would be Stringify.to(prefix, delimiter, suffix).
>
> The code would be something like:
> StringBuffer buffer = new StringBuffer(prefix);
>
> for (Item item : list) {
>   buffer.append(item.toString());
>
>   if(notLast) {
> buffer.append(delimiter);
>   }
> }
>
> buffer.append(suffix);
>
> c.output(buffer.toString());
>
> That would allow you to do the basic CSV, TSV, and other formats without
> complicated logic. The same sort of thing could be done for TextIO.Write.
>
> Thanks,
>
> Jesse
>
> On Tue, Nov 8, 2016 at 10:30 AM Lukasz Cwik <lc...@google.com.invalid>
> wrote:
>
> > The conversion from object to string will have uses outside of just
> > TextIO.Write so it seems logical that we would want to have a ParDo do
> the
> > conversion.
> >
> > Text file formats have a lot of variance, even if you consider the subset
> > of CSV like formats where it could have fixed width fields, or escaping
> and
> > quoting around other fields, or headers that should be placed at the top.
> >
> > Having all these format conversions within TextIO.Write seems like a lot
> of
> > logic to contain in that transform which should just focus on writing to
> > files.
> >
> > On Tue, Nov 8, 2016 at 8:15 AM, Jesse Anderson <je...@smokinghand.com>
> > wrote:
> >
> > > This is a thread moved over from the user mailing list.
> > >
> > > I think there needs to be a way to convert a PCollection to a
> > > PCollection of Strings.
> > >
> > > To do a minimal WordCount, you have to manually convert the KV to a
> > String:
> > > p
> > > .apply(TextIO.Read.from("playing_cards.tsv"))
> > > .apply(Regex.split("\\W+"))
> > > .apply(Count.perElement())
> > > *.apply(MapElements.via((KV<String, Long> count) ->*
> > > *count.getKey() + ":" + count.getValue()*
> > > *).withOutputType(TypeDescriptors.strings()))*
> > > .apply(TextIO.Write.to("output/stringcounts"));
> > >
> > > This code really should be something like:
> > > p
> > > .apply(TextIO.Read.from("playing_cards.tsv"))
> > > .apply(Regex.split("\\W+"))
> > > .apply(Count.perElement())
> > > *.apply(ToString.stringify())*
> > > .apply(TextIO.Write.to("output/stringcounts"));
> > >
> > > To summarize the discussion:
> > >
> > >- JA: Add a method to StringDelegateCoder to output any KV or list
> > >- JA and DH: Add a SimpleFunction that takes a type and runs
> > toString()
> > >on it:
> > >class ToStringFn extends SimpleFunction<InputT, String> {
> > >public static String apply(InputT input) {
> > >return input.toString();
> > >}
> > >}
> > >- JB: Add a general purpose type converter like in Apache Camel.
> > >- JA: Add Object support to TextIO.Write that would write out the
> > >toString of any Object.
> > >
> > > My thoughts:
> > >
> > > Is converting to a PCollection of Strings mostly needed when you're using
> > > TextIO.Write? Will a general purpose transform only work in certain
> cases
> > > and you'll normally have to write custom code to format the strings the
> way
> > > you want them?
> > >
> > > IMHO, it's yes to both. I'd prefer to add Object support to
> TextIO.Write
> > or
> > > a SimpleFunction that takes a delimiter as an argument. Making a
> > > SimpleFunction that's able to specify a delimiter (and perhaps a prefix
> > and
> > > suffix) should cover the majority of formats and cases.
> > >
> > > Thanks,
> > >
> > > Jesse
> > >
> >
>
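The snippet quoted above is pseudocode (notLast is never defined); a self-contained sketch of the same prefix/delimiter/suffix idea as a DoFn follows. Stringify is only the proposal's name, not an existing transform, and TextIO.Write already appends the line separator, so the "\n" suffix from the example is usually unnecessary.

  import org.apache.beam.sdk.transforms.DoFn;

  // Sketch of the proposed Stringify.to(prefix, delimiter, suffix) behavior:
  // join one element's values into a single delimited output string.
  class StringifyFn<T> extends DoFn<Iterable<T>, String> {
    private final String prefix;
    private final String delimiter;
    private final String suffix;

    StringifyFn(String prefix, String delimiter, String suffix) {
      this.prefix = prefix;
      this.delimiter = delimiter;
      this.suffix = suffix;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
      StringBuilder buffer = new StringBuilder(prefix);
      String separator = "";
      for (T item : c.element()) {
        buffer.append(separator).append(item);  // appends item.toString()
        separator = delimiter;                  // delimiter only between items, never trailing
      }
      c.output(buffer.append(suffix).toString());
    }
  }

  // Usage for the CSV case from the thread: ParDo.of(new StringifyFn<>("", ",", ""))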


Re: PCollection to PCollection Conversion

2016-11-08 Thread Lukasz Cwik
The conversion from object to string will have uses outside of just
TextIO.Write so it seems logical that we would want to have a ParDo do the
conversion.

Text file formats have a lot of variance, even if you consider the subset
of CSV like formats where it could have fixed width fields, or escaping and
quoting around other fields, or headers that should be placed at the top.

Having all these format conversions within TextIO.Write seems like a lot of
logic to contain in that transform which should just focus on writing to
files.

On Tue, Nov 8, 2016 at 8:15 AM, Jesse Anderson 
wrote:

> This is a thread moved over from the user mailing list.
>
> I think there needs to be a way to convert a PCollection to a
> PCollection of Strings.
>
> To do a minimal WordCount, you have to manually convert the KV to a String:
> p
> .apply(TextIO.Read.from("playing_cards.tsv"))
> .apply(Regex.split("\\W+"))
> .apply(Count.perElement())
> *.apply(MapElements.via((KV<String, Long> count) ->*
> *count.getKey() + ":" + count.getValue()*
> *).withOutputType(TypeDescriptors.strings()))*
> .apply(TextIO.Write.to("output/stringcounts"));
>
> This code really should be something like:
> p
> .apply(TextIO.Read.from("playing_cards.tsv"))
> .apply(Regex.split("\\W+"))
> .apply(Count.perElement())
> *.apply(ToString.stringify())*
> .apply(TextIO.Write.to("output/stringcounts"));
>
> To summarize the discussion:
>
>- JA: Add a method to StringDelegateCoder to output any KV or list
>- JA and DH: Add a SimpleFunction that takes a type and runs toString()
>on it:
>class ToStringFn extends SimpleFunction<InputT, String> {
>public static String apply(InputT input) {
>return input.toString();
>}
>}
>- JB: Add a general purpose type converter like in Apache Camel.
>- JA: Add Object support to TextIO.Write that would write out the
>toString of any Object.
>
> My thoughts:
>
> Is converting to a PCollection of Strings mostly needed when you're using
> TextIO.Write? Will a general purpose transform only work in certain cases
> and you'll normally have to write custom code to format the strings the way
> you want them?
>
> IMHO, it's yes to both. I'd prefer to add Object support to TextIO.Write or
> a SimpleFunction that takes a delimiter as an argument. Making a
> SimpleFunction that's able to specify a delimiter (and perhaps a prefix and
> suffix) should cover the majority of formats and cases.
>
> Thanks,
>
> Jesse
>
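A compilable version of the ToStringFn summarized above, for reference. This is only a sketch of the JA/DH proposal, not an existing SDK transform, and it assumes the upstream elements' toString() output is the format you actually want.

  import org.apache.beam.sdk.transforms.MapElements;
  import org.apache.beam.sdk.transforms.SimpleFunction;
  import org.apache.beam.sdk.values.KV;
  import org.apache.beam.sdk.values.PCollection;

  // Sketch: a reusable "stringify" step that can be dropped in front of TextIO.Write.
  class ToStringFn<InputT> extends SimpleFunction<InputT, String> {
    @Override
    public String apply(InputT input) {
      return String.valueOf(input);  // toString(), but tolerant of nulls
    }
  }

  // Usage: counts is a PCollection<KV<String, Long>>; KV's toString() prints
  // something like KV{the, 3}, so custom formats still need their own function.
  PCollection<String> lines =
      counts.apply(MapElements.via(new ToStringFn<KV<String, Long>>()));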


Re: Why does `Combine.perKey(SerializableFunction)` require same input and output type

2016-10-31 Thread Lukasz Cwik
GlobalCombineFn and PerKeyCombineFn still expect an associative and
commutative function when accumulating.
GlobalCombineFn is shorthand for assigning everything to a single key,
doing the combine, and then discarding the key and extracting the single
output.
PerKeyCombineFn is shorthand for doing accumulation where the key doesn't
modify the accumulation in any way.

On Fri, Oct 28, 2016 at 6:09 PM, Manu Zhang <owenzhang1...@gmail.com> wrote:

> Then what about the other interfaces, like Combine.perKey(GlobalCombineFn)
> and Combine.perKey(PerkeyCombineFn) ? Why not all of them have the
> requirements ?
>
> On Sat, Oct 29, 2016 at 12:06 AM Lukasz Cwik <lc...@google.com> wrote:
>
>> For it to be considered a combiner, the function needs to be associative
>> and commutative.
>>
>> The issue is that from an API perspective it would be easy to have a
>> Combine.perKey(SerializableFunction<Iterable, OutputT>). But
>> many people in the data processing world expect that this
>> parallelization/optimization is performed and thus exposing such a method
>> would be dangerous as it would be breaking users expectations so from the
>> design perspective it is a hard requirement. If PCollections ever become
>> ordered or gain other properties, these requirements may loosen but it
>> seems unlikely in the short term.
>>
>> At this point, I think you're looking for a MapElements which you pass in a
>> SerializableFunction<KV<K, Iterable, KV<K, OutputT>>.
>> Creating a wrapper SerializableFunction<KV<K, Iterable, KV<K,
>> OutputT>> which can delegate to a SerializableFunction<Iterable,
>> OutputT> should be trivial.
>>
>>
>> On Thu, Oct 27, 2016 at 6:17 PM, Manu Zhang <owenzhang1...@gmail.com>
>> wrote:
>>
>> Thanks for the thorough explanation. I see the benefits for such a
>> function.
>> My follow-up question is whether this is a hard requirement.
>> There are computations that don't satisfy this (I think it's monoid rule)
>> but possible and easier to write with Combine.perKey(
>> SerializableFunction<Iterable, OutputT>). It's not difficult to
>> provide an underlying CombineFn.
>>
>>
>> On Thu, Oct 27, 2016 at 9:47 AM Lukasz Cwik <lc...@google.com.invalid>
>> wrote:
>>
>> Combine.perKey takes a single SerializableFunction which knows how to
>> convert from Iterable to V.
>>
>> It turns out that many runners implement optimizations which allow them to
>> run the combine operation across several machines to parallelize the work
>> and potentially reduce the amount of data they store during a GBK.
>> To be able to do such an optimization, it requires you to actually have
>> three functions:
>> InputT -> AccumulatorT : Creates the intermediate representation which
>> allows for associative combining
>> Iterable -> AccumulatorT: Performs the actual combining
>> AccumT -> OutputT: Extracts the output
>>
>> In the case of Combine.perKey with a SerializableFunction, you're providing
>> Iterable -> AccumulatorT and the other two functions are the
>> identity functions.
>>
>> To be able to support a Combine.perKey which can go from Iterable
>> -> OutputT would require that this occurred within a single machine
>> removing the parallelization benefits that runners provide and for almost
>> all cases is not a good idea.
>>
>> On Wed, Oct 26, 2016 at 6:23 PM, Manu Zhang <owenzhang1...@gmail.com>
>> wrote:
>>
>> > Hi all,
>> >
>> > I'm wondering why `Combine.perKey(SerializableFunction)` requires input
>> > and
>> > output to be of the same type while `Combine.PerKey` doesn't have this
>> > restriction.
>> >
>> > Thanks,
>> > Manu
>> >
>>
>>
>>


Re: Why does `Combine.perKey(SerializableFunction)` require same input and output type

2016-10-28 Thread Lukasz Cwik
For it to be considered a combiner, the function needs to be associative
and commutative.

The issue is that, from an API perspective, it would be easy to have a
Combine.perKey(SerializableFunction<Iterable<InputT>, OutputT>). But many
people in the data processing world expect that this
parallelization/optimization is performed, and thus exposing such a method
would be dangerous as it would break users' expectations, so from the
design perspective it is a hard requirement. If PCollections ever become
ordered or gain other properties, these requirements may loosen, but that
seems unlikely in the short term.

At this point, I think you're looking for a MapElements to which you pass a
SerializableFunction<KV<K, Iterable<InputT>>, KV<K, OutputT>>.
Creating a wrapper SerializableFunction<KV<K, Iterable<InputT>>, KV<K,
OutputT>> which can delegate to a SerializableFunction<Iterable<InputT>,
OutputT> should be trivial.


On Thu, Oct 27, 2016 at 6:17 PM, Manu Zhang <owenzhang1...@gmail.com> wrote:

> Thanks for the thorough explanation. I see the benefits for such a
> function.
> My follow-up question is whether this is a hard requirement.
> There are computations that don't satisfy this (I think it's monoid rule)
> but possible and easier to write with Combine.perKey(
> SerializableFunction<Iterable, OutputT>). It's not difficult to
> provide an underlying CombineFn.
>
>
> On Thu, Oct 27, 2016 at 9:47 AM Lukasz Cwik <lc...@google.com.invalid>
> wrote:
>
>> Combine.perKey takes a single SerializableFunction which knows how to
>> convert from Iterable to V.
>>
>> It turns out that many runners implement optimizations which allow them to
>> run the combine operation across several machines to parallelize the work
>> and potentially reduce the amount of data they store during a GBK.
>> To be able to do such an optimization, it requires you to actually have
>> three functions:
>> InputT -> AccumulatorT : Creates the intermediate representation which
>> allows for associative combining
>> Iterable -> AccumulatorT: Performs the actual combining
>> AccumT -> OutputT: Extracts the output
>>
>> In the case of Combine.perKey with a SerializableFunction, you're providing
>> Iterable -> AccumulatorT and the other two functions are the
>> identity functions.
>>
>> To be able to support a Combine.perKey which can go from Iterable
>> -> OutputT would require that this occurred within a single machine
>> removing the parallelization benefits that runners provide and for almost
>> all cases is not a good idea.
>>
>> On Wed, Oct 26, 2016 at 6:23 PM, Manu Zhang <owenzhang1...@gmail.com>
>> wrote:
>>
>> > Hi all,
>> >
>> > I'm wondering why `Combine.perKey(SerializableFunction)` requires input
>> > and
>> > output to be of the same type while `Combine.PerKey` doesn't have this
>> > restriction.
>> >
>> > Thanks,
>> > Manu
>> >
>>
>
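Spelled out, the MapElements route suggested above looks roughly like this. It is a sketch assuming input is a PCollection<KV<String, Integer>>; the summing logic stands in for whatever non-associative Iterable-to-value function the user has.

  import org.apache.beam.sdk.transforms.GroupByKey;
  import org.apache.beam.sdk.transforms.MapElements;
  import org.apache.beam.sdk.transforms.SimpleFunction;
  import org.apache.beam.sdk.values.KV;
  import org.apache.beam.sdk.values.PCollection;

  // Sketch: group first, then apply the Iterable -> value function per key.
  // Unlike a CombineFn, each key's full Iterable is processed on one machine.
  PCollection<KV<String, Integer>> results =
      input
          .apply(GroupByKey.<String, Integer>create())
          .apply(MapElements.via(
              new SimpleFunction<KV<String, Iterable<Integer>>, KV<String, Integer>>() {
                @Override
                public KV<String, Integer> apply(KV<String, Iterable<Integer>> kv) {
                  int sum = 0;
                  for (int value : kv.getValue()) {
                    sum += value;
                  }
                  return KV.of(kv.getKey(), sum);
                }
              }));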


Re: Why does `Combine.perKey(SerializableFunction)` require same input and output type

2016-10-26 Thread Lukasz Cwik
Combine.perKey takes a single SerializableFunction which knows how to
convert from Iterable<V> to V.

It turns out that many runners implement optimizations which allow them to
run the combine operation across several machines to parallelize the work
and potentially reduce the amount of data they store during a GBK.
To be able to do such an optimization, it requires you to actually have
three functions:
InputT -> AccumulatorT: Creates the intermediate representation which
allows for associative combining
Iterable<AccumulatorT> -> AccumulatorT: Performs the actual combining
AccumulatorT -> OutputT: Extracts the output

In the case of Combine.perKey with a SerializableFunction, you're providing
Iterable<AccumulatorT> -> AccumulatorT, and the other two functions are the
identity functions.

To be able to support a Combine.perKey which can go from Iterable<InputT>
-> OutputT would require that this occur within a single machine,
removing the parallelization benefits that runners provide, and for almost
all cases that is not a good idea.

On Wed, Oct 26, 2016 at 6:23 PM, Manu Zhang  wrote:

> Hi all,
>
> I'm wondering why `Combine.perKey(SerializableFunction)` requires input
> and
> output to be of the same type while `Combine.PerKey` doesn't have this
> restriction.
>
> Thanks,
> Manu
>
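For reference, the three functions listed above map directly onto Combine.CombineFn. A minimal sketch follows (a mean-of-longs combiner; the class name and accumulator layout are illustrative only, and a real pipeline would also need a coder for the accumulator):

  import org.apache.beam.sdk.transforms.Combine;

  // Sketch: InputT = Long, AccumulatorT = long[]{sum, count}, OutputT = Double.
  public class MeanFn extends Combine.CombineFn<Long, long[], Double> {
    @Override
    public long[] createAccumulator() {
      return new long[] {0L, 0L};
    }

    @Override
    public long[] addInput(long[] accum, Long input) {  // fold InputT into AccumulatorT
      accum[0] += input;
      accum[1] += 1;
      return accum;
    }

    @Override
    public long[] mergeAccumulators(Iterable<long[]> accums) {  // Iterable<AccumulatorT> -> AccumulatorT
      long[] merged = new long[] {0L, 0L};
      for (long[] a : accums) {
        merged[0] += a[0];
        merged[1] += a[1];
      }
      return merged;
    }

    @Override
    public Double extractOutput(long[] accum) {  // AccumulatorT -> OutputT
      return accum[1] == 0 ? 0.0 : ((double) accum[0]) / accum[1];
    }
  }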


Re: Placement of temporary files by FileBasedSink

2016-10-20 Thread Lukasz Cwik
The issue manifests when a completely different pipeline uses the output of
the previous pipeline as input to the new pipeline, and these leftover temporary
files are matched by the glob expression.

This happens because FileBasedSink is responsible for creating the
temporary paths, which occurs while processing a bundle. If that bundle
processing fails, there is no guarantee that the runner even knows
the temporary file existed in our current execution model.

I think there are other potential solutions which require support from the
runner that aren't being considered, since this would all fall under the
general cleanup API which Eugene referred to. The question for now is: is
the solution good enough?

I'm in favor of #1 as well.

I'm against #4 since FileBasedSink could do a pretty good job for all
filesystems, and once there is support for cleanup, FileBasedSink could
migrate to use it without any changes to the various IOChannelFactory
implementations. This prevents us from getting to a place where the Hadoop
filesystem implementation ends up with many, many methods.


On Thu, Oct 20, 2016 at 1:57 AM, Chamikara Jayalath 
wrote:

> Can this be prevented by moving temporary files (copy + delete
> individually) at finalization instead of copying all of them and performing
> a bulk delete ? You can support task failures by ignoring renames when the
> destination exists. Python SDK currently does this (and puts temp files in
> a sub-directory).
>
> Thanks,
> Cham
>
> On Wed, Oct 19, 2016 at 6:25 PM Eugene Kirpichov
>  wrote:
>
> Hello,
>
> This is a continuation of the discussion on PR
> https://github.com/apache/incubator-beam/pull/1050 which turned out more
> complex than expected.
>
> Short summary:
> Currently FileBasedSink, when writing to /path/to/foo (in practice,
> /path/to/foo-x-of-y where y is the total number of output
> files), puts temporary files into /path/to/foo-temp-$uid, and when
> finalizing the sink, it removes the temporary files by matching the pattern
> /path/to/foo-temp-* and removing everything that matches.
>
> There are a couple of issues with this:
> - FileBasedSink uses IOChannelFactory, which currently supports local
> filesystems and Google Cloud Storage (GCS). GCS's match() operation is
> currently eventually consistent. So, it may fail to return some of the
> files, so we won't remove them.
> - If the Beam job is cancelled or fails midway, then the temp files won't
> be deleted at all (that's subject to a separate discussion on cleanup API -
> AFAIK there's no JIRA for it yet, I believe peihe@ was thinking about this
> and was going to file one).
> - If a follow-up data processing job is reading /path/to/foo, then the way
> temp files are named, they will likely match the same glob pattern (e.g.
> "/path/to/foo*") as the one intending to match the final output in
> /path/to/foo, so if some temp files are leftover, the follow-up job will
> effectively read duplicate records (some from /path/to/foo, some from
> /path/to/foo-temp-$blah).
>
> I think, in the absence of a way to guarantee that all temp files will be
> deleted (I think it'd be very difficult or impossible to provide a hard
> guarantee of this, considering various possible failure conditions such as
> zombie workers), the cleanest way to solve this is put temp files in a
> location that's unlikely to match the same glob pattern as one that matches
> the final output.
>
> Some options for what that could be:
> 1. A subdirectory that is a sibling of the final path, sufficiently unique,
> and unlikely to match the same glob -
> /path/to/temp-beam-foo-$uid/$another_uid (that's the approach the PR
> currently takes)
> 2. A subdirectory under PipelineOptions.tempLocation - this might be flawed
> because PipelineOptions.tempLocation might be on a different filesystem, or
> have different ACLs, than the output of the FileBasedSink.
> 3. A subdirectory that the user *must* explicitly provide on their
> FileBasedSink. This is a reduction in usability, but there may be cases
> when this is necessary - e.g. if the final location of the FileBasedSink is
> such that we can't create siblings to it (e.g. the root path in a GCS
> bucket - gs://some-bucket/)
> 4. A subdirectory generated by a new IOChannelFactory call ("give me a temp
> directory for the given final path") which would do one of the above -
> reasonable, and simplifies FileBasedSink, but we still need to choose which
> of #1-#3 this call should do.
>
> There might be other things I missed. There might be radical restructurings
> of FileBasedSink that work around this problem entirely, though I couldn't
> think of any.
>
> In general, the requirements on the solution are:
> - It should be very unlikely that somebody reads the temp files in the same
> glob pattern as the final output by mistake.
> - It should continue to make sense as IOChannelFactory is extended with
> support for other filesystems.
> - It should ideally use the same 
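To make option #1 concrete, the path construction being proposed is roughly the following; the names and layout are illustrative only.

  import java.util.UUID;

  // Sketch of option #1: temp files live in a sibling directory whose name does not
  // match a glob such as /path/to/foo* that targets the final output files.
  String finalPrefix = "/path/to/foo";  // user-specified output prefix
  int slash = finalPrefix.lastIndexOf('/');
  String tempDir = finalPrefix.substring(0, slash + 1)
      + "temp-beam-" + finalPrefix.substring(slash + 1) + "-" + UUID.randomUUID();
  String tempFile = tempDir + "/" + UUID.randomUUID();
  // e.g. /path/to/temp-beam-foo-<uid>/<another_uid>, never matched by /path/to/foo*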

AppVeyor for Windows compatibility testing

2016-10-19 Thread Lukasz Cwik
I noticed that the Maven exec plugin was using AppVeyor to get testing to
occur on Windows. Since this is currently a gap in our coverage, is
this something we can enable, much like our Travis CI, for the Apache Beam
project?


Re: [jira] [Commented] (BEAM-755) beam-runners-core-java NeedsRunner tests not executing

2016-10-19 Thread Lukasz Cwik
At the point in time this was created, there were `NeedsRunner` tests.

On Wed, Oct 19, 2016 at 9:34 AM, Kenneth Knowles (JIRA) 
wrote:

>
> [ https://issues.apache.org/jira/browse/BEAM-755?page=com.
> atlassian.jira.plugin.system.issuetabpanels:comment-
> tabpanel=15589182#comment-15589182 ]
>
> Kenneth Knowles commented on BEAM-755:
> --
>
> Noting that the resolution is that `beam-runners-core-java` actually just
> doesn't have any `NeedsRunner` tests. We should still get the `pom` set up
> right so that if one is added it will execute.
>
> > beam-runners-core-java NeedsRunner tests not executing
> > --
> >
> > Key: BEAM-755
> > URL: https://issues.apache.org/jira/browse/BEAM-755
> > Project: Beam
> >  Issue Type: Bug
> >  Components: runner-core
> >Reporter: Luke Cwik
> >Assignee: Kenneth Knowles
> >Priority: Minor
> > Fix For: 0.3.0-incubating
> >
> >
> > org.apache.beam:beam-runners-core-java is not specified as an
> integration test dependency to scan within runners/pom.xml
> > There is also in runners/direct-java/pom.xml where its
> org.apache.beam:beam-runners-java-core and should be
> org.apache.beam:beam-runners-core-java
> > Finally, even if these dependencies are added and the typo fixed. When
> running the runnable on service integration tests, SplittableParDoTest
> which contains @RunnableOnService tests (part of runners/core-java) doesn't
> execute.
>
>
>
> --
> This message was sent by Atlassian JIRA
> (v6.3.4#6332)
>


Re: Exploring Performance Testing

2016-10-18 Thread Lukasz Cwik
FYI, there was an outstanding PR about adding the Nexmark suite:
https://github.com/apache/incubator-beam/pull/366

On Tue, Oct 18, 2016 at 1:12 PM, Ismaël Mejía  wrote:

> @Jason, Just some additional refs for ideas, since I already researched a
> little
> bit about how people evaluated this in other Apache projects.
>
> Yahoo published one benchmarking analysis in different streaming frameworks
> like
> a year ago:
> https://github.com/yahoo/streaming-benchmarks
>
> And the flink guys extended it:
> https://github.com/dataArtisans/yahoo-streaming-benchmark
>
> Notice that the common approach comes from the classical database world,
> and it
> is to take one of the TPC queries suites (TPC-H or TPC-DS) and evaluate a
> data
> processing framework against it, Spark does this to evaluate their SQL
> performance.
>
> https://github.com/databricks/spark-sql-perf
>
> However this approach is not 100% aligned with Beam because AFAIK there is
> not a
> TPC suite for continuous processing, that's the reason why I found the
> NexMark
> suite as a more appropriate example.
>
>
> On Tue, Oct 18, 2016 at 9:50 PM, Ismaël Mejía  wrote:
>
> > Hello,
> >
> > Now that we are discussing about the subject of performance testing, I
> > want to
> > jump into the conversation to remind everybody that we have a really
> > interesting
> > benchmarking suite already contributed by google that has (sadly) not
> been
> > merged yet.
> >
> > https://github.com/apache/incubator-beam/pull/366
> > https://issues.apache.org/jira/browse/BEAM-160
> >
> > This is not exactly the kind of benchmark of the current discussion, but
> > for me
> > is a super valuable contribution that I hope we can use/refine to
> evaluate
> > the
> > runners.
> >
> > Ismaël Mejía
> >
> >
> > On Tue, Oct 18, 2016 at 8:16 PM, Jean-Baptiste Onofré 
> > wrote:
> >
> >> It sounds like a good idea to me.
> >>
> >> Regards
> >> JB
> >>
> >>
> >> On 10/18/2016 08:08 PM, Amit Sela wrote:
> >>
> >>> @Jesse how about runners "tracing" the constructed DAG (by Beam) so
> that
> >>> it's clear what the runner actually executed ?
> >>>
> >>> Example:
> >>> For the SparkRunner, a ParDo translates to a mapPartitions
> >>> transformation.
> >>>
> >>> That could provide transparency when debugging/benchmarking pipelines
> >>> per-runner.
> >>>
> >>> On Tue, Oct 18, 2016 at 8:25 PM Jesse Anderson 
> >>> wrote:
> >>>
> >>> @Dan before starting with Beam, I'd want to know how much performance
>  I've
>  giving up by not programming directly to the API.
> 
>  On Tue, Oct 18, 2016 at 10:03 AM Dan Halperin
>   
> >
> > wrote:
> 
>  I think there are lots of excellent one-off performance studies, but
> I'm
> > not sure how useful that is to Beam.
> >
> > From a test infra point of view, I'm wondering more about tracking of
> > performance over time, identifying regressions, etc.
> >
> > Google has some tools like PerfKit
> >  which is
> > basically a skin on a database + some scripts to load and query data;
> >
>  but I
> 
> > don't love it. Do other Apache projects do public, long-term
> > benchmarking
> > and performance regression testing?
> >
> > Dan
> >
> > On Tue, Oct 18, 2016 at 8:52 AM, Jesse Anderson <
> je...@smokinghand.com
> > >
> > wrote:
> >
> > I found data Artisan's benchmarking post
> >>  >> exactly-once-stream-processing-with-apache-flink/>.
> >> They also shared the code  >> ns/performance
> >>
> > .
> > I
> >
> >> didn't dig in much, but they did a wide range of algorithms. They
> have
> >>
> > the
> >
> >> native code, so you write the Beam code and check against the native
> >> performance.
> >>
> >> On Mon, Oct 17, 2016 at 5:14 PM amir bahmanyari
> >> 
> >> wrote:
> >>
> >> Hi Jason,I have been busy bench-marking Flink Cluster (Spark next)
> >>>
> >> under
> >
> >> Beam.I can share my experience. Can you list items of interest to
> >>>
> >> know
> 
> > so I
> >>
> >>> can answer them to the best of my knowledge.Cheers
> >>>
> >>>   From: Jason Kuster 
> >>>  To: dev@beam.incubator.apache.org
> >>>  Sent: Monday, October 17, 2016 5:06 PM
> >>>  Subject: Exploring Performance Testing
> >>>
> >>> Hey all,
> >>>
> >>> Now that we've covered some of the initial ground with regard to
> >>> correctness testing, I'm going to be starting work on performance
> >>>
> >> testing
> >
> >> and benchmarking. I wanted to reach out and see what 

Re: Documentation for IDE setup

2016-10-14 Thread Lukasz Cwik
I rely on having the Maven Eclipse integration and m2e-apt installed, and do a
Maven import of the project.

On Fri, Oct 14, 2016 at 8:10 AM, Jesse Anderson <je...@smokinghand.com>
wrote:

> I did a "mvn eclipse:eclipse" to generate the Eclipse projects and imported
> them. That didn't compile either.
>
> On Fri, Oct 14, 2016 at 8:06 AM Lukasz Cwik <lc...@google.com.invalid>
> wrote:
>
> > I use Eclipse for development but always defer to maven since its the
> > source of truth in the end.
> > I also have issues with getting it to compile on import and it has to do
> > with annotation processing and generally requires m2e-apt to be installed
> > and configured correctly.
> >
> > On Fri, Oct 14, 2016 at 7:25 AM, Neelesh Salian <nsal...@cloudera.com>
> > wrote:
> >
> > > I was looking for the same couple of days ago.
> > > But IntelliJ is less worrisome than Eclipse.
> > >
> > > Straight Import. No Hassle.
> > > +1 to docs, though.
> > >
> > > On Fri, Oct 14, 2016 at 7:19 AM, Jean-Baptiste Onofré <j...@nanthrax.net
> >
> > > wrote:
> > >
> > > > [Troll] Who's using Eclipse anymore ? [/Troll]
> > > >
> > > > ;)
> > > >
> > > > Regards
> > > > JB
> > > >
> > > >
> > > > On 10/14/2016 04:06 PM, Jesse Anderson wrote:
> > > >
> > > >> Last week I imported Beam with IntelliJ and everything worked.
> > > >>
> > > >> That said, I tried to import the Eclipse project and that doesn't
> > > compile
> > > >> anymore. I didn't have time to figure out what happened though.
> > > >>
> > > >> On Fri, Oct 14, 2016 at 1:21 AM Jean-Baptiste Onofré <
> j...@nanthrax.net
> > >
> > > >> wrote:
> > > >>
> > > >> Hi Christian,
> > > >>>
> > > >>> IntelliJ doesn't need any special config (maybe the code style can
> be
> > > >>> documented or imported).
> > > >>>
> > > >>> Anyway, agree to add such on website in the contribute directory. I
> > > >>> think it could be part of the contribution-guide as it's first
> setup
> > > >>> step.
> > > >>>
> > > >>> Regards
> > > >>> JB
> > > >>>
> > > >>> On 10/14/2016 10:17 AM, Christian Schneider wrote:
> > > >>>
> > > >>>> Hello all,
> > > >>>>
> > > >>>> I am new to the beam community and currently start making myself
> > > >>>> familiar with the code.  I quickly found the contribution guide
> and
> > > was
> > > >>>> able to clone the code and build beam using maven.
> > > >>>>
> > > >>>> The first obstacle I faced was getting the code build in eclipse.
> I
> > > >>>> naively imported as existing maven projects but got lots of
> compile
> > > >>>> errors. After talking to Dan Kulp we found that this is due to the
> > apt
> > > >>>> annotation processing for auto value types. Dan explained me how I
> > > need
> > > >>>> to setup eclipse to make it work.
> > > >>>>
> > > >>>> I still got 5 compile errors (Some bound mismatch at Read.bounded,
> > and
> > > >>>> one ambiguous method empty). These errors seem to be present for
> > > >>>> everyone using eclipse and Dan works on it. So I think this is
> not a
> > > >>>> permanent problem.
> > > >>>>
> > > >>>> To make it easier for new people I would like to write a
> > documentation
> > > >>>> about the IDE setup. I can cover the eclipse part but I think
> > intellij
> > > >>>> should also be described.
> > > >>>>
> > > >>>> I already started with it and placed it in /contribute/ide-setup.
> > Does
> > > >>>> that make sense?
> > > >>>>
> > > >>>> I currently did not link to it from anywhere. I think it should be
> > > >>>> linked in the contribute/index and in the Contribute menu.
> > > >>>>
> > > >>>> Christian
> > > >>>>
> > > >>>>
> > > >>> --
> > > >>> Jean-Baptiste Onofré
> > > >>> jbono...@apache.org
> > > >>> http://blog.nanthrax.net
> > > >>> Talend - http://www.talend.com
> > > >>>
> > > >>>
> > > >>
> > > > --
> > > > Jean-Baptiste Onofré
> > > > jbono...@apache.org
> > > > http://blog.nanthrax.net
> > > > Talend - http://www.talend.com
> > > >
> > >
> > >
> > >
> > > --
> > > Neelesh Srinivas Salian
> > > Customer Operations Engineer
> > >
> >
>


Re: [DISCUSS] Introduce DoFnWithStore

2016-10-14 Thread Lukasz Cwik
The only way we have today is to use BoundedReadFromUnboundedSource or use
a side input to bridge an unbounded portion of the pipeline with a bounded
portion of the pipeline.
The model allows the side input bridge between these two portions of the
pipeline to happen but I can't comment as to how well it will work with the
runners we have today.
The bounded portion of the pipeline would need to know some set of windows
it wanted to wait for upfront from the unbounded portion so that the side
input trigger would fire correctly and allow the bounded portion of the
pipeline to be scheduled to execute.

On Fri, Oct 14, 2016 at 7:59 AM, Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

> Thanks for the update Lukasz.
>
> How would you implement a "transform" from unbounded PCollection to
> bounded PCollection ?
>
> Even if I use a GroupByKey with something like KV<K, Iterable>, it
> doesn't change the type of the PCollection.
>
> You are right with State API. My proposal is more a way to implicitly use
> State in DoFn.
>
> Regards
> JB
>
>
> On 10/14/2016 04:51 PM, Lukasz Cwik wrote:
>
>> SplittableDoFn is about taking a single element and turning it into
>> potentially many in a parallel way by allowing an element to be split
>> across bundles.
>>
>> I believe a user could do what you describe by using a GBK to group their
>> data how they want. In your example it would be a single key, then they
>> would have KV<K, Iterable<V>> for all the values when reading from that
>> GBK. The proposed State API seems to also overlap with what you're trying to
>> achieve.
>>
>>
>>
>> On Fri, Oct 14, 2016 at 5:12 AM, Jean-Baptiste Onofré <j...@nanthrax.net>
>> wrote:
>>
>> Hi guys,
>>>
>>> When testing the different IOs, we want to have the best possible
>>> coverage
>>> and be able to test with different use cases.
>>>
>>> We create integration test pipelines, and, one "classic" use case is to
>>> implement a pipeline starting from an unbounded source (providing an
>>> unbounded PCollection like Kafka, JMS, MQTT, ...) and sending data to a
>>> bounded sink (TextIO for instance) expected a bounded PCollection.
>>>
>>> This use case is not currently possible. Even when using a Window, it
>>> will
>>> create a chunk of the unbounded PCollection, but the PCollection is still
>>> unbounded.
>>>
>>> That's why I created: https://issues.apache.org/jira/browse/BEAM-638.
>>>
>>> However, I don't think a Window Fn/Trigger is the best approach.
>>>
>>> A possible solution would be to create a specific IO
>>> (BoundedWriteFromUnboundedSourceIO similar to the one we have for Read
>>> ;)) to do that, but I think we should provide a more global way, as this
>>> use case is not specific to IO. For instance, a sorting PTransform will
>>> work only on a bounded PCollection (not an unbounded).
>>>
>>> I wonder if we could not provide a DoFnWithStore. The purpose is to store
>>> unbounded PCollection elements (squared by a Window for instance) into a
>>> pluggable store and read from the store to provide a bounded PCollection.
>>> The store/read trigger could be on the finish bundle.
>>> We could provide "store service", for instance based on GS, HDFS, or any
>>> other storage (Elasticsearch, Cassandra, ...).
>>>
>>> Spark users might be "confused", as in Spark, this behavior is "native"
>>> thanks to the micro-batches. In spark-streaming, basically a DStream is a
>>> bounded collection of RDDs.
>>>
>>> Basically, the DoFnWithStore will look like a DoFn with implicit
>>> store/read from the store. Something like:
>>>
>>> public abstract class DoFnWithStore extends DoFn {
>>>
>>>   @ProcessElement
>>>   @Store(Window)
>>>   
>>>
>>> }
>>>
>>> Generally, SDF sounds like a native way to let users implement this
>>> behavior explicitly.
>>>
>>> My proposal is to do it implicitly and transparently for the end users
>>> (they just have to provide the Window definition and the store service to
>>> use).
>>>
>>> Thoughts ?
>>>
>>> Regards
>>> JB
>>> --
>>> Jean-Baptiste Onofré
>>> jbono...@apache.org
>>> http://blog.nanthrax.net
>>> Talend - http://www.talend.com
>>>
>>>
>>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>


Re: [DISCUSS] Introduce DoFnWithStore

2016-10-14 Thread Lukasz Cwik
SplittableDoFn is about taking a single element and turning it into
potentially many in a parallel way by allowing an element to be split
across bundles.

I believe a user could do what you describe by using a GBK to group their
data how they want. In your example it would be a single key, then they
would have KV<K, Iterable<V>> for all the values when reading from that
GBK. The proposed State API seems to also overlap with what you're trying to
achieve.



On Fri, Oct 14, 2016 at 5:12 AM, Jean-Baptiste Onofré 
wrote:

> Hi guys,
>
> When testing the different IOs, we want to have the best possible coverage
> and be able to test with different use cases.
>
> We create integration test pipelines, and, one "classic" use case is to
> implement a pipeline starting from an unbounded source (providing an
> unbounded PCollection like Kafka, JMS, MQTT, ...) and sending data to a
> bounded sink (TextIO for instance) expected a bounded PCollection.
>
> This use case is not currently possible. Even when using a Window, it will
> create a chunk of the unbounded PCollection, but the PCollection is still
> unbounded.
>
> That's why I created: https://issues.apache.org/jira/browse/BEAM-638.
>
> However, I don't think a Window Fn/Trigger is the best approach.
>
> A possible solution would be to create a specific IO
> (BoundedWriteFromUnboundedSourceIO similar to the one we have for Read
> ;)) to do that, but I think we should provide a more global way, as this
> use case is not specific to IO. For instance, a sorting PTransform will
> work only on a bounded PCollection (not an unbounded).
>
> I wonder if we could not provide a DoFnWithStore. The purpose is to store
> unbounded PCollection elements (squared by a Window for instance) into a
> pluggable store and read from the store to provide a bounded PCollection.
> The store/read trigger could be on the finish bundle.
> We could provide "store service", for instance based on GS, HDFS, or any
> other storage (Elasticsearch, Cassandra, ...).
>
> Spark users might be "confused", as in Spark, this behavior is "native"
> thanks to the micro-batches. In spark-streaming, basically a DStream is a
> bounded collection of RDDs.
>
> Basically, the DoFnWithStore will look like a DoFn with implicit
> store/read from the store. Something like:
>
> public abstract class DoFnWithStore extends DoFn {
>
>   @ProcessElement
>   @Store(Window)
>   
>
> }
>
> Generally, SDF sounds like a native way to let users implement this
> behavior explicitly.
>
> My proposal is to do it implicitly and transparently for the end users
> (they just have to provide the Window definition and the store service to
> use).
>
> Thoughts ?
>
> Regards
> JB
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
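A rough sketch of the GBK route described above, assuming unboundedLines is an unbounded PCollection<String>; the single "all" key and the 5-minute window are arbitrary choices.

  import org.joda.time.Duration;
  import org.apache.beam.sdk.transforms.GroupByKey;
  import org.apache.beam.sdk.transforms.MapElements;
  import org.apache.beam.sdk.transforms.SimpleFunction;
  import org.apache.beam.sdk.transforms.windowing.FixedWindows;
  import org.apache.beam.sdk.transforms.windowing.Window;
  import org.apache.beam.sdk.values.KV;
  import org.apache.beam.sdk.values.PCollection;

  // Sketch: window the unbounded input, key everything identically, and group,
  // so downstream DoFns see each window's contents as one Iterable.
  PCollection<KV<String, Iterable<String>>> perWindowBatches =
      unboundedLines
          .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(5))))
          .apply(MapElements.via(
              new SimpleFunction<String, KV<String, String>>() {
                @Override
                public KV<String, String> apply(String line) {
                  return KV.of("all", line);  // single key: the whole window groups together
                }
              }))
          .apply(GroupByKey.<String, String>create());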


Re: [REMINDER] Technical discussion on the mailing list

2016-10-05 Thread Lukasz Cwik
+1 for the notice
+1 for the usage of AutoValue

On Wed, Oct 5, 2016 at 4:51 AM, Jean-Baptiste Onofré 
wrote:

> Hi team,
>
> I would like to excuse myself to have forgotten to discuss and share with
> you a technical point and generally speaking do a small reminder.
>
> When we work with Eugene on the JdbcIO, we experimented AutoValue to deal
> with IO configuration. AutoValue provides a nice way to reduce and limit
> the boilerplate code required by the IO configuration.
> We used AutoValue in JdbcIO and, regarding the good improvements we saw,
> we started to refactor the other IOs.
>
> The use of AutoValue should have been notice and discussed on the mailing
> list.
>
> "If it doesn't exist on the mailing list, it doesn't exist at all."
>
> So, any comment happening on a GitHub pull request, or discussion on
> hangouts which can impact the project (generally speaking) has to happen on
> the mailing list.
>
> It provides project transparency and facilitates the new contribution
> onboarding.
>
> Thanks !
>
> Regards
> JB
>
>


Re: IntervalWindow toString()

2016-09-19 Thread Lukasz Cwik
It follows the mathematical convention for half-open ranges, where [ ] means
inclusive and ( ) means up to but not including. So [2, 3) means the range
from 2 up to, but not including, 3.

On Mon, Sep 19, 2016 at 8:56 AM, Jesse Anderson 
wrote:

> The toString() of IntervalWindow starts with a square bracket and ends with
> a parenthesis. Is this a type of notation or a bug? Code:
>
>   @Override
>   public String toString() {
> return "[" + start + ".." + end + ")";
>   }
>
> Thanks,
>
> Jesse
>


Re: TextIO.Read.Bound class

2016-08-25 Thread Lukasz Cwik
Yes, that makes sense, feel free to create an issue or create a PR
resolving this discrepancy.
Taking a pass over the existing IO transforms would also be helpful.

On Thu, Aug 25, 2016 at 11:42 AM, Gaurav Gupta 
wrote:

> Hi All,
>
> I am new to Apache beam and I was going through the word count example. I
> found that TextIO.Read.Bound is used to read file.
>
> Should TextIO.Read.Bound not extend PTransform
> instead of PTransform similar to KafkaIO.Read  and
> JMSIO.Read that extend PTransform?
>
> Thanks
> Gaurav
>


Re: Remove legacy import-order?

2016-08-24 Thread Lukasz Cwik
+1 for import order

On Wed, Aug 24, 2016 at 9:01 AM, Amit Sela  wrote:

> +1 on import order as well.
> Kenneth has a good point about history if we reformat.
>
> On Wed, Aug 24, 2016, 18:59 Kenneth Knowles 
> wrote:
>
> > +1 to import order
> >
> > I don't care about actually enforcing formatting, but would add it to IDE
> > tips and just make it an "OK topic for code review". Enforcing it would
> > result in obscuring a lot of history for who to talk to about pieces of
> > code.
> >
> > And by the way there is a recent build of the IntelliJ plugin for
> > https://github.com/google/google-java-format, available through the
> usual
> > plugin search functionality. I use it and it is very nice.
> >
> > On Tue, Aug 23, 2016 at 11:26 PM, Aljoscha Krettek 
> > wrote:
> >
> > > +1 on the import order
> > >
> > > +1 on also starting a discussion about enforced formatting
> > >
> > > On Wed, 24 Aug 2016 at 06:43 Jean-Baptiste Onofré 
> > wrote:
> > >
> > > > Agreed.
> > > >
> > > > It makes sense for the import order.
> > > >
> > > > Regards
> > > > JB
> > > >
> > > > On 08/24/2016 02:32 AM, Ben Chambers wrote:
> > > > > I think introducing formatting should be a separate discussion.
> > > > >
> > > > > Regarding the import order: this PR demonstrates the change
> > > > > https://github.com/apache/incubator-beam/pull/869
> > > > >
> > > > > I would need to update the second part (applying optimize imports)
> > > prior
> > > > to
> > > > > actually merging.
> > > > >
> > > > > On Tue, Aug 23, 2016 at 5:08 PM Eugene Kirpichov
> > > > >  wrote:
> > > > >
> > > > >> Two cents: While we're at it, we could consider enforcing
> formatting
> > > as
> > > > >> well (https://github.com/google/google-java-format). That's a
> > bigger
> > > > >> change
> > > > >> though, and I don't think it has checkstyle integration or
> anything
> > > like
> > > > >> that.
> > > > >>
> > > > >> On Tue, Aug 23, 2016 at 4:54 PM Dan Halperin
> > > > 
> > > > >> wrote:
> > > > >>
> > > > >>> yeah I think that we would be SO MUCH better off if we worked
> with
> > an
> > > > >>> out-of-the-box IDE. We don't even distribute an IntelliJ/Eclipse
> > > config
> > > > >>> file right now, and I'd like to not have to.
> > > > >>>
> > > > >>> But, ugh, it will mess up ongoing PRs. I guess committers could
> fix
> > > > them
> > > > >> in
> > > > >>> merge, or we could just make proposers rebase. (Since committers
> > are
> > > > most
> > > > >>> proposers, probably little harm in the latter).
> > > > >>>
> > > > >>> On Tue, Aug 23, 2016 at 4:11 PM, Jesse Anderson <
> > > je...@smokinghand.com
> > > > >
> > > > >>> wrote:
> > > > >>>
> > > >  Please. That's the one that always trips me up.
> > > > 
> > > >  On Tue, Aug 23, 2016, 4:10 PM Ben Chambers <
> bchamb...@apache.org>
> > > > >> wrote:
> > > > 
> > > > > When Beam was contributed it inherited an import order [1] that
> > was
> > > >  pretty
> > > > > arbitrary. We've added org.apache.beam [2], but continue to use
> > > this
> > > > > ordering.
> > > > >
> > > > > Both Eclipse and IntelliJ default to grouping imports into
> > > alphabetic
> > > > > order. I think it would simplify development if we switched our
> > > >  checkstyle
> > > > > ordering to agree with these IDEs. This also removes special
> > > > >> treatment
> > > >  for
> > > > > specific packages.
> > > > >
> > > > > If people agree, I'll send out a PR that changes the checkstyle
> > > > > configuration and runs IntelliJ's sort-imports on the existing
> > > files.
> > > > >
> > > > > -- Ben
> > > > >
> > > > > [1]
> > > > > org.apache.beam,com.google,android,com,io,Jama,junit,net,
> > > >  org,sun,java,javax
> > > > > [2] com.google,android,com,io,Jama,junit,net,org,sun,java,
> javax
> > > > >
> > > > 
> > > > >>>
> > > > >>
> > > > >
> > > >
> > > > --
> > > > Jean-Baptiste Onofré
> > > > jbono...@apache.org
> > > > http://blog.nanthrax.net
> > > > Talend - http://www.talend.com
> > > >
> > >
> >
>


Re: Utility for Serializing PipelineOptions

2016-08-03 Thread Lukasz Cwik
The core Java SDK already contains the code to serialize/deserialize
PipelineOptions using Jackson's ObjectMapper since PipelineOptions has
Jackson databind annotations @JsonSerialize and @JsonDeserialize.

Is there something more specific that you were looking for?

On Wed, Aug 3, 2016 at 12:54 PM, P. Taylor Goetz  wrote:

> Looking at the runner implementations for Spark, Flink, and Gearpump, they
> all have the same requirement of being able to serialize/deserialize
> PipelineOptions, and they each seem to use slightly different approaches.
> In working on a Beam runner for Storm I have the same requirement, and
> imagine other runner implementations might as well.
>
> Would it make sense to add this functionality in the core SDK?
>
> -Taylor
>
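A minimal sketch of the existing support described above, using nothing beyond a plain Jackson ObjectMapper (exception handling omitted):

  import com.fasterxml.jackson.databind.ObjectMapper;
  import org.apache.beam.sdk.options.PipelineOptions;
  import org.apache.beam.sdk.options.PipelineOptionsFactory;

  // Sketch: round-trip PipelineOptions through JSON, which is what a runner needs
  // when shipping the options to distributed workers.
  PipelineOptions options = PipelineOptionsFactory.create();
  options.setJobName("my-job");

  ObjectMapper mapper = new ObjectMapper();
  String json = mapper.writeValueAsString(options);                          // serialize
  PipelineOptions restored = mapper.readValue(json, PipelineOptions.class);  // deserialize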


Re: [PROPOSAL] CoGBK as primitive transform instead of GBK

2016-07-26 Thread Lukasz Cwik
I created https://issues.apache.org/jira/browse/BEAM-490 which will track
the work for swapping the primitive from being GBK to CoGBK.

On Thu, Jul 21, 2016 at 9:24 AM, Lukasz Cwik <lc...@google.com> wrote:

> As of today, Cloud Dataflow will also be executed as a GBK.
>
> On Thu, Jul 21, 2016 at 2:56 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> +1
>>
>> Out of curiosity, does Cloud Dataflow have a CoGBK primitive or will it
>> also be executed as a GBK there?
>>
>> On Thu, 21 Jul 2016 at 02:29 Kam Kasravi <kamkasr...@gmail.com> wrote:
>>
>> > +1 - awesome Manu.
>> >
>> > On Wednesday, July 20, 2016 1:53 PM, Kenneth Knowles
>> > <k...@google.com.INVALID> wrote:
>> >
>> >
>> >  +1
>> >
>> > I assume that the intent is for the semantics of both GBK and CoGBK to
>> be
>> > unchanged, just swapping their status as primitives.
>> >
>> > This seems like a good change, with strictly positive impact on users
>> and
>> > SDK authors, with only an extremely minor burden (doing an insertion of
>> the
>> > provided implementation in the worst case) on runner authors.
>> >
>> > Kenn
>> >
>> >
>> > On Wed, Jul 20, 2016 at 10:38 AM, Lukasz Cwik <lc...@google.com.invalid
>> >
>> > wrote:
>> >
>> > > I would like to propose a change to Beam to make CoGBK the basis for
>> > > grouping instead of GBK. The idea behind this proposal is that CoGBK
>> is a
>> > > more powerful operator then GBK allowing for two key benefits:
>> > >
>> > > 1) SDKs are simplified: transforming a CoGBK into a GBK is trivial
>> while
>> > > the reverse is not.
>> > > 2) It will be easier for runners to provide more efficient
>> > implementations
>> > > of CoGBK as they will be responsible for the logic which takes their
>> own
>> > > internal grouping implementation and maps it onto a CoGBK.
>> > >
>> > > This requires the following modifications to the Beam code base:
>> > >
>> > > 1) Make GBK a composite transform in terms of CoGBK.
>> > > 2) Move the CoGBK from contrib to runners-core as an adapter*. Runners
>> > that
>> > > more naturally support GBK can just use this and everything executes
>> > > exactly as before.
>> > >
>> > > *just like GroupByKeyViaGroupByKeyOnly and
>> UnboundedReadFromBoundedSource
>> > >
>> >
>> >
>> >
>>
>
>
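To illustrate the "GBK as a composite transform in terms of CoGBK" item concretely, here is a sketch of GroupByKey expressed over a single tagged input; the types and tag are arbitrary, and input is assumed to be a PCollection<KV<String, Integer>>.

  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.ParDo;
  import org.apache.beam.sdk.transforms.join.CoGbkResult;
  import org.apache.beam.sdk.transforms.join.CoGroupByKey;
  import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
  import org.apache.beam.sdk.values.KV;
  import org.apache.beam.sdk.values.PCollection;
  import org.apache.beam.sdk.values.TupleTag;

  // Sketch: a plain GroupByKey written as a CoGroupByKey over one tagged input.
  final TupleTag<Integer> tag = new TupleTag<>();

  PCollection<KV<String, Iterable<Integer>>> grouped =
      KeyedPCollectionTuple.of(tag, input)
          .apply(CoGroupByKey.<String>create())
          .apply(ParDo.of(
              new DoFn<KV<String, CoGbkResult>, KV<String, Iterable<Integer>>>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                  // Pull the single tag's values back out of the joined result.
                  c.output(KV.of(c.element().getKey(), c.element().getValue().getAll(tag)));
                }
              }));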


Re: Beam/Flink : State access

2016-07-26 Thread Lukasz Cwik
I think you're interested in https://issues.apache.org/jira/browse/BEAM-25,
which is about exposing access to state in a runner-independent way in the
Beam model. You can watch that issue and comment on the design / development, as
it is currently being worked on.

On Tue, Jul 26, 2016 at 8:53 AM, Aparup Banerjee (apbanerj) <
apban...@cisco.com> wrote:

> Yes and no. I think state access is a valid use case, and should surface
> in the Beam layer, through the SDK abstraction, or through a separate function
> like RichMapFunction in Flink. Hence the question.
>
> Aparup
>
>
>
>
> On 7/26/16, 4:29 AM, "Aljoscha Krettek"  wrote:
>
> >Hi,
> >the purpose of Beam is to abstract the user from the underlying execution
> >engine. IMHO, allowing access to state of the underlying execution engine
> >will never be a goal for the Beam project.
> >
> >If you want/need to access Flink state, I think this is a good indicator
> >that you should use Flink directly because your programs would never run
> on
> >another runner anyways.
> >
> >Cheers,
> >Aljoscha
> >
> >On Mon, 25 Jul 2016 at 17:04 Aparup Banerjee (apbanerj) <
> apban...@cisco.com>
> >wrote:
> >
> >> I am looking for a way to access streaming engine state (Flink) in my
> beam
> >> transformations. I understand this can be accessed only though the
> runtime
> >> context. Has any one tried accessing flink runtime context in beam? May
> be
> >> roll it up as a custom API of some sort. Might need some changes in
> >> FlinkRunner is my hunch.
> >>
> >> Thanks,
> >> Aparup
> >>
>


Re: [Proposal] Add waitToFinish(), cancel(), waitToRunning() to PipelineResult.

2016-07-25 Thread Lukasz Cwik
+1 for your proposal Pei
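
To make the agreed surface concrete, a minimal sketch of the PipelineResult
additions summarized in the quoted thread below; the exact signatures (and the
Duration-bounded variant reflecting Ben's and Robert's waitFor idea) are
assumptions, not settled API:

  public interface PipelineResult {
    State getState();

    // Cancels the running job; may throw if the runner cannot cancel it.
    void cancel() throws IOException;

    // Blocks until a terminal state is reached, then returns it.
    State waitUntilFinish();

    // Blocks for at most the given duration, returning early if a terminal
    // state is reached; returns the state observed at return time.
    State waitUntilFinish(Duration duration);
  }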

On Mon, Jul 25, 2016 at 5:54 PM, Pei He  wrote:

> Looks to me that followings are agreed:
> (1). adding cancel() and waitUntilFinish() to PipelineResult.
> (In streaming mode, "all data watermarks reach to infinity" is
> considered as finished.)
> (2). PipelineRunner.run() should return relatively quick as soon as
> the pipeline/job is started/running. The blocking logic should be left
> to users' code to handle with PipelineResult.waitUntilFinish(). (Test
> runners that finish quickly can block run() until the execution is
> done. So, it is cleaner to verify test results after run())
>
> I will send out PR for (1), and create jira issues to improve runners for
> (2).
>
> waitToRunning() is controversial, and we have several halfway-agreed
> proposals.
> I will pull them out from this thread, so we can close this proposal
> with cancel() and waitUntilFinish(). And, I will create a jira issue
> to track how to support "waiting until other states".
>
> Does that sound good to everyone?
>
> Thanks
> --
> Pei
>
> On Thu, Jul 21, 2016 at 4:32 PM, Robert Bradshaw
>  wrote:
> > On Thu, Jul 21, 2016 at 4:18 PM, Ben Chambers 
> wrote:
> >> This health check seems redundant with just waiting a while and then
> >> checking on the status, other than returning earlier in the case of
> >> reaching a terminal state. What about adding:
> >>
> >> /**
> >>  * Returns the state after waiting the specified duration. Will return
> >> earlier if the pipeline
> >>  * reaches a terminal state.
> >>  */
> >> State getStateAfter(Duration duration);
> >>
> >> This seems to be a useful building block, both for the user's pipeline
> (in
> >> case they wanted to build something like wait and then check health) and
> >> also for the SDK (to implement waitUntilFinished, etc.)
> >
> > A generic waitFor(Duration) which may return early if a terminal state
> > is entered seems useful. I don't know that we need a return value
> > here, given that we an then query the PipelineResult however we want
> > once this returns. waitUntilFinished is simply
> > waitFor(InfiniteDuration).
> >
> >> On Thu, Jul 21, 2016 at 4:11 PM Pei He 
> wrote:
> >>
> >>> I am not in favor of supporting waiting for every state or
> >>> waitUntilState(...).
> >>> One reason is that PipelineResult.State is not well defined and is not
> >>> agreed upon across runners.
> >>> Another reason is users might not want to wait for a particular state.
> >>> For example,
> >>> waitUntilFinish() is to wait for a terminal state.
> >>> So, even if runners have different states, we can still define shared
> >>> properties, such as finished/terminal.
> >
> > +1. Running is an intermediate state that doesn't have an obvious
> > mapping onto all runners, which is another reason it's odd to wait
> > until then. All runners have terminal states.
> >
> >>> I think when users call waitUntilRunning(), they want to make sure the
> >>> pipeline is up and running and is healthy. Maybe we want to wait until at
> >>> least one element has gone through the pipeline.
> >
> > -1, That might be a while... Also, you may not start generating data
> > until your pipeline is up.
> >
> >>> What about changing the waitUntilRunning() to the following?
> >>>
> >>> /**
> >>> * Check if the pipeline is healthy for the duration.
> >>> *
> >>> * Return true if the pipeline is healthy at the end of duration.
> >>> * Return false if the pipeline is not healthy at the end of duration.
> >>> * It may return early if the pipeline is in an unrecoverable failure
> >>> state.
> >>> */
> >>> boolean PipelineResult.healthCheck(Duration duration)
> >>>
> >>> (I think this also addressed Robert's comment about waitToRunning())
> >>>
> >>> On Thu, Jul 21, 2016 at 1:08 PM, Kenneth Knowles
> 
> >>> wrote:
> >>> > Some more comments:
> >>> >
> >>> >  - What are the allowed/expected state transitions prior to RUNNING?
> >>> Today,
> >>> > I presume it is any nonterminal state, so it can be UNKNOWN or
> STOPPED
> >>> > (which really means "not yet started") prior to RUNNING. Is this
> what we
> >>> > want?
> >>> >
> >>> >  - If a job can be paused, a transition from RUNNING to STOPPED, then
> >>> > waitUntilPaused(Duration) makes sense.
> >>> >
> >>> >  - Assuming there is some polling under the hood, are runners
> required to
> >>> > send back a full history of transitions? Or can transitions be
> missed,
> >>> with
> >>> > only the latest state retrieved?
> >>> >
> >>> >  - If the latter, then does waitUntilRunning() only wait until
> RUNNING or
> >>> > does it also return when it sees STOPPED, which could certainly
> indicate
> >>> > that the job transitioned to RUNNING then STOPPED in between polls.
> In
> >>> that
> >>> > case it is, today, the same as waitUntilStateIsKnown().
> >>> >
> >>> >  - The obvious limit of this discussion is waitUntilState(Duration,
> >>> > Set), which is the same 

Re: [PROPOSAL] CoGBK as primitive transform instead of GBK

2016-07-21 Thread Lukasz Cwik
As of today, Cloud Dataflow will also be executed as a GBK.

On Thu, Jul 21, 2016 at 2:56 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> +1
>
> Out of curiosity, does Cloud Dataflow have a CoGBK primitive or will it
> also be executed as a GBK there?
>
> On Thu, 21 Jul 2016 at 02:29 Kam Kasravi <kamkasr...@gmail.com> wrote:
>
> > +1 - awesome Manu.
> >
> > On Wednesday, July 20, 2016 1:53 PM, Kenneth Knowles
> > <k...@google.com.INVALID> wrote:
> >
> >
> >  +1
> >
> > I assume that the intent is for the semantics of both GBK and CoGBK to be
> > unchanged, just swapping their status as primitives.
> >
> > This seems like a good change, with strictly positive impact on users and
> > SDK authors, with only an extremely minor burden (doing an insertion of
> the
> > provided implementation in the worst case) on runner authors.
> >
> > Kenn
> >
> >
> > On Wed, Jul 20, 2016 at 10:38 AM, Lukasz Cwik <lc...@google.com.invalid>
> > wrote:
> >
> > > I would like to propose a change to Beam to make CoGBK the basis for
> > > grouping instead of GBK. The idea behind this proposal is that CoGBK
> is a
> > > more powerful operator than GBK, allowing for two key benefits:
> > >
> > > 1) SDKs are simplified: transforming a CoGBK into a GBK is trivial
> while
> > > the reverse is not.
> > > 2) It will be easier for runners to provide more efficient
> > implementations
> > > of CoGBK as they will be responsible for the logic which takes their
> own
> > > internal grouping implementation and maps it onto a CoGBK.
> > >
> > > This requires the following modifications to the Beam code base:
> > >
> > > 1) Make GBK a composite transform in terms of CoGBK.
> > > 2) Move the CoGBK from contrib to runners-core as an adapter*. Runners
> > that
> > > more naturally support GBK can just use this and everything executes
> > > exactly as before.
> > >
> > > *just like GroupByKeyViaGroupByKeyOnly and
> UnboundedReadFromBoundedSource
> > >
> >
> >
> >
>


[PROPOSAL] CoGBK as primitive transform instead of GBK

2016-07-20 Thread Lukasz Cwik
I would like to propose a change to Beam to make CoGBK the basis for
grouping instead of GBK. The idea behind this proposal is that CoGBK is a
more powerful operator than GBK, allowing for two key benefits:

1) SDKs are simplified: transforming a CoGBK into a GBK is trivial while
the reverse is not.
2) It will be easier for runners to provide more efficient implementations
of CoGBK as they will be responsible for the logic which takes their own
internal grouping implementation and maps it onto a CoGBK.

This requires the following modifications to the Beam code base:

1) Make GBK a composite transform in terms of CoGBK.
2) Move the CoGBK from contrib to runners-core as an adapter*. Runners that
more naturally support GBK can just use this and everything executes
exactly as before.

*just like GroupByKeyViaGroupByKeyOnly and UnboundedReadFromBoundedSource
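
To illustrate point 1, a sketch of GBK expressed as a composite over the
existing CoGroupByKey surface. This is only an illustration of the rewrite, not
the actual implementation, and method names may differ by SDK version:

  class GroupByKeyViaCoGbk<K, V>
      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
    @Override
    public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
      final TupleTag<V> tag = new TupleTag<>();
      return KeyedPCollectionTuple.of(tag, input)
          .apply(CoGroupByKey.<K>create())
          // Unwrap the CoGbkResult back into the Iterable<V> a GBK would produce.
          .apply(ParDo.of(new DoFn<KV<K, CoGbkResult>, KV<K, Iterable<V>>>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
              c.output(KV.of(c.element().getKey(),
                             c.element().getValue().getAll(tag)));
            }
          }));
    }
  }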


Re: [DISCUSS] Beam data plane serialization tech

2016-07-15 Thread Lukasz Cwik
Just to give people an update, I'm still working on collecting data.

On Wed, Jun 29, 2016 at 10:47 AM, Aljoscha Krettek 
wrote:

> My bad, I didn't know that. Thanks for the clarification!
>
> On Wed, 29 Jun 2016 at 16:38 Daniel Kulp  wrote:
>
> >
> > > On Jun 27, 2016, at 10:24 AM, Aljoscha Krettek 
> > wrote:
> > >
> > > Out of the systems you suggested Thrift and ProtoBuf3 + gRPC are
> probably
> > > best suited for the task. Both of these provide a way for generating
> > > serializers as well as for specifying an RPC interface. Avro and
> > > FlatBuffers are only dealing in serializers and we would have to roll
> our
> > > own RPC system on top of these.
> >
> >
> > Just a point of clarification, Avro does handle RPC as well as
> > serialization.   It's one of the main bullets on their overview page:
> >
> > http://avro.apache.org/docs/current/index.html
> >
> > Unfortunately, their documentation around the subject really sucks.  Some
> > info at:
> >
> >
> https://cwiki.apache.org/confluence/display/AVRO/Porting+Existing+RPC+Frameworks
> >
> > and a “quick start”:
> >
> > https://github.com/phunt/avro-rpc-quickstart
> >
> >
> >
> > --
> > Daniel Kulp
> > dk...@apache.org - http://dankulp.com/blog
> > Talend Community Coder - http://coders.talend.com
> >
> >
>


Re: [jira] [Commented] (BEAM-434) When examples write output to file it creates many output files instead of one

2016-07-12 Thread Lukasz Cwik
If we go with any option that restricts the number of outputs, then in the
example we should discuss what it does and why it is not considered a good
thing.
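
For example, the wiring could look like the sketch below; the
getNumOutputShards() option and the counts collection are placeholders, while
withNumShards() is the existing TextIO knob:

  // Sketch: feed a hypothetical --numOutputShards option into TextIO.Write.
  int shards = options.getNumOutputShards();  // e.g. default 1 for examples
  counts.apply(TextIO.Write.to(options.getOutput()).withNumShards(shards));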

On Tue, Jul 12, 2016 at 2:11 AM, Amit Sela (JIRA)  wrote:

>
> [
> https://issues.apache.org/jira/browse/BEAM-434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15372225#comment-15372225
> ]
>
> Amit Sela commented on BEAM-434:
> 
>
> I sort of prefer 2, but by letting the user pass the numShards
> configuration (which may need a better name)
> Like I mentioned in the PR, if we want to give a simple example result on
> one hand, while keeping in the user's mind the fact that multiple shards
> are a thing to consider, we could add a --numShards option and add it to
> the examples code with a default of 1 (or 3).
> If we want the users to know about multiple output shards, why should we
> keep the examples "pure" ?
>
> How about adding an option named "--numOutputShards" with default value 1
> (or 3, I could live with 3 :) ) and adding this to the examples README,
> thus giving a better experience in terms of "seeing" the output, while
> keeping the multiple-shards "on the table" and as a bonus, the Travis CI
> tests could still run with as many shards as we want (while I wanted
> examples to be easy enough, I definitely didn't want that for Travis!)
>
> WDYT ?
>
>
> > When examples write output to file it creates many output files instead
> of one
> >
> --
> >
> > Key: BEAM-434
> > URL: https://issues.apache.org/jira/browse/BEAM-434
> > Project: Beam
> >  Issue Type: Bug
> >  Components: examples-java
> >Reporter: Amit Sela
> >Assignee: Amit Sela
> >Priority: Minor
> >
> > When using `TextIO.Write.to("/path/to/output")` without any
> restrictions on the number of shards, it might generate many output files
> (depending on your input), for WordCount for example, you'll get as many
> output files as unique words in your input.
> > Since I think examples are expected to execute in a friendly manner to
> "see" what it does and not optimize for performance in some way, I suggest
> to use `withoutSharding()` when writing the example output to an output
> file.
> > Examples I could find that behave this way:
> > org.apache.beam.examples.WordCount
> > org.apache.beam.examples.complete.TfIdf
> > org.apache.beam.examples.cookbook.DeDupExample
>
>
>
> --
> This message was sent by Atlassian JIRA
> (v6.3.4#6332)
>


Re: [DISCUSS] Spark runner packaging

2016-07-07 Thread Lukasz Cwik
That makes a lot of sense. I can see other runners following suit where
there is a packaged up version for different scenarios / backend cluster
runtimes.

Should this be part of Apache Beam as a separate maven module or another
sub-module inside of Apache Beam, or something else?

On Thu, Jul 7, 2016 at 1:49 PM, Amit Sela  wrote:

> Hi everyone,
>
> Lately I've encountered a number of issues concerning the fact that the
> Spark runner does not package Spark along with it and forcing people to do
> this on their own.
> In addition, this seems to get in the way of having beam-examples executed
> against the Spark runner, again because it would have to add Spark
> dependencies.
>
> When running on a cluster (which I guess was the original goal here), it is
> recommended to have Spark provided by the cluster - this makes sense for
> Spark clusters and more so for Spark + YARN clusters where you might have
> your Spark built against a specific Hadoop version or using a vendor
> distribution.
>
> In order to make the runner more accessible to new adopters, I suggest to
> consider releasing a "spark-included" artifact as well.
>
> Thoughts ?
>
> Thanks,
> Amit
>


Re: Improvements to issue/version tracking

2016-06-28 Thread Lukasz Cwik
+1

On Tue, Jun 28, 2016 at 12:15 PM, Kenneth Knowles 
wrote:

> +1
>
> On Tue, Jun 28, 2016 at 12:06 AM, Jean-Baptiste Onofré 
> wrote:
>
> > +1
> >
> > Regards
> > JB
> >
> >
> > On 06/28/2016 01:01 AM, Davor Bonaci wrote:
> >
> >> Hi everyone,
> >> I'd like to propose a simple change in Beam JIRA that will hopefully
> >> improve our issue and version tracking -- to actually use the "Fix
> >> Versions" field as intended [1].
> >>
> >> The goal would be to simplify issue tracking, streamline generation of
> >> release notes, add a view of outstanding work towards a release, and
> >> clearly communicate which Beam version contains fixes for each issue.
> >>
> >> The standard usage of the field is:
> >> * For open (or in-progress/re-opened) issues, "Fix Versions" field is
> >> optional and indicates an unreleased version that this issue is
> targeting.
> >> The release is not expected to proceed unless this issue is fixed, or
> the
> >> field is changed.
> >> * For closed (or resolved) issues, "Fix Versions" field indicates a
> >> released or unreleased version that has the fix.
> >>
> >> I think the field should be mandatory once the issue is resolved/closed
> >> [4], so we make a deliberate choice about this. I propose we use "Not
> >> applicable" for all those issues that aren't being resolved as Fixed
> >> (e.g.,
> >> duplicates, working as intended, invalid, etc.) and those that aren't
> >> released (e.g., website, build system, etc.).
> >>
> >> We can then trivially view outstanding work for the next release [2], or
> >> generate release notes [3].
> >>
> >> I'd love to hear if there are any comments! I know that at least JB
> >> agrees,
> >> as he was convincing me on this -- thanks ;).
> >>
> >> Thanks,
> >> Davor
> >>
> >> [1]
> >>
> >>
> https://confluence.atlassian.com/adminjiraserver071/managing-versions-802592484.html
> >> [2]
> >>
> >>
> https://issues.apache.org/jira/browse/BEAM/fixforversion/12335766/?selectedTab=com.atlassian.jira.jira-projects-plugin:version-summary-panel
> >> [3]
> >>
> >>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12319527=12335764
> >> [4] https://issues.apache.org/jira/browse/INFRA-12120
> >>
> >>
> > --
> > Jean-Baptiste Onofré
> > jbono...@apache.org
> > http://blog.nanthrax.net
> > Talend - http://www.talend.com
> >
>


Re: [DISCUSS] Beam data plane serialization tech

2016-06-28 Thread Lukasz Cwik
a bundle? (example: side inputs)
> >
> > Any Fn API is required to have the same semantics as this simple
> proposal,
> > but should achieve it with superior performance. I'll leave off the
> details
> > since I am not authoring them personally. But let's assume as a baseline
> > the approach of executing a fused stage of same-language UDFs in a row
> > without any encoding/decoding or RPC, and making a single RPC call per
> > bundle (ignoring amortized round trips for streaming bytes).
> >
> > I gather from this thread these questions (which I may be interpreting
> > wrong; apologies if so) and I would like to answer them relative to this
> > design sketch:
> >
> > Q: Since we have one RPC per bundle and it goes through the whole fused
> > stage, and we have a whole stream of elements per call, doesn't the data
> > dominate the envelope?
> > A: In streaming executions, bundles can be very small, so the data will
> not
> > necessarily dominate.
> >
> > Q: Do we really need structured messages? Perhaps byte streams with
> fairly
> > trivial metadata suffice and we can just hand roll it?
> > A: I think that schematized tech is well-proven for adaptability and it
> is
> > also handy for code gen, regardless of performance. So to me the question
> > is whether or not we need structured messages at all, or if we can model
> > every high throughput communication as coder-encoded streams. I think
> that
> > things like commits to state, acknowledgements of timer firings,
> pull-based
> > requests like side inputs are probably best expressed via a schema. But
> > maybe I am overlooking some design ideas.
> >
> > Q: How will side inputs arrive?
> > A: This API is really designed to be pull-based, so it sort of implies a
> > great many small RPCs (and caching).
> >
> > I'm sure I've left off some discussion points, and maybe oversimplified
> > some things, but does this answer the questions somewhat? Does this
> clarify
> > the suggested choices of tech? Do you still think we don't need them?
> >
> > Kenn
> >
> > On Mon, Jun 20, 2016 at 7:48 AM, Bobby Evans <ev...@yahoo-inc.com.invalid
> >
> > wrote:
> >
> > > In storm we use JSON as the default communication between shell bolts
> and
> > > shell spouts, which allows for APIs in non JVM languages. It works
> rather
> > > well.  That being said it is also slow, and we made it a plugin so
> others
> > > could make their own, faster, implementations.  For storm both the data
> > and
> > > the control are serialized to JSON, so I am not sure how much of that
> is
> > > control and how much of it is the data that makes it slow.  I
> personally
> > > would like to see a simple benchmark that implements the basic protocol
> > > between the two so we can actually have a more numeric comparison.  As
> > well
> > > as any pain that someone experienced trying to implement even a proof
> of
> > > concept.
> > >
> > > I agree with Amit too that long term we may want to think about
> > supporting
> > > structured data, and rely less on POJOs.  It allows for a lot of
> > > optimizations in addition to having out of the box support for
> > > serializing/de-serializing them in another language. But perhaps that
> is
> > > more of a layer that sits on top of beam instead, because a lot of the
> > > optimizations really make the most sense in a declarative DSL like
> > context.
> > >
> > >  - Bobby
> > >
> > >On Saturday, June 18, 2016 6:56 AM, Amit Sela <amitsel...@gmail.com
> >
> > > wrote:
> > >
> > >
> > >  My +1 for JSON was for the fact that it's common enough and simpler
> than
> > > Protbuff/Avro/Thrift, and I would guess that (almost) all languages
> > > acknowledge it, though I might be wrong here.
> > >
> > > As for KV & WindowedValue, I'm not sure what's the issue with Kryo, but
> > the
> > > "hardest" thing I had to do to get it working with Spark was to
> register
> > > 3rd party implementations for Guava Immutable collections. And I
> honestly
> > > don't know if there is one framework that covers everything in all
> > (common)
> > > languages.
> > >
> > > Finally, if I understand correctly, the suggestion is to transmit the
> > data
> > > as bytes with the appropriate coders, correct ? For the new Spark for
> > > example, they use Encoders
> > > <
> >

Re: Running examples with different runners

2016-06-24 Thread Lukasz Cwik
Below I outline a different approach from the DirectRunner's. The DirectRunner
didn't require an override for Create since it knows when no data remains and
can correctly shut the pipeline down by pushing the watermark all the way
through the pipeline. That is a superior approach, but I believe it is more
difficult to get right.

PAssert emits an aggregator with a specific name which states that the
PAssert succeeded or failed:
https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java#L110

The test Dataflow runner counts how many PAsserts were applied and then
polls itself every 10 seconds checking to see if the aggregator has any
failures or all the successes for streaming pipelines.
Polling logic:
https://github.com/apache/incubator-beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java#L114
Check logic:
https://github.com/apache/incubator-beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java#L177

As for overriding a transform, the runner is currently invoked during
application of a transform and is able to inject/replace/modify the
transform that was being applied. The test Dataflow runner uses this a
little bit to do the PAssert counting while the normal Dataflow runner does
this a lot for its own specific needs.

Finally, I believe Ken just made some changes which removed the requirement
to support View.YYY and replaced it with GroupByKey so the no translator
registered for View... may go away.
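
Put together, the aggregator check described above boils down to something like
this sketch; jobHandle and getAggregatorTotal are made-up names, and the
counter constants are assumed to match the ones PAssert defines (see the links
above for the real logic):

  boolean checkAsserts(JobHandle jobHandle, int expectedAssertCount) {
    // Fail fast if any PAssert reported a failure.
    long failures = jobHandle.getAggregatorTotal(PAssert.FAILURE_COUNTER);
    if (failures > 0) {
      throw new AssertionError(failures + " PAssert(s) failed");
    }
    // Done once every applied PAssert has reported success.
    long successes = jobHandle.getAggregatorTotal(PAssert.SUCCESS_COUNTER);
    return successes >= expectedAssertCount;
  }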


On Fri, Jun 24, 2016 at 4:52 PM, Thomas Weise <thomas.we...@gmail.com>
wrote:

> Kenneth and Lukasz, thanks for the direction.
>
> Is there any information about other requirements to run the cross runner
> tests and hints to troubleshoot. On first attempt they mosty fail due to
> missing translator:
>
> PAssertTest.testIsEqualTo:219 » IllegalState no translator registered for
> View...
>
> Also, for run() to be synchronous or wait, there needs to be an exit
> condition. I know how to solve this for the Apex runner specific tests. But
> for the cross runner tests, what is the recommended way to do this? Kenneth
> mentioned that Create could signal end of stream. Should I look to override
> the Create transformation to configure the behavior ((just for this test
> suite) and if so, is there an example how to do this cleanly?
>
> Thanks,
> Thomas
>
>
>
>
> On Tue, Jun 21, 2016 at 7:32 PM, Kenneth Knowles <k...@google.com.invalid>
> wrote:
>
> > To expand on the RunnableOnService test suggestion, here [1] is the
> commit
> > from the Spark runner. You will get a lot more information if you can
> port
> > this for your runner than you would from an example end-to-end test.
> >
> > Note that this just pulls in the tests from the core SDK. For testing
> with
> > other I/O connectors, you'll add them to the dependenciesToScan.
> >
> > [1]
> >
> >
> https://github.com/apache/incubator-beam/commit/4254749bf103c4bb6f68e316768c0aa46d9f7df0
> >
> > On Tue, Jun 21, 2016 at 4:06 PM, Lukasz Cwik <lc...@google.com.invalid>
> > wrote:
> >
> > > There is a start to getting more e2e like integration tests going with
> > the
> > > first being WordCount.
> > >
> > >
> >
> https://github.com/apache/incubator-beam/blob/master/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
> > > You could add WindowedWordCountIT.java which will be launched with the
> > > proper configuration of the Apex runner pom.xml
> > >
> > > I would also suggest that you take a look at the @RunnableOnService
> tests
> > > which are a comprehensive validation suite of ~200ish tests that test
> > > everything from triggers to side inputs. It requires some pom changes
> and
> > > creating a test runner which is able to setup an apex environment.
> > >
> > > Furthermore, we could really use an addition to the Beam wiki about
> > testing
> > > and how runners write tests/execute tests/...
> > >
> > > Some relevant links:
> > > Older presentation about getting cross runner tests going:
> > >
> > >
> >
> https://docs.google.com/presentation/d/1uTb7dx4-Y2OM_B0_3XF_whwAL2FlDTTuq2QzP9sJ4Mg/edit#slide=id.g127d614316_19_39
> > >
> > > Examples of test runners:
> > >
> > >
> >
> https://github.com/apache/incubator-beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
> > >
> > >
> >
> https://github.com/a

Re: Scala DSL

2016-06-24 Thread Lukasz Cwik
+1 for dsls/scio for the already listed reasons

On Fri, Jun 24, 2016 at 11:21 AM, Rafal Wojdyla 
wrote:

> Hello. When it comes to SDK vs DSL - I fully agree with Frances. About
> dsls/java/scio or dsls/scio - dsls/java/scio may cause confusion, scio is a
> scala DSL but lives under java directory (?) - that makes sense only once
> you get that scio is using java SDK under the hood. Thus, +1 to dsls/scio.
> - Rafal
>
> On Fri, Jun 24, 2016 at 2:01 PM, Kenneth Knowles 
> wrote:
>
> > My +1 goes to dsls/scio. It already has a cool name, so let's use it. And
> > there might be other Scala-based DSLs.
> >
> > On Fri, Jun 24, 2016 at 8:39 AM, Ismaël Mejía  wrote:
> >
> > > ​Hello everyone,
> > >
> > > Neville, thanks a lot for your contribution. Your work is amazing and I
> > am
> > > really happy that this scala integration is finally happening.
> > > Congratulations to you and your team.
> > >
> > > I *strongly* disagree about the DSL classification for scio for one
> > reason,
> > > if you go to the root of the term, Domain Specific Languages are about
> a
> > > domain, and the domain in this case is writing Beam pipelines, which
> is a
> > > really broad domain.
> > >
> > > I agree with Frances’ argument that scio is not an SDK e.g. it reuses
> the
> > > existing Beam java SDK. My proposition is that scio will be called the
> > > Scala API because in the end this is what it is. I think the confusion
> > > comes from the common definition of SDK which is normally an API + a
> > > Runtime. In this case scio will share the runtime with what we call the
> > > Beam Java SDK.
> > >
> > > One additional point of using the term API is that it sends the clear
> > > message that Beam has a Scala API too (which is good for visibility as
> JB
> > > mentioned).
> > >
> > > Regards,
> > > Ismaël​
> > >
> > >
> > > On Fri, Jun 24, 2016 at 5:08 PM, Jean-Baptiste Onofré  >
> > > wrote:
> > >
> > > > Hi Dan,
> > > >
> > > > fair enough.
> > > >
> > > > As I'm also working on new DSLs (XML, JSON), I already created the
> dsls
> > > > module.
> > > >
> > > > So, I would say dsls/scala.
> > > >
> > > > WDYT ?
> > > >
> > > > Regards
> > > > JB
> > > >
> > > >
> > > > On 06/24/2016 05:07 PM, Dan Halperin wrote:
> > > >
> > > >> I don't think that sdks/scala is the right place -- scio is not a
> Beam
> > > >> Scala SDK; it wraps the existing Java SDK.
> > > >>
> > > >> Some options:
> > > >> * sdks/java/extensions  (Scio builds on the Java SDK) -- mentally
> > vetoed
> > > >> since Scio isn't an extension for the Java SDK, but rather a wrapper
> > > >>
> > > >> * dsls/java/scio (Scio is a Beam DSL that uses the Java SDK)
> > > >> * dsls/scio  (Scio is a Beam DSL that could eventually use multiple
> > > SDKs)
> > > >> * extensions/java/scio  (Scio is an extension of Beam that uses the
> > Java
> > > >> SDK)
> > > >> * extensions/scio  (Scio is an extension of Beam that is not limited
> > to
> > > >> one
> > > >> SDK)
> > > >>
> > > >> I lean towards either dsls/java/scio or extensions/java/scio, since
> I
> > > >> don't
> > > >> think there are plans for Scio to handle multiple different SDKs (in
> > > >> different languages). The question between these two is whether we
> > think
> > > >> DSLs are "big enough" to be a top level concept.
> > > >>
> > > >> On Thu, Jun 23, 2016 at 11:05 PM, Jean-Baptiste Onofré <
> > j...@nanthrax.net
> > > >
> > > >> wrote:
> > > >>
> > > >> Good point about new Fn and the fact it's based on the Java SDK.
> > > >>>
> > > >>> It's just that in term of "marketing", it's a good message to
> > provide a
> > > >>> Scala SDK even if technically it's more a DSL.
> > > >>>
> > > >>> For instance, a valid "marketing" DSL would be a Java fluent DSL on
> > top
> > > >>> of
> > > >>> the Java SDK, or a declarative XML DSL.
> > > >>>
> > > >>> However, from a technical perspective, it can go into dsl module.
> > > >>>
> > > >>> My $0.02 ;)
> > > >>>
> > > >>> Regards
> > > >>> JB
> > > >>>
> > > >>>
> > > >>> On 06/24/2016 06:51 AM, Frances Perry wrote:
> > > >>>
> > > >>> +Rafal & Andrew again
> > > 
> > >  I am leaning DSL for two reasons: (1) scio uses the existing java
> > >  execution
> > >  environment (and won't have a language-specific fn harness of its
> > > own),
> > >  and
> > >  (2) it changes the abstractions that users interact with.
> > > 
> > >  I recently saw a scio repl demo from Reuven -- there's some really
> > > cool
> > >  stuff in there. I'd love to dive into it a bit more and see what
> can
> > > be
> > >  generalized beyond scio. The repl-like interactive graph
> > construction
> > > is
> > >  very similar to what we've seen with ipython, in that it doesn't
> > > always
> > >  play nicely with the graph construction / graph execution
> > > distinction. I
> > >  wonder what changes to Beam might more generally support this. The
> > >  

Re: [DISCUSS] PTransform.named vs. named apply

2016-06-22 Thread Lukasz Cwik
+1 on your proposed solution
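
For concreteness, the two styles discussed in Ben's proposal below; only the
first would remain (paths here are placeholders):

  // Preferred: name the application at the apply site.
  p.apply("ReadLines", TextIO.Read.from("gs://some-bucket/input.txt"));

  // To be removed: name the transform itself via .named().
  p.apply(TextIO.Read.named("ReadLines").from("gs://some-bucket/input.txt"));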

On Wed, Jun 22, 2016 at 3:17 PM, Ben Chambers  wrote:

> Based on a recent PR (https://github.com/apache/incubator-beam/pull/468) I
> was reminded of the confusion around the use of
> .apply(transform.named(someName)) and .apply(someName, transform). This is
> one of things I’ve wanted to cleanup for a while. I’d like to propose a
> path towards removing this redundancy.
>
> First, some background -- why are there two ways to name things? When we
> added support for updating existing pipelines, we needed all applications
> to have unique user-provided names to allow diff’ing the pipelines. We
> found a few problems with the first approach -- using .named() to create a
> new transform -- which led to the introduction of the named apply:
>
> 1. When receiving an error about an application not having a name, it is
> not obvious that a name should be given to the *transform*
> 2. When using .named() to construct a new transform either the type
> information is lost or the composite transform has to override .named()
>
> We now generally suggest the use of .apply(someName, transform). It is
> easier to use and doesn’t lead to as much confusion around PTransform names
> and PTransform application names.
>
> To that end, I'd like to propose the following changes to the code and
> documentation:
> 1. Replace the usage of .named(name) in all examples and composites with
> the named-apply syntax.
> 2. Replace .named(name) with a protected PTransform constructor which takes
> a default name. If not provided, the default name will be derived from the
> class of the PTransform.
> 3. Use the protected constructor in composites (where appropriate) to
> ensure that the default application has a reasonable name.
>
> Users will benefit from having a single way of naming applications while
> building a pipeline. Any breakages due to the removal of .named should be
> easily fixed by either using the named application or by passing the name
> to the constructor of a composite.
>
> I’d like to hear any comments or opinions on this topic from the wider
> community. Please let me know what you think!
>
> -- Ben
>


Re: Running examples with different runners

2016-06-21 Thread Lukasz Cwik
There is a start to getting more e2e like integration tests going with the
first being WordCount.
https://github.com/apache/incubator-beam/blob/master/examples/java/src/test/java/org/apache/beam/examples/WordCountIT.java
You could add WindowedWordCountIT.java which will be launched with the
proper configuration of the Apex runner pom.xml

I would also suggest that you take a look at the @RunnableOnService tests
which are a comprehensive validation suite of ~200ish tests that test
everything from triggers to side inputs. It requires some pom changes and
creating a test runner which is able to setup an apex environment.

Furthermore, we could really use an addition to the Beam wiki about testing
and how runners write tests/execute tests/...

Some relevant links:
Older presentation about getting cross runner tests going:
https://docs.google.com/presentation/d/1uTb7dx4-Y2OM_B0_3XF_whwAL2FlDTTuq2QzP9sJ4Mg/edit#slide=id.g127d614316_19_39

Examples of test runners:
https://github.com/apache/incubator-beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java
https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
https://github.com/apache/incubator-beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java

Section of pom dedicated to enabling runnable on service tests:
https://github.com/apache/incubator-beam/blob/master/runners/spark/pom.xml#L54

On Tue, Jun 21, 2016 at 2:21 PM, Thomas Weise 
wrote:

> Hi,
>
> As part of the Apex runner, we have a few unit tests for the supported
> transformations. Next, I would like to test the WindowedWordCount example.
>
> Is there an example of configuring this pipeline for another runner? Is it
> recommended to supply such configuration as a JUnit test? What is the
> general (repeatable?) approach to exercise different runners with the set
> of example pipelines?
>
> Thanks,
> Thomas
>


Re: Using GroupAlsoByWindowViaWindowSetDoFn for stream of input element

2016-06-21 Thread Lukasz Cwik
Maybe the issue is that pushing the combine function upstream impacts the
windowing and triggering behavior of the GBK. I don't believe it's as simple
as always being able to push the combiner upstream; it depends on how a
runner has decided to implement GBK.
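
To make the buffering-versus-combining choice Thomas Groh mentions below
concrete, here is a sketch of the two ReduceFns a runner can hand to the
GroupAlsoByWindow machinery; the exact runners-core signatures are assumptions:

  // Buffering: accumulate values and emit an Iterable when the window fires.
  DoFn<?, ?> buffering = GroupAlsoByWindowViaWindowSetDoFn.create(
      windowingStrategy, SystemReduceFn.buffering(valueCoder));

  // Combining: fold each value into the CombineFn accumulator as it arrives;
  // only valid when the combine has been pushed upstream of the grouping.
  DoFn<?, ?> combining = GroupAlsoByWindowViaWindowSetDoFn.create(
      windowingStrategy, SystemReduceFn.combining(keyCoder, appliedCombineFn));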


On Tue, Jun 21, 2016 at 9:58 AM, Thomas Weise 
wrote:

> Hi Thomas,
>
> Thanks for the info. When the pipeline contains:
>
> .apply(Count.perElement())
>
> The translation looks as follows:
>
> 58   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  -
> entering composite transform Count.PerElement
> 58   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  -
> visiting transform Init [AnonymousParDo]
> 58   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  -
> visiting value Count.PerElement/Init.out [PCollection]
> 58   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  -
> entering composite transform Count.PerKey [Combine.PerKey]
> 58   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  -
> visiting transform GroupByKey
> 93   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  -
> visiting value Count.PerElement/Count.PerKey/GroupByKey.out [PCollection]
> 93   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  -
> entering composite transform Combine.GroupedValues
> 93   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  -
> visiting transform AnonymousParDo
> 93   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  -
> visiting value
> Count.PerElement/Count.PerKey/Combine.GroupedValues/AnonymousParDo.out
> [PCollection]
> 93   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  -
> leaving composite transform Combine.GroupedValues
> 93   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  -
> leaving composite transform Count.PerKey [Combine.PerKey]
> 93   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  -
> leaving composite transform Count.PerElement
>
> So the runner's translator needs to take care of pushing the combine
> function upstream, when it is possible. I was wondering whether this is
> something that could be handled in a runner independent way?
>
> Thanks,
> Thomas
>
>
>
>
> On Fri, Jun 17, 2016 at 10:19 AM, Thomas Groh 
> wrote:
>
> > Generally, the above code snippet will work, producing (after trigger
> > firing) an output Iterable containing all of the input elements. It
> may
> > be notable that timers (and TimerInternals) are also per-key, so that
> > interface must also be updated per element.
> >
> > By specifying the ReduceFn of the ReduceFnRunner, you can change how the
> > ReduceFnRunner adds and merges state. The combining ReduceFn is suitable
> > for use with upstream CombineFns, while buffering is suitable for general
> > use.
> >
> > On Fri, Jun 17, 2016 at 9:52 AM, Thomas Weise 
> > wrote:
> >
> > > The source for my windowed groupByKey experiment is here:
> > >
> > >
> > >
> >
> https://github.com/tweise/incubator-beam/blob/master/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
> > >
> > > The result is Iterable. In cases such as counting, what is the
> > > recommended way to perform the incremental aggregation, without
> building
> > an
> > > intermediate collection?
> > >
> > > Thomas
> > >
> > > On Fri, Jun 17, 2016 at 8:27 AM, Thomas Weise 
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I'm looking into using the GroupAlsoByWindowViaWindowSetDoFn to
> > > accumulate
> > > > the windowed state with the elements arriving one by one (stream).
> > > >
> > > > Once the window is complete, I would like to emit an Iterable or
> > > > another form of aggregation of the elements. Is the following
> supposed
> > to
> > > > lead to merging of current element with previously received elements
> > for
> > > > the same window:
> > > >
> > > > KeyedWorkItem kwi = KeyedWorkItems.elementsWorkItem(
> > > > kv.getKey(),
> > > > Collections.singletonList(updatedWindowedValue));
> > > >
> > > > context.setElement(kwi, getStateInternalsForKey(kwi.key()));
> > > > fn.processElement(context);
> > > >
> > > > The input here are always single elements.
> > > >
> > > > Thanks,
> > > > Thomas
> > > >
> > > >
> > > >
> > >
> >
>


Re: 0.1.0-incubating release

2016-06-07 Thread Lukasz Cwik
+1

On Tue, Jun 7, 2016 at 8:54 AM, Dan Halperin 
wrote:

> +2! This seems most concordant with other Apache products and the most
> future-proof.
>
> On Mon, Jun 6, 2016 at 9:35 PM, Jean-Baptiste Onofré 
> wrote:
>
> > +1
> >
> > Regards
> > JB
> >
> >
> > On 06/07/2016 02:49 AM, Davor Bonaci wrote:
> >
> >> After a few rounds of discussions and examining patterns of other
> >> projects,
> >> I think we are converging towards:
> >>
> >> * A flat group structure, where all artifacts belong to the
> >> org.apache.beam
> >> group.
> >> * Prefix all artifact ids with "beam-".
> >> * Name artifacts according to the existing directory/module layout:
> >> beam-sdks-java-core, beam-runners-google-cloud-dataflow-java, etc.
> >> * Suffix all parents with "-parent", e.g., "beam-parent",
> >> "sdks-java-parent", etc.
> >> * Create a "distributions" module, for the purpose of packaging the
> source
> >> code for the ASF release.
> >>
> >> I believe this approach takes into account everybody's feedback so far,
> >> and
> >> I think opposing positions have been retracted.
> >>
> >> Please comment if that's not the case, or if there are any additional
> >> points that we may have missed. If not, this is implemented in pending
> >> pull
> >> requests #420 and #423.
> >>
> >> Thanks!
> >>
> >> On Fri, Jun 3, 2016 at 9:59 AM, Thomas Weise 
> >> wrote:
> >>
> >> Another consideration for potential future packaging/distribution
> >>> solutions
> >>> is how the artifacts line up as files in a flat directory. For that it
> >>> may
> >>> be good to have a common prefix in the artifactId and unique
> artifactId.
> >>>
> >>> The name for the source archive (when relying on ASF parent POM) can
> also
> >>> be controlled without expanding the artifactId:
> >>>
> >>>   <build>
> >>>     <plugins>
> >>>       <plugin>
> >>>         <artifactId>maven-assembly-plugin</artifactId>
> >>>         <configuration>
> >>>           <finalName>apache-beam</finalName>
> >>>         </configuration>
> >>>       </plugin>
> >>>     </plugins>
> >>>   </build>
> >>>
> >>> Thanks,
> >>> Thomas
> >>>
> >>> On Fri, Jun 3, 2016 at 9:39 AM, Davor Bonaci  >
> >>> wrote:
> >>>
> >>> BEAM-315 is definitely important. Normally, I'd always advocate for
> 
> >>> holding
> >>>
>  the release to pick that fix. For the very first release, however, I'd
>  prefer to proceed to get something out there and test the process. As
>  you
>  said, we can address this rather quickly once we have the fix merged
> in.
> 
>  In terms of Maven coordinates, there are two basic approaches:
>  * flat structure, where artifacts live under "org.apache.beam" group
> and
>  are differentiated by their artifact id.
>  * hierarchical structure, where we use different groups for different
> 
> >>> types
> >>>
>  of artifacts (org.apache.beam.sdks; org.apache.beam.runners).
> 
>  There are pros and cons on the both sides of the argument. Different
>  projects made different choices. Flat structure is easier to find and
>  navigate, but often breaks down with too many artifacts. Hierarchical
>  structure is just the opposite.
> 
>  On my end, the only important thing is consistency. We used to have
> it,
> 
> >>> and
> >>>
>  it got broken by PR #365. This part should be fixed -- we should
> either
>  finish the vision of the hierarchical structure, or rollback that PR
> to
> 
> >>> get
> >>>
>  back to a fully flat structure.
> 
>  My general biases tend to be:
>  * hierarchical structure, since we have many artifacts already.
>  * short identifiers; no need to repeat a part of the group id in the
>  artifact id.
> 
>  On Fri, Jun 3, 2016 at 4:03 AM, Jean-Baptiste Onofré  >
>  wrote:
> 
>  Hi Max,
> >
> > I discussed with Davor yesterday. Basically, I proposed:
> >
> > 1. To rename all parent with a prefix (beam-parent,
> >
>  flink-runner-parent,
> >>>
>  spark-runner-parent, etc).
> > 2. For the groupId, I prefer to use different groupId, it's clearer
> to
> >
>  me,
> 
> > and it's exactly the usage of the groupId. Some projects use a single
> > groupId (spark, hadoop, etc), others use multiple (camel, karaf,
> >
>  activemq,
> 
> > etc). I prefer different groupIds but ok to go back to single one.
> >
> > Anyway, I'm preparing a PR to introduce a new Maven module:
> > "distribution". The purpose is to address both BEAM-319 (first) and
> > BEAM-320 (later). It's where we will be able to define the different
> > distributions we plan to publish (source and binaries).
> >
> > Regards
> > JB
> >
> >
> > On 06/03/2016 11:02 AM, Maximilian Michels wrote:
> >
> > Thanks for getting us ready for the first release, Davor! We would
> >> like to fix BEAM-315 next week. Is there already a timeline for 

Re: PROPOSAL: Apache Beam (virtual) meeting: 05/11/2016 08:00 - 11:00 Pacific time

2016-04-12 Thread Lukasz Cwik
5/4/2016 works for me.

On Tue, Apr 12, 2016 at 8:05 AM, James Malone <
jamesmal...@google.com.invalid> wrote:

> Hey JB,
>
> Sorry for the late reply! That is a good point; apologies I missed noticing
> that conflict. For everyone in the community, how would one of the
> following alternatives work?
>
> 5/4/2016 - 8:00 - 11:00 AM Pacific time
> -or-
> 5/18/2016 - 8:00 - 11:00 AM Pacific time
>
> Best,
>
> James
>
> On Mon, Apr 11, 2016 at 11:17 AM, Lukasz Cwik <lc...@google.com.invalid>
> wrote:
>
> > That works for me.
> > But it would be best if people just posted when they are available
> > depending on the goal/scope of the meeting and then a date is chosen.
> >
> > On Sun, Apr 10, 2016 at 9:40 PM, Jean-Baptiste Onofré <j...@nanthrax.net>
> > wrote:
> >
> > > OK, what about the week before ApacheCon ?
> > >
> > > Regards
> > > JB
> > >
> > >
> > > On 04/11/2016 04:22 AM, Lukasz Cwik wrote:
> > >
> > >> I will be gone May 14th - 31st so would prefer a date before that.
> > >>
> > >> On Fri, Apr 8, 2016 at 10:23 PM, Jean-Baptiste Onofré <
> j...@nanthrax.net>
> > >> wrote:
> > >>
> > >> Hi James,
> > >>>
> > >>> May 11th is during the ApacheCon Vancouver.
> > >>>
> > >>> As some Beam current and potential contributors could be busy at
> > >>> ApacheCon, maybe it's better to postpone to May 18th.
> > >>>
> > >>> WDYT ?
> > >>>
> > >>> Regards
> > >>> JB
> > >>>
> > >>>
> > >>> On 04/08/2016 10:37 PM, James Malone wrote:
> > >>>
> > >>> Hello everyone,
> > >>>>
> > >>>> I'd like to propose holding a meeting in May to discuss a few Apache
> > >>>> Beam
> > >>>> topics. This could be a good venue to discuss design proposals,
> gather
> > >>>> technical feedback, and the state of the Beam community. My thinking
> > is
> > >>>> we
> > >>>> will be able to cover two or three Apache Beam topics in depth over
> > the
> > >>>> course of a few hours.
> > >>>>
> > >>>> To make the meeting accessible to the community, I propose a virtual
> > >>>> meeting on:
> > >>>>
> > >>>> Wednesday May 11th (2016/05/11)
> > >>>> 8:00 AM - 11:00 AM Pacific
> > >>>>
> > >>>> Since time may be limited, I propose agenda items recommended by the
> > >>>> PPMC
> > >>>> are given preferences. Before the meeting we can finalize the method
> > >>>> used
> > >>>> for the virtual meeting (like Google hangouts) and the finalized
> > agenda.
> > >>>> I'm also happy to volunteer myself for taking notes and coordinating
> > the
> > >>>> event.
> > >>>>
> > >>>> Best,
> > >>>>
> > >>>> James
> > >>>>
> > >>>>
> > >>>> --
> > >>> Jean-Baptiste Onofré
> > >>> jbono...@apache.org
> > >>> http://blog.nanthrax.net
> > >>> Talend - http://www.talend.com
> > >>>
> > >>>
> > >>
> > > --
> > > Jean-Baptiste Onofré
> > > jbono...@apache.org
> > > http://blog.nanthrax.net
> > > Talend - http://www.talend.com
> > >
> >
>


Re: PROPOSAL: Apache Beam (virtual) meeting: 05/11/2016 08:00 - 11:00 Pacific time

2016-04-10 Thread Lukasz Cwik
I will be gone May 14th - 31st so would prefer a date before that.

On Fri, Apr 8, 2016 at 10:23 PM, Jean-Baptiste Onofré 
wrote:

> Hi James,
>
> May 11th is during the ApacheCon Vancouver.
>
> As some Beam current and potential contributors could be busy at
> ApacheCon, maybe it's better to postpone to May 18th.
>
> WDYT ?
>
> Regards
> JB
>
>
> On 04/08/2016 10:37 PM, James Malone wrote:
>
>> Hello everyone,
>>
>> I'd like to propose holding a meeting in May to discuss a few Apache Beam
>> topics. This could be a good venue to discuss design proposals, gather
>> technical feedback, and the state of the Beam community. My thinking is we
>> will be able to cover two or three Apache Beam topics in depth over the
>> course of a few hours.
>>
>> To make the meeting accessible to the community, I propose a virtual
>> meeting on:
>>
>> Wednesday May 11th (2016/05/11)
>> 8:00 AM - 11:00 AM Pacific
>>
>> Since time may be limited, I propose agenda items recommended by the PPMC
>> are given preferences. Before the meeting we can finalize the method used
>> for the virtual meeting (like Google hangouts) and the finalized agenda.
>> I'm also happy to volunteer myself for taking notes and coordinating the
>> event.
>>
>> Best,
>>
>> James
>>
>>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>


Re: Gearpump Runner Design Document

2016-04-08 Thread Lukasz Cwik
I went through the doc and added some comments.

On Fri, Apr 8, 2016 at 7:32 AM, Jean-Baptiste Onofré 
wrote:

> Hi Manu,
>
> awesome !
>
> Thanks for sharing this !
>
> I will take a look.
>
> Regards
> JB
>
>
> On 04/08/2016 03:38 AM, Manu Zhang wrote:
>
>> Hi JB,
>>
>> - it seems the Gearpump Serializer (based on Kryo) can be expressed as a
>>
>>> Coder. You mean because you have only binary "input", whereas the Coder
>>> needs a specific type. Right ?
>>>
>>
>>
>> Yes. I'm thinking about adding a typed serializer interface to Gearpump
>> and
>>   the default serializer based on Kryo could be type of "Any" in Scala.
>> Then
>> it's trivial to add support for Coder.
>>
>>
>>   - Any plan for the side inputs (I don't know if we can do it easily in
>>
>>> Gearpump) ?
>>>
>>
>>
>> Certainly, it's in our plan. Off the top of my head, Gearpump is built on
>> Akka Actors and side inputs could be actor states which we can query with
>> messages. Any good ideas about how to do it in a streaming engine (not
>> specifically on Gearpump)?
>>
>> The code is at
>> https://github.com/manuzhang/incubator-beam/tree/gearpump_runner. Nothing
>> is working yet and I still need to fix styles, add docs and tests. The
>> focus now is to get a working example first.
>>
>> I also created an umbrella jira (
>> https://issues.apache.org/jira/browse/GEARPUMP-21) to track status and
>> related issues at Gearpump side and notify the Gearpump community.
>>
>> Regards,
>> Manu
>>
>> On Thu, Apr 7, 2016 at 1:45 PM, Jean-Baptiste Onofré 
>> wrote:
>>
>> Hi Manu,
>>>
>>> I replied in the Jira.
>>>
>>> It sounds good to me.
>>>
>>> Just a question about the gaps:
>>> - it seems the Gearpump Serializer (based on Kryo) can be expressed as a
>>> Coder. You mean because you have only binary "input", whereas the Coder
>>> needs a specific type. Right ?
>>> - Any plan for the side inputs (I don't know if we can do it easily in
>>> Gearpump) ?
>>>
>>> Is it possible to see the code in progress ?
>>>
>>> Thanks !
>>>
>>> Regards
>>> JB
>>>
>>> On 04/07/2016 06:24 AM, Manu Zhang wrote:
>>>
>>> Hi all,

 I've drafted a design document
 <

 https://drive.google.com/open?id=1nw64QUWVfT8L7FUprPGLEeNjSBpDMkn1otfLt2rHM5g

> for
>
 Gearpump Runner (https://issues.apache.org/jira/browse/BEAM-79). Please
 kindly offer your comment.


 Thanks,
 Manu


 --
>>> Jean-Baptiste Onofré
>>> jbono...@apache.org
>>> http://blog.nanthrax.net
>>> Talend - http://www.talend.com
>>>
>>>
>>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>


Re: [jira] [Commented] (BEAM-68) Support for limiting parallelism of a step

2016-03-31 Thread Lukasz Cwik
I believe you can guarantee the number of shards (with a more complex set
of transforms). You just need to figure out which shards are empty and force
the write operation. We could have two implementations of write: one which
doesn't write when there are zero elements (the default), and one which goes
through the motions of doing the write even for zero elements.

numShards is a parallelism-limiting control; it already doesn't scale. The
thing we lose most is the ability to dynamically rebalance work if there is a
straggler.

As for an overly restrictive implementation, this is one of those cases where
you have a composite PTransform with a basic implementation using GBK under
the hood, which runners can override if they can enforce the


On Wed, Mar 30, 2016 at 10:28 PM, Daniel Halperin (JIRA) 
wrote:

>
> [
> https://issues.apache.org/jira/browse/BEAM-68?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15219376#comment-15219376
> ]
>
> Daniel Halperin commented on BEAM-68:
> -
>
> Okay, I think I'm partially wrong.
>
> KV -> ParDo(process all elements in a single DoFn with
> per-K startBundle/endBundle/etc) is doable as a solution to BEAM-92.
>-It won't of course work with empty K, so you can't in fact guarantee
> numShards is matched.
>-It won't scale.
>-It overly restricts implementation.
> but I think it works, in essence, without a model change.
>
> Would you prefer to dupe 169 against 92? I don't see a need for more bug
> bloat here tho. Have suggested edits to the text of either bug that will
> fix?
>
> > Support for limiting parallelism of a step
> > --
> >
> > Key: BEAM-68
> > URL: https://issues.apache.org/jira/browse/BEAM-68
> > Project: Beam
> >  Issue Type: New Feature
> >  Components: beam-model
> >Reporter: Daniel Halperin
> >
> > Users may want to limit the parallelism of a step. Two classic uses
> cases are:
> > - User wants to produce at most k files, so sets
> TextIO.Write.withNumShards(k).
> > - External API only supports k QPS, so user sets a limit of k/(expected
> QPS/step) on the ParDo that makes the API call.
> > Unfortunately, there is no way to do this effectively within the Beam
> model. A GroupByKey with exactly k keys will guarantee that only k elements
> are produced, but runners are free to break fusion in ways that each
> element may be processed in parallel later.
> > To implement this functionaltiy, I believe we need to add this support
> to the Beam Model.
>
>
>
> --
> This message was sent by Atlassian JIRA
> (v6.3.4#6332)
>


Re: [PROPOSAL] MultiLineIO

2016-03-19 Thread Lukasz Cwik
Have you considered expanding TextIO to support an arbitrary delimiter
instead of defining MultiLineIO?
https://github.com/apache/incubator-beam/blob/master/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java#L737

TextIO currently splits on '\n', '\r\n', or '\r'. It seems as though having
it split on any arbitrary delimiter would be useful.
Note that even though TextIO implies it's used for strings, this is not
necessarily required since a user can use any coder to decode the bytes
between two delimiters.
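
Usage could then look roughly like the following; withDelimiter() is
hypothetical (it does not exist on TextIO today) and is shown only to
illustrate the suggestion:

  PCollection<String> messages = p.apply(
      TextIO.Read.from("/path/to/input*")
                 .withDelimiter(new byte[] {'#', '\n'}));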

On Thu, Mar 17, 2016 at 12:53 AM, Dan Halperin 
wrote:

> Hi Peter,
>
> Echoing Eugene's and JB's thoughts -- we'd love a PR!
>
> I also wanted to say: we've hit you with a lot of recommendations in this
> email thread. If you have any questions, you can ask us here -- but we'll
> of course be happy to answer them during code review as well. Do not feel
> like meeting all these many criteria is a pre-requisite for opening a Pull
> Request -- we just may give you feedback and ask for changes before merging
> :).
>
> Thanks!
> Dan
>
> On Mon, Mar 14, 2016 at 12:27 PM, Jean-Baptiste Onofré 
> wrote:
>
> > Yes, you already use the "new style" as you use BoundedSource.
> >
> > Regards
> > JB
> >
> >
> > On 03/14/2016 08:08 PM, Giesin, Peter wrote:
> >
> >> The MultiLineIO is a BoundedSource and an extension of FileBasedSource.
> >> Where the FileBasedSource reads a single line at a time the MultiLineIO
> >> allows the user to define an arbitrary “message” delimiter. It then
> reads
> >> through the file, removing newlines, until the separator is read,
> finally
> >> returning the character sequence that is built.
> >>
> >>
> >>
> >> I believe it is already built using the new style but I will compare it
> >> to the BigTableIO to confirm that.
> >>
> >> Peter
> >>
> >> On 3/14/16, 1:50 PM, "Jean-Baptiste Onofré"  wrote:
> >>
> >> I second Eugene here.
> >>>
> >>> In the past, I developed some IOs using the "old style" (as did in the
> >>> PubSubIO). I'm now refactoring it to use the "new style".
> >>>
> >>> Regards
> >>> JB
> >>>
> >>> On 03/14/2016 06:47 PM, Eugene Kirpichov wrote:
> >>>
>  Hi Peter,
>  Looking forward to your PR. Please note that source classes are
>  relatively
>  tricky to develop, so would you mind briefly explaining what your
> source
>  will do here over email, so that we hash out some possible issues
> early
>  rather than in PR comments?
>  Also note that now recommend to package IO connectors as PTransforms,
>  making the PTransform class itself be a builder - while the
> Source/Sink
>  classes should be kept package-private (rather than exposed to the
>  user).
>  For an example of a connector packaged in this style, see BigtableIO (
> 
> 
> https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_GoogleCloudPlatform_DataflowJavaSDK_blob_master_sdk_src_main_java_com_google_cloud_dataflow_sdk_io_bigtable_BigtableIO.java=BQIDaQ=3BfiSO86x5iKjpl2b39jud9R1NrKYqPq2js90dwBswk=Qm-l_hW9ETnsf6X4GnnKezFfnAEwc328ni8ljHdGYjo=spZLCFrFYTtUSPsGFMTVvmXPyfW-dr7Uouq-4BtWaPQ=qJJMaoRlOHxy1MRcAwa7aIJxwGYJyUKL93FdO4jZr1I=
>  ).
>  The advantage is that this style allows you to restructure the
>  connector or
>  add additional transforms into its implementation if necessary,
> without
>  changing the call sites. It might seem less important in case of a
>  simple
>  connector like reading lines from file, but it will become much more
>  important with things like SplittableDoFn
>  <
> 
> https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.org_jira_browse_BEAM-2D65=BQIDaQ=3BfiSO86x5iKjpl2b39jud9R1NrKYqPq2js90dwBswk=Qm-l_hW9ETnsf6X4GnnKezFfnAEwc328ni8ljHdGYjo=spZLCFrFYTtUSPsGFMTVvmXPyfW-dr7Uouq-4BtWaPQ=POJMhWDTbkUnHHLnKcH9FtzeP-lrZkuGZG3YPNNhXSU=
>  >.
> 
>  On Mon, Mar 14, 2016 at 10:29 AM Jean-Baptiste Onofré <
> j...@nanthrax.net>
>  wrote:
> 
>  Hi Peter,
> >
> > awesome !
> >
> > Yes, you can create the PR using the github mirror.
> >
> > Does your MultiLineIO use Bounded/Unbounded "new" classes ?
> >
> > Regards
> > JB
> >
> > On 03/14/2016 06:23 PM, Giesin, Peter wrote:
> >
> >> Hi all!
> >>
> >> I am looking to get involved in the project. I have a MultiLineIO
> >>
> > file-based source that I think would be useful. I know the project is
> > just
> > spinning up but can I simply clone the repo and create a PR for the
> > new IO?
> > Also looked over JIRA and there are some tickets I can help out with.
> >
> >>
> >> Best regards,
> >> Peter Giesin
> >> peter.gie...@fisglobal.com
> >>
> >>