Re: Enhancement for Joining Unbounded PCollections of different WindowFns

2019-07-26 Thread rahul patwari
Thanks for your detailed explanation Rui.

Like you said, the triggers for the PCollections should be compatible with
"Slowly Changing Lookup Cache" pattern.

Rui, if this feature makes sense, could you please create a JIRA for it?

I will start working on splitting BeamJoinRel.java to specific
implementations with SQL planner rules. I will also implement the "Slowly
Changing Lookup Cache" pattern with SQL planner rules.

Thanks,
Rahul
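
For reference, here is a minimal hand-written sketch of the "Slowly Changing
Lookup Cache" pattern discussed in this thread. It is only an illustration:
the refresh interval, the element types, the stand-in lookup-loading logic,
and the pre-existing "p" (Pipeline) and "mainStream" values are assumptions,
not part of the proposal above.

import java.util.Collections;
import java.util.Map;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.transforms.windowing.*;
import org.apache.beam.sdk.values.*;
import org.joda.time.Duration;

// Periodically refreshed lookup view: global window + processing-time trigger.
PCollectionView<Map<String, String>> lookupView = p
    .apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(5)))
    .apply(ParDo.of(new DoFn<Long, Map<String, String>>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        // Stand-in for re-reading the external lookup table on every tick.
        c.output(Collections.singletonMap("someKey", "someValue"));
      }
    }))
    .apply(Window.<Map<String, String>>into(new GlobalWindows())
        .triggering(
            Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes())
    .apply(View.asSingleton());

// The main stream (typically non-globally windowed) reads the latest snapshot
// of the lookup data as a side input.
PCollection<String> enriched = mainStream.apply(
    ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(c.element() + "," + c.sideInput(lookupView).get(c.element()));
      }
    }).withSideInputs(lookupView));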

On Sat 27 Jul, 2019, 1:58 AM Rui Wang,  wrote:

>
>
>> PCollection mainStream = ...
>> *PCollectionView>>* lookupStream = ...  // Note:
>> PCollectionView not PCollection. I have referred to PCollection before. And 
>> *PCollectionView
>> should be of type Multimap*, to perform SideinputJoin.
>> PCollectionTuple tuple = PCollectionTuple.of(new TupleTag("MainTable"),
>> mainStream)).and(new TupleTag("LookupTable"), lookupStream);
>> //PCollectionTuple has to be enhanced to take PCollectionView also as an
>> argument.
>> tuple.apply(SqlTransform.of("MainTable JOIN LookupTable"));
>>
> and in BeamJoinRel.java, when Join has to be performed on a *PCollection*
>> and a *PCollectionView*(instanceof check), SideInputJoin will be applied.
>>
>
> Yes, I am thinking something similar to it.
>
>
>
>> I think that performing SideInputJoin on two unbounded PCollections with
>> different WindowFn(and satisfying the criteria for "Slowly Changing Lookup
>> Cache Pattern") is relatively straight forward if we take *PCollection*
>> itself as an argument for LookupTable in PCollectionTuple.
>>
> I think it's a hack in BeamJoinRel to check the WindowFn and perform a
> side-input join when one side is unbounded with non-global windowing and the
> other side is unbounded with global windowing (and you likely also need to
> check triggers?). For SQL, if you really want to do it, you should do it
> with planner rules that match exactly the case you want to support and
> decouple this join implementation from BeamJoinRel.
>
> Even the current BeamJoinRel is too large, and we should split it into
> different JoinRels to match different plans.
>
>
>
>> The conversion of PCollection to PCollectionView is hidden for the user
>> in this case(Which will be performed internally by SideInputJoin).
>> Moreover, if the user wants to perform some SQL Aggregations on
>> "lookupStream" before performing Join with "mainStream"(Multiple SQL
>> Queries separated by ";"), it is possible in this case, as the
>> "lookupStream" is a PCollection. But, it is not possible if the
>> "lookupStream" is a PCollectionView.
>>
> It's true that a PCollectionView will limit further SQL operations. The
> workaround is to do those operations in Java before using SqlTransform, and
> within SqlTransform, start with the join.
>
>
> So if your use case is to support general SQL operations on two unbounded
> PCollections, but with the special need to perform a side-input join for
> these two unbounded PCollections under a specific WindowFn (and maybe even
> trigger) check, then the best way is to define SQL planner rules and have a
> separate Rel implementation.
>
>
>
> -Rui
>
>
>
>
>> Regards,
>> Rahul
>>
>> On Fri, Jul 26, 2019 at 9:19 AM Rui Wang  wrote:
>>
>>> I see.
>>>
>>> Actually, I was still referring to making "LookupStream" a
>>> PCollectionView to perform a side-input join, which then doesn't have the
>>> mismatched-WindowFn problem. Otherwise, we shouldn't check for a special
>>> case of WindowFn to decide whether to perform a side-input join for two
>>> unbounded PCollections when their WindowFns do not match.
>>>
>>> And what "data completeness" really means here is that the side input is
>>> triggered, so it can change; the question then is: when the side input
>>> changes, should we refine previously emitted results? That becomes harder
>>> to reason about.
>>>
>>>
>>> Rui
>>>
>>> On Thu, Jul 25, 2019 at 6:17 PM rahul patwari <
>>> rahulpatwari8...@gmail.com> wrote:
>>>
 "*In terms of Join schematic, I think it's hard to reason data
 completeness since one side of the join is changing*"
 - As it is possible to apply [Global Windows with Non-Default Trigger]
 to Unbounded Data Source, say, Kafka, to distinguish this Kafka PCollection
 from "Slowly Changing lookup cache" Unbounded PCollection,  If we can check
 the condition that one of the PCollection being Joined have WindowFn as
 [Global Windows with Trigger Repeatedly.forever(AfterProcessingTime.
 pastFirstElementInPane())] is it sufficient to perform the Join of
 "MainStream" and this "LookupStream"?

 In other words, I mean to say that instead of directly throwing
 Exception
 
  when
 Joining two Unbounded PCollections with different WindowFns, If we can
 ensure that
 MainStream: one side of the join is Unbounded with WindowFn as
 [Non-Global Windows with DefaultTrigger] and
 LookupStream: the other side 

Re: [VOTE] Release 2.14.0, release candidate #1

2019-07-26 Thread Ahmet Altay
To confirm, I manually validated the leaderboard example on Python. It is working.

On Fri, Jul 26, 2019 at 5:23 PM Yifan Zou  wrote:

> AFAIK, there should not be any special prerequisites for this. Things the
> script does include:
> 1. download the python rc in zip
> 2. start virtualenv and install the sdk.
> 3. verify hash.
> 4. config settings.xml and start a Java pubsub message injector.
> 5. run game examples and validate.
>
> Could you double check if the sdk was installed properly (step 1&2)?
>

I am also guessing this is the case. Probably something earlier in the
validation script did not run as expected.


>
>
Yifan
>
> On Fri, Jul 26, 2019 at 2:38 PM Anton Kedin  wrote:
>
>> Validation script fails for me when I try to run [1] python leaderboard
>> with direct runner:
>>
>> ```
>> *
>> * Running Python Leaderboard with DirectRunner
>> *
>> /usr/bin/python: No module named apache_beam.examples.complete.game
>> ```
>>
>> If someone has more context, what are the prerequisites for this step?
>> How does it look up the module?
>>
>> [1]
>> https://github.com/apache/beam/blob/master/release/src/main/scripts/run_rc_validation.sh#L424
>>
>> Regards,
>> Anton
>>
>> On Fri, Jul 26, 2019 at 10:23 AM Anton Kedin  wrote:
>>
>>> Cool, will make the post and will update the release guide as well then
>>>
>>> On Fri, Jul 26, 2019 at 10:20 AM Chad Dombrova 
>>> wrote:
>>>
 I think the release guide needs to be updated to remove the optionality
> of blog creation and avoid confusion. Thanks for pointing that out.
>

 +1




Re: [VOTE] Release 2.14.0, release candidate #1

2019-07-26 Thread Yifan Zou
AFAIK, there should not be any special prerequisites for this. Things the
script does include:
1. download the python rc in zip
2. start virtualenv and install the sdk.
3. verify hash.
4. config settings.xml and start a Java pubsub message injector.
5. run game examples and validate.

Could you double check if the sdk was installed properly (step 1&2)?

Yifan

On Fri, Jul 26, 2019 at 2:38 PM Anton Kedin  wrote:

> Validation script fails for me when I try to run [1] python leaderboard
> with direct runner:
>
> ```
> *
> * Running Python Leaderboard with DirectRunner
> *
> /usr/bin/python: No module named apache_beam.examples.complete.game
> ```
>
> If someone has more context, what are the prerequisites for this step? How
> does it look up the module?
>
> [1]
> https://github.com/apache/beam/blob/master/release/src/main/scripts/run_rc_validation.sh#L424
>
> Regards,
> Anton
>
> On Fri, Jul 26, 2019 at 10:23 AM Anton Kedin  wrote:
>
>> Cool, will make the post and will update the release guide as well then
>>
>> On Fri, Jul 26, 2019 at 10:20 AM Chad Dombrova  wrote:
>>
>>> I think the release guide needs to be updated to remove the optionality
 of blog creation and avoid confusion. Thanks for pointing that out.

>>>
>>> +1
>>>
>>>


Re: [PROPOSAL] Integrate BigQuery-compatible HyperLogLog algorithm into Beam

2019-07-26 Thread Robin Qiu
Quick update: the PR implementing this feature has been sent out:
https://github.com/apache/beam/pull/9144. The design doc is also revamped
to reflect the design decisions we have made.

On Tue, Jun 25, 2019 at 2:05 PM Robin Qiu  wrote:

> Can you please add this to the design documents webpage.
>> https://beam.apache.org/contribute/design-documents/
>>
>
> Thanks for the reminder. Done! (https://github.com/apache/beam/pull/8947)
>
>
>> I am not sure if this feature should go into 'sdks/java/core' because
>> it seems a quite specific case, maybe it should go in the sketching
>> module so it can be easier to find,
>
>
> Adding it to a separate module under `extensions` sounds good to me.
>
>
>> or maybe in its own extension if
>> the 'mix' of dependencies may be an issue and then make this
>> dependency a requirement for the gcp module since I suppose the
>> ultimate goal is to integrate it there.
>>
>
> I guess we can shade dependencies of ZetaSketch if it creates a problem
> when integrated with Beam. But I would not relate it to a gcp module since
> I think it will be a useful feature regardless of whether users run it on
> GCP or not (although if run on GCP, it will get better integration with
> BigQuery).
>
> On Mon, Jun 24, 2019 at 1:55 PM Ismaël Mejía  wrote:
>
>> Thanks for bringing this Robin,
>>
>> Can you please add this to the design documents webpage.
>> https://beam.apache.org/contribute/design-documents/
>>
>> Left some comments in the doc. It is great that this is finally open
>> and even better that it becomes part of Beam.
>>
>> I am not sure if this feature should go into 'sdks/java/core' because
>> it seems a quite specific case, maybe it should go in the sketching
>> module so it can be easier to find, or maybe in its own extension if
>> the 'mix' of dependencies may be an issue and then make this
>> dependency a requirement for the gcp module since I suppose the
>> ultimate goal is to integrate it there.
>>
>> CC +arnaudfournier...@gmail.com, original author of the sketching
>> library, who may be interested in this.
>>
>>
>> On Mon, Jun 24, 2019 at 9:31 PM Rui Wang  wrote:
>> >
>> > Thanks Robin! It would also be interesting if we could offer HLL_COUNT
>> functions in BeamSQL based on your proposal!
>> >
>> >
>> > -Rui
>> >
>> > On Mon, Jun 24, 2019 at 10:47 AM Robin Qiu  wrote:
>> >>
>> >> Hi all,
>> >>
>> >> I have written a doc proposing we integrate the HyperLogLog++
>> algorithm into Beam as a new combiner. The algorithm solves the
>> count-distinct problem, and the intermediate sketch (aggregator) format
>> will be compatible with sketches computed via the HLL_COUNT functions in
>> Google Cloud BigQuery (because they will be based on the same
>> implementation: ZetaSketch). The tracking JIRA issue is BEAM-7013.
>> >>
>> >> The API design proposed in the doc is subject to change and open to
>> comments. Please feel free to comment if you have any thoughts.
>> >>
>> >> Cheers,
>> >> Robin
>>
>
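
For readers following along, a small usage sketch of the proposed combiner.
The package, class, and method names below follow the API described in the
design doc and PR 9144 (HllCount.Init / HllCount.Extract) and may differ from
what is finally merged; the input values and the pre-existing Pipeline "p"
are illustrative assumptions.

import org.apache.beam.sdk.extensions.zetasketch.HllCount;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;

// Build HLL++ sketches over a collection of user ids, then extract the
// approximate distinct count. The byte[] sketches are intended to be
// mergeable with BigQuery's HLL_COUNT results, since both use ZetaSketch.
PCollection<String> userIds = p.apply(Create.of("alice", "bob", "alice", "carol"));
PCollection<byte[]> sketches =
    userIds.apply(HllCount.Init.forStrings().globally());
PCollection<Long> approxDistinct = sketches.apply(HllCount.Extract.globally());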


Re: Enhancement for Joining Unbounded PCollections of different WindowFns

2019-07-26 Thread Rui Wang
>
> PCollection mainStream = ...
> *PCollectionView>>* lookupStream = ...  // Note:
> PCollectionView not PCollection. I have referred to PCollection before. And 
> *PCollectionView
> should be of type Multimap*, to perform SideinputJoin.
> PCollectionTuple tuple = PCollectionTuple.of(new TupleTag("MainTable"),
> mainStream)).and(new TupleTag("LookupTable"), lookupStream);
> //PCollectionTuple has to be enhanced to take PCollectionView also as an
> argument.
> tuple.apply(SqlTransform.of("MainTable JOIN LookupTable"));
>
and in BeamJoinRel.java, when Join has to be performed on a *PCollection*
> and a *PCollectionView*(instanceof check), SideInputJoin will be applied.
>

Yes, I am thinking something similar to it.



> I think that performing SideInputJoin on two unbounded PCollections with
> different WindowFn(and satisfying the criteria for "Slowly Changing Lookup
> Cache Pattern") is relatively straight forward if we take *PCollection*
> itself as an argument for LookupTable in PCollectionTuple.
>
I think it's a hack in BeamJoinRel to check the WindowFn and perform a
side-input join when one side is unbounded with non-global windowing and the
other side is unbounded with global windowing (and you likely also need to
check triggers?). For SQL, if you really want to do it, you should do it
with planner rules that match exactly the case you want to support and
decouple this join implementation from BeamJoinRel.

Even the current BeamJoinRel is too large, and we should split it into
different JoinRels to match different plans.



> The conversion of PCollection to PCollectionView is hidden for the user in
> this case(Which will be performed internally by SideInputJoin). Moreover,
> if the user wants to perform some SQL Aggregations on "lookupStream" before
> performing Join with "mainStream"(Multiple SQL Queries separated by ";"),
> it is possible in this case, as the "lookupStream" is a PCollection. But,
> it is not possible if the "lookupStream" is a PCollectionView.
>
It's true that a PCollectionView will limit further SQL operations. The
workaround is to do those operations in Java before using SqlTransform, and
within SqlTransform, start with the join.


So if your use case is to support general SQL operations on two unbounded
PCollections, but with the special need to perform a side-input join for
these two unbounded PCollections under a specific WindowFn (and maybe even
trigger) check, then the best way is to define SQL planner rules and have a
separate Rel implementation.



-Rui




> Regards,
> Rahul
>
> On Fri, Jul 26, 2019 at 9:19 AM Rui Wang  wrote:
>
>> I see.
>>
>> Actually, I was still referring to making "LookupStream" a
>> PCollectionView to perform a side-input join, which then doesn't have the
>> mismatched-WindowFn problem. Otherwise, we shouldn't check for a special
>> case of WindowFn to decide whether to perform a side-input join for two
>> unbounded PCollections when their WindowFns do not match.
>>
>> And what "data completeness" really means here is that the side input is
>> triggered, so it can change; the question then is: when the side input
>> changes, should we refine previously emitted results? That becomes harder
>> to reason about.
>>
>>
>> Rui
>>
>> On Thu, Jul 25, 2019 at 6:17 PM rahul patwari 
>> wrote:
>>
>>> "*In terms of Join schematic, I think it's hard to reason data
>>> completeness since one side of the join is changing*"
>>> - As it is possible to apply [Global Windows with Non-Default Trigger]
>>> to Unbounded Data Source, say, Kafka, to distinguish this Kafka PCollection
>>> from "Slowly Changing lookup cache" Unbounded PCollection,  If we can check
>>> the condition that one of the PCollection being Joined have WindowFn as
>>> [Global Windows with Trigger Repeatedly.forever(AfterProcessingTime.
>>> pastFirstElementInPane())] is it sufficient to perform the Join of
>>> "MainStream" and this "LookupStream"?
>>>
>>> In other words, I mean to say that instead of directly throwing
>>> Exception
>>> 
>>>  when
>>> Joining two Unbounded PCollections with different WindowFns, If we can
>>> ensure that
>>> MainStream: one side of the join is Unbounded with WindowFn as
>>> [Non-Global Windows with DefaultTrigger] and
>>> LookupStream: the other side of the Join is a "Slowly Changing Lookup
>>> Cache"[Global Windows with Repeatedly.forever(AfterProcessingTime.
>>> pastFirstElementInPane()) Trigger],
>>> we can directly perform a SideInputJoin.
>>>
>>> Will we have "data completeness" problem even in "Slowly Changing lookup
>>> Cache Pattern"?
>>>
>>> On Fri, Jul 26, 2019 at 2:51 AM Rui Wang  wrote:
>>>
 To be more clear, I think it's useful if we can achieve the following
 that you wrote

 PCollection mainStream = ...;
 PCollection lookupStream = ...;
 PCollectionTuple tuple = PCollectionTuple.of(new TupleTag("MainTable"),
 new TupleTag("LookupTable"));
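
For comparison with the pseudo-code quoted above, this is roughly what the
SqlTransform side looks like with today's API when both inputs are
PCollection<Row>s that already carry schemas (the table names, columns, and
query below are illustrative assumptions):

import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.values.*;

// Both inputs must have a schema set (e.g. via setRowSchema) so that BeamSQL
// can resolve the column names used in the query.
PCollectionTuple tables =
    PCollectionTuple.of(new TupleTag<Row>("MainTable"), mainStream)
        .and(new TupleTag<Row>("LookupTable"), lookupStream);

PCollection<Row> joined = tables.apply(
    SqlTransform.query(
        "SELECT m.id, l.name FROM MainTable m JOIN LookupTable l ON m.id = l.id"));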

Beam metrics update

2019-07-26 Thread Mikhail Gryzykhin
Hello everybody,

I'm working on improving deployment scripts for beam metrics site
 and going to do some updates
over the weekend. This might bring site down for short periods of time.

Please respond to this message if you require metrics dashboards up.

Regards,
Mikhail.


Neat Beam integration - Ananas Analytics Desktop

2019-07-26 Thread Kenneth Knowles
One colleague pointed me to this project and another tweeted about it [1].
I see it was mentioned on hacker news around that time [2]. Looks like a
cool visual editor that produces Beam pipelines. Small # of contributors,
ASL2 licensed, from a brief glance the code arrived on GitHub in a few
large commits around the end of April. Nice to see Beam enabling downstream
cross-engine tools.

   - https://github.com/ananas-analytics/ananas-desktop

If you are looking for the Beam mentions, here they are in prose and code

   - https://ananasanalytics.com/docs/user-guide/engine-overview
   -
   
https://github.com/ananas-analytics/ananas-desktop/search?utf8=%E2%9C%93=beam=

Kenn

[1] https://twitter.com/datancoffee/status/1153471732193218560
[2] https://news.ycombinator.com/item?id=20498474


Re: Enhancement for Joining Unbounded PCollections of different WindowFns

2019-07-26 Thread rahul patwari
Is this the flow that you are referring to:

PCollection mainStream = ...
*PCollectionView>>* lookupStream = ...  // Note:
PCollectionView not PCollection. I have referred to PCollection
before. And *PCollectionView
should be of type Multimap*, to perform SideinputJoin.
PCollectionTuple tuple = PCollectionTuple.of(new TupleTag("MainTable"),
mainStream)).and(new TupleTag("LookupTable"), lookupStream);
//PCollectionTuple has to be enhanced to take PCollectionView also as an
argument.
tuple.apply(SqlTransform.of("MainTable JOIN LookupTable"));

and in BeamJoinRel.java, when Join has to be performed on a *PCollection*
and a *PCollectionView*(instanceof check), SideInputJoin will be applied.

I think that performing SideInputJoin on two unbounded PCollections with
different WindowFns (and satisfying the criteria for the "Slowly Changing
Lookup Cache" pattern) is relatively straightforward if we take the
*PCollection* itself as an argument for LookupTable in PCollectionTuple. The
conversion of the PCollection to a PCollectionView is hidden from the user in
this case (it will be performed internally by SideInputJoin). Moreover, if
the user wants to perform some SQL aggregations on "lookupStream" before
performing the join with "mainStream" (multiple SQL queries separated by
";"), that is possible in this case, as "lookupStream" is a PCollection. But
it is not possible if "lookupStream" is a PCollectionView.

Regards,
Rahul
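
Concretely, the hidden "PCollection to PCollectionView" conversion mentioned
above would amount to something like the following sketch (this is not the
actual BeamJoinRel code; the join key "id", the coders, and the types are
assumptions):

import java.util.Map;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.*;

// Key the lookup rows by the join key, then materialize them as a multimap
// view that the main stream can consume as a side input.
PCollection<KV<String, Row>> keyed = lookupStream
    .apply(ParDo.of(new DoFn<Row, KV<String, Row>>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(KV.of(c.element().getString("id"), c.element()));
      }
    }))
    .setCoder(KvCoder.of(StringUtf8Coder.of(), lookupStream.getCoder()));

PCollectionView<Map<String, Iterable<Row>>> lookupView =
    keyed.apply(View.asMultimap());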

On Fri, Jul 26, 2019 at 9:19 AM Rui Wang  wrote:

> I see.
>
> Actually, I was still referring to making "LookupStream" a
> PCollectionView to perform a side-input join, which then doesn't have the
> mismatched-WindowFn problem. Otherwise, we shouldn't check for a special
> case of WindowFn to decide whether to perform a side-input join for two
> unbounded PCollections when their WindowFns do not match.
>
> And what "data completeness" really means here is that the side input is
> triggered, so it can change; the question then is: when the side input
> changes, should we refine previously emitted results? That becomes harder
> to reason about.
>
>
> Rui
>
> On Thu, Jul 25, 2019 at 6:17 PM rahul patwari 
> wrote:
>
>> "*In terms of Join schematic, I think it's hard to reason data
>> completeness since one side of the join is changing*"
>> - As it is possible to apply [Global Windows with Non-Default Trigger] to
>> Unbounded Data Source, say, Kafka, to distinguish this Kafka PCollection
>> from "Slowly Changing lookup cache" Unbounded PCollection,  If we can check
>> the condition that one of the PCollection being Joined have WindowFn as
>> [Global Windows with Trigger Repeatedly.forever(AfterProcessingTime.
>> pastFirstElementInPane())] is it sufficient to perform the Join of
>> "MainStream" and this "LookupStream"?
>>
>> In other words, I mean to say that instead of directly throwing Exception
>> 
>>  when
>> Joining two Unbounded PCollections with different WindowFns, If we can
>> ensure that
>> MainStream: one side of the join is Unbounded with WindowFn as
>> [Non-Global Windows with DefaultTrigger] and
>> LookupStream: the other side of the Join is a "Slowly Changing Lookup
>> Cache"[Global Windows with Repeatedly.forever(AfterProcessingTime.
>> pastFirstElementInPane()) Trigger],
>> we can directly perform a SideInputJoin.
>>
>> Will we have "data completeness" problem even in "Slowly Changing lookup
>> Cache Pattern"?
>>
>> On Fri, Jul 26, 2019 at 2:51 AM Rui Wang  wrote:
>>
>>> To be more clear, I think it's useful if we can achieve the following
>>> that you wrote
>>>
>>> PCollection mainStream = ...;
>>> PCollection lookupStream = ...;
>>> PCollectionTuple tuple = PCollectionTuple.of(new TupleTag("MainTable"),
>>> new TupleTag("LookupTable"));
>>> tuple.apply(SqlTransform.query("MainTable JOIN LookupTable"));
>>>
>>> -Rui
>>>
>>> On Thu, Jul 25, 2019 at 1:56 PM Rui Wang  wrote:
>>>
 Hi Rahul, thanks for your detailed writeup. It pretty much summarizes
 the slow changing table join problem.

 To your question: "Can we implement SideInputJoin for this case",
 there are two perspectives.

 In terms of implementing the slowly changing lookup cache pattern in
 BeamSQL, such a side-input join can be done that way. At least it is worth
 exploring until we identify blockers. I also think this pattern is
 already useful to users.

 In terms of Join semantics, I think it's hard to reason about data
 completeness since one side of the join is changing.

 -Rui


 On Thu, Jul 25, 2019 at 12:55 PM rahul patwari <
 rahulpatwari8...@gmail.com> wrote:

> Hi Kenn,
>
> If we consider the following two *Unbounded* PCollections:
> - PCollection1 => [*Non-Global* Window with Default 

Re: Choosing a coder for a class that contains a Row?

2019-07-26 Thread Reuven Lax
The metadata needed is already there - it's the encoding-position map in
Schema. However the code needs to be written to examine an old schema and a
new one in order to make the new schema encoding-compatible with the old
one. This shouldn't be difficult to write.
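
As a concrete illustration (a sketch using the Java SDK's Schema builder; the
field names are made up), the kind of change being discussed looks like this:

import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.values.Row;

// Version 1 of a schema and a row built against it.
Schema v1 = Schema.builder()
    .addStringField("id")
    .addInt64Field("count")
    .build();
Row oldRow = Row.withSchema(v1).addValues("abc", 3L).build();

// Version 2 adds a nullable field. Per the RowCoder behaviour quoted further
// down this thread, a row encoded with v1 and decoded with v2 gets null for
// "note", and a row encoded with v2 and decoded with v1 drops "note".
// Reordered or renamed fields are the harder case that the encoding-position
// map is meant to address.
Schema v2 = Schema.builder()
    .addStringField("id")
    .addInt64Field("count")
    .addNullableField("note", FieldType.STRING)
    .build();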

On Fri, Jul 26, 2019 at 10:21 AM Kenneth Knowles  wrote:

> The most challenging part, as I understand it, surrounds automatically
> inferred schemas from POJOs, where Java's nondeterministic iteration order,
> combined with a row's inherent ordering, means that even an identical
> pipeline will need some metadata to plumb the right fields to the right
> column indices.
>
> Most relational migration management I've done incorporates explicit
> migration logic along with changes to the schema. This is quite a lot more
> robust, but more implementation work, than having a default policy
> proto/avro/thrift style. I think there's a lot to explore here.
>
> Kenn
>
> On Thu, Jul 25, 2019 at 9:59 AM Brian Hulette  wrote:
>
>> I know Reuven has put some thought into evolving schemas, but I'm not
>> sure it's documented anywhere as of now. The only documentation I've come
>> across as I bump around the schema code are some comments deep in RowCoder
>> [1].
>> Essentially the current serialization format for a row includes a row
>> count as a prefix so we can detect "simple" schema changes like column
>> additions and deletions. When decoding a Row, if the current schema
>> contains *more* fields than the encoded Row, the remaining fields are
>> populated with nulls in the resulting Row object. If the current schema
>> contains *fewer* fields than the encoded Row, the additional ones are
>> just dropped.
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java#L296
>>
>> On Wed, Jul 24, 2019 at 6:00 AM Ryan Skraba  wrote:
>>
>>> I'm also really interested in the question of evolving schemas... It's
>>> something I've also put off figuring out :D
>>>
>>> With all its warts, the LazyAvroCoder technique (a coder backed by
>>> some sort of schema registry) _could_ work with "homogeneish" data
>>> (i.e. if the number of schemas in play for a single coder is much,
>>> much smaller than the number of elements), even if none of the
>>> schemas are known at Pipeline construction.  The portability job
>>> server (which already stores and serves artifacts for running jobs)
>>> might be the right place to put a schema registry... but I'm not
>>> entirely convinced it's the right way to go either.
>>>
>>> At the same time, "simply" bumping a known schema to a new version is
>>> roughly equivalent to updating a pipeline in place.
>>>
>>> Sending the data as Java-serialized Rows will be equivalent to sending
>>> the entire schema with every record, so it _would_ work without
>>> involving a new, distributed state between one coder's encode and
>>> another's decode (at the cost of message size, of course).
>>>
>>> Ryan
>>>
>>>
>>> On Wed, Jul 24, 2019 at 1:40 AM Pablo Estrada 
>>> wrote:
>>> >
>>> > +dev
>>> > Thanks Ryan! This is quite helpful. Still not what I need : ) - but
>>> useful.
>>> >
>>> > The data is change data capture from databases, and I'm putting it
>>> into a Beam Row. The schema for the Row is generally homogeneous, but
>>> subject to change at some point in the future if the schema in the database
>>> changes. It's unusual and unlikely, but possible. I have no idea how Beam
>>> deals with evolving schemas. +Reuven Lax is there documentation / examples
>>> / anything around this? : )
>>> >
>>> > I think evolving schemas is an interesting question
>>> >
>>> > For now, I am going to Java-serialize the objects, and delay figuring
>>> this out. But I reckon I'll have to come back to this...
>>> >
>>> > Best
>>> > -P.
>>> >
>>> > On Tue, Jul 23, 2019 at 1:07 AM Ryan Skraba  wrote:
>>> >>
>>> >> Hello Pablo!  Just to clarify -- the Row schemas aren't known at
>>> >> pipeline construction time, but can be discovered from the instance of
>>> >> MyData?
>>> >>
>> >> Once discovered, is the schema "homogeneous" for all instances of
>>> >> MyData?  (i.e. someRow will always have the same schema for all
>>> >> instances afterwards, and there won't be another someRow with a
>>> >> different schema).
>>> >>
>>> >> We've encountered a parallel "problem" with pure Avro data, where the
>> >> instance is a GenericRecord containing its own Avro schema but
>>> >> *without* knowing the schema until the pipeline is run.  The solution
>>> >> that we've been using is a bit hacky, but we're using an ad hoc
>>> >> per-job schema registry and a custom coder where each worker saves the
>>> >> schema in the `encode` before writing the record, and loads it lazily
>>> >> in the `decode` before reading.
>>> >>
>>> >> The original code is available[1] (be gentle, it was written with Beam
>>> >> 0.4.0-incubating... and has continued to work until now).
>>> >>
>>> >> In practice, the ad hoc schema 

Re: [VOTE] Release 2.14.0, release candidate #1

2019-07-26 Thread Anton Kedin
Cool, will make the post and will update the release guide as well then

On Fri, Jul 26, 2019 at 10:20 AM Chad Dombrova  wrote:

> I think the release guide needs to be updated to remove the optionality of
>> blog creation and avoid confusion. Thanks for pointing that out.
>>
>
> +1
>
>


Re: Choosing a coder for a class that contains a Row?

2019-07-26 Thread Kenneth Knowles
The most challenging part, as I understand it, surrounds automatically
inferred schemas from POJOs, where Java's nondeterministic iteration order,
combined with a row's inherent ordering, means that even an identical
pipeline will need some metadata to plumb the right fields to the right
column indices.

Most relational migration management I've done incorporates explicit
migration logic along with changes to the schema. This is quite a lot more
robust, but more implementation work, than having a default policy
proto/avro/thrift style. I think there's a lot to explore here.

Kenn

On Thu, Jul 25, 2019 at 9:59 AM Brian Hulette  wrote:

> I know Reuven has put some thought into evolving schemas, but I'm not sure
> it's documented anywhere as of now. The only documentation I've come across
> as I bump around the schema code are some comments deep in RowCoder [1].
> Essentially the current serialization format for a row includes a row
> count as a prefix so we can detect "simple" schema changes like column
> additions and deletions. When decoding a Row, if the current schema
> contains *more* fields than the encoded Row, the remaining fields are
> populated with nulls in the resulting Row object. If the current schema
> contains *fewer* fields than the encoded Row, the additional ones are
> just dropped.
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java#L296
>
> On Wed, Jul 24, 2019 at 6:00 AM Ryan Skraba  wrote:
>
>> I'm also really interested in the question of evolving schemas... It's
>> something I've also put off figuring out :D
>>
>> With all its warts, the LazyAvroCoder technique (a coder backed by
>> some sort of schema registry) _could_ work with "homogeneish" data
>> (i.e. if the number of schemas in play for a single coder is much,
>> much smaller than the number of elements), even if none of the
>> schemas are known at Pipeline construction.  The portability job
>> server (which already stores and serves artifacts for running jobs)
>> might be the right place to put a schema registry... but I'm not
>> entirely convinced it's the right way to go either.
>>
>> At the same time, "simply" bumping a known schema to a new version is
>> roughly equivalent to updating a pipeline in place.
>>
>> Sending the data as Java-serialized Rows will be equivalent to sending
>> the entire schema with every record, so it _would_ work without
>> involving a new, distributed state between one coder's encode and
>> another's decode (at the cost of message size, of course).
>>
>> Ryan
>>
>>
>> On Wed, Jul 24, 2019 at 1:40 AM Pablo Estrada  wrote:
>> >
>> > +dev
>> > Thanks Ryan! This is quite helpful. Still not what I need : ) - but
>> useful.
>> >
>> > The data is change data capture from databases, and I'm putting it into
>> a Beam Row. The schema for the Row is generally homogeneous, but subject to
>> change at some point in the future if the schema in the database changes.
>> It's unusual and unlikely, but possible. I have no idea how Beam deals with
>> evolving schemas. +Reuven Lax is there documentation / examples / anything
>> around this? : )
>> >
>> > I think evolving schemas is an interesting question
>> >
>> > For now, I am going to Java-serialize the objects, and delay figuring
>> this out. But I reckon I'll have to come back to this...
>> >
>> > Best
>> > -P.
>> >
>> > On Tue, Jul 23, 2019 at 1:07 AM Ryan Skraba  wrote:
>> >>
>> >> Hello Pablo!  Just to clarify -- the Row schemas aren't known at
>> >> pipeline construction time, but can be discovered from the instance of
>> >> MyData?
>> >>
>> >> Once discovered, is the schema "homogeneous" for all instances of
>> >> MyData?  (i.e. someRow will always have the same schema for all
>> >> instances afterwards, and there won't be another someRow with a
>> >> different schema).
>> >>
>> >> We've encountered a parallel "problem" with pure Avro data, where the
>> >> instance is a GenericRecord containing its own Avro schema but
>> >> *without* knowing the schema until the pipeline is run.  The solution
>> >> that we've been using is a bit hacky, but we're using an ad hoc
>> >> per-job schema registry and a custom coder where each worker saves the
>> >> schema in the `encode` before writing the record, and loads it lazily
>> >> in the `decode` before reading.
>> >>
>> >> The original code is available[1] (be gentle, it was written with Beam
>> >> 0.4.0-incubating... and has continued to work until now).
>> >>
>> >> In practice, the ad hoc schema registry is just a server socket in the
>> >> Spark driver, in-memory for DirectRunner / local mode, and a a
>> >> read/write to a known location in other runners.  There are definitely
>> >> other solutions with side-inputs and providers, and the job server in
>> >> portability looks like an exciting candidate for per-job schema
>> >> registry story...
>> >>
>> >> I'm super eager to see if there are other ideas or a contribution we
>> >> 

Re: [VOTE] Release 2.14.0, release candidate #1

2019-07-26 Thread Chad Dombrova
>
> I think the release guide needs to be updated to remove the optionality of
> blog creation and avoid confusion. Thanks for pointing that out.
>

+1


Re: [VOTE] Release 2.14.0, release candidate #1

2019-07-26 Thread Thomas Weise
A quick look over the JIRA release notes reveals new features and important
improvements that deserve to be announced.

I think the release guide needs to be updated to remove the optionality of
blog creation and avoid confusion. Thanks for pointing that out.

On Fri, Jul 26, 2019 at 9:53 AM Anton Kedin  wrote:

> Hi Thomas, I haven't made it. I read that step of the guide as optional
> ("..if needed for this particular release..."). I am not sure if anything
> specific needs to be announced or highlighted for 2.14. I can go over the
> closed Jiras and create a blog post if it's expected.
>
> Regards,
> Anton
>
> On Fri, Jul 26, 2019 at 9:38 AM Thomas Weise  wrote:
>
>> Hi Anton,
>>
>> Thanks for working on the release.
>>
>> I don't find the release blog in https://github.com/apache/beam/pull/9157 or
>> elsewhere?
>>
>> This should be part of the release candidate [1] and I wonder why we keep
>> on missing it in RCs. Is there something that needs to be fixed in [1]?
>>
>> The reason why I now check for this as one of the first items is that we
>> traditionally have done poorly communicating releases to users when this is
>> actually very important. The blog needs many eyes to make sure we capture
>> what matters in a way that makes sense to users.
>>
>> Thomas
>>
>>
>>
>>
>>
>>
>> [1]
>> https://beam.apache.org/contribute/release-guide/#write-the-beam-blog-post-and-create-a-pull-request
>>
>>
>>
>> On Thu, Jul 25, 2019 at 4:25 PM Rui Wang  wrote:
>>
>>> Tried to verify RC1 by running Nexmark on Dataflow but found it's broken
>>> (at least based commands from Running+Nexmark
>>> ).
>>> Will try to debug it and rerun the process.
>>>
>>>
>>> -Rui
>>>
>>> On Thu, Jul 25, 2019 at 2:39 PM Anton Kedin  wrote:
>>>
 Hi everyone,
 Please review and vote on the release candidate #1 for the version
 2.14.0, as follows:
 [ ] +1, Approve the release
 [ ] -1, Do not approve the release (please provide specific comments)

 The complete staging area is available for your review, which includes:
 * JIRA release notes [1],
 * the official Apache source release to be deployed to dist.apache.org
 [2], which is signed with the key with fingerprint
 89E2FFCAE7E99CF6E6827CFEF7349F2310FFB193 [3],
 * all artifacts to be deployed to the Maven Central Repository [4],
 * source code tag "v2.14.0-RC1" [5], [6]
 * website pull request listing the release [7], publishing the API
 reference manual [8].
 * Python artifacts are deployed along with the source release to the
 dist.apache.org [2].
 * Validation sheet with a tab for 2.14.0 release to help with
 validation [9].

 The vote will be open for at least 72 hours. It is adopted by majority
 approval, with at least 3 PMC affirmative votes.

 Thanks,
 Anton

 [1]
 https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12319527=12345431
 [2] https://dist.apache.org/repos/dist/dev/beam/2.14.0/
 [3] https://dist.apache.org/repos/dist/release/beam/KEYS
 [4]
 https://repository.apache.org/content/repositories/orgapachebeam-1080/
 [5] https://github.com/apache/beam/tree/v2.14.0-RC1
 [6] https://github.com/apache/beam/tags
 [7] https://github.com/apache/beam/pull/9157
 [8] https://github.com/apache/beam-site/pull/591/
 [9] https://s.apache.org/beam-release-validation#gid=1082148452

>>>


Re: [VOTE] Release 2.14.0, release candidate #1

2019-07-26 Thread Anton Kedin
Hi Thomas, I haven't made it. I read that step of the guide as optional ("..if
needed for this particular release..."). I am not sure if anything specific
needs to be announced or highlighted for 2.14. I can go over the closed
Jiras and create a blog post if it's expected.

Regards,
Anton

On Fri, Jul 26, 2019 at 9:38 AM Thomas Weise  wrote:

> Hi Anton,
>
> Thanks for working on the release.
>
> I don't find the release blog in https://github.com/apache/beam/pull/9157 or
> elsewhere?
>
> This should be part of the release candidate [1] and I wonder why we keep
> on missing it in RCs. Is there something that needs to be fixed in [1]?
>
> The reason why I now check for this as one of the first items is that we
> traditionally have done poorly communicating releases to users when this is
> actually very important. The blog needs many eyes to make sure we capture
> what matters in a way that makes sense to users.
>
> Thomas
>
>
>
>
>
>
> [1]
> https://beam.apache.org/contribute/release-guide/#write-the-beam-blog-post-and-create-a-pull-request
>
>
>
> On Thu, Jul 25, 2019 at 4:25 PM Rui Wang  wrote:
>
>> Tried to verify RC1 by running Nexmark on Dataflow but found it's broken
>> (at least based commands from Running+Nexmark
>> ).
>> Will try to debug it and rerun the process.
>>
>>
>> -Rui
>>
>> On Thu, Jul 25, 2019 at 2:39 PM Anton Kedin  wrote:
>>
>>> Hi everyone,
>>> Please review and vote on the release candidate #1 for the version
>>> 2.14.0, as follows:
>>> [ ] +1, Approve the release
>>> [ ] -1, Do not approve the release (please provide specific comments)
>>>
>>> The complete staging area is available for your review, which includes:
>>> * JIRA release notes [1],
>>> * the official Apache source release to be deployed to dist.apache.org
>>> [2], which is signed with the key with fingerprint
>>> 89E2FFCAE7E99CF6E6827CFEF7349F2310FFB193 [3],
>>> * all artifacts to be deployed to the Maven Central Repository [4],
>>> * source code tag "v2.14.0-RC1" [5], [6]
>>> * website pull request listing the release [7], publishing the API
>>> reference manual [8].
>>> * Python artifacts are deployed along with the source release to the
>>> dist.apache.org [2].
>>> * Validation sheet with a tab for 2.14.0 release to help with validation
>>> [9].
>>>
>>> The vote will be open for at least 72 hours. It is adopted by majority
>>> approval, with at least 3 PMC affirmative votes.
>>>
>>> Thanks,
>>> Anton
>>>
>>> [1]
>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12319527=12345431
>>> [2] https://dist.apache.org/repos/dist/dev/beam/2.14.0/
>>> [3] https://dist.apache.org/repos/dist/release/beam/KEYS
>>> [4]
>>> https://repository.apache.org/content/repositories/orgapachebeam-1080/
>>> [5] https://github.com/apache/beam/tree/v2.14.0-RC1
>>> [6] https://github.com/apache/beam/tags
>>> [7] https://github.com/apache/beam/pull/9157
>>> [8] https://github.com/apache/beam-site/pull/591/
>>> [9] https://s.apache.org/beam-release-validation#gid=1082148452
>>>
>>


Re: [VOTE] Release 2.14.0, release candidate #1

2019-07-26 Thread Thomas Weise
Hi Anton,

Thanks for working on the release.

I don't find the release blog in https://github.com/apache/beam/pull/9157 or
elsewhere?

This should be part of the release candidate [1] and I wonder why we keep
on missing it in RCs. Is there something that needs to be fixed in [1]?

The reason why I now check for this as one of the first items is that we
traditionally have done poorly communicating releases to users when this is
actually very important. The blog needs many eyes to make sure we capture
what matters in a way that makes sense to users.

Thomas






[1]
https://beam.apache.org/contribute/release-guide/#write-the-beam-blog-post-and-create-a-pull-request



On Thu, Jul 25, 2019 at 4:25 PM Rui Wang  wrote:

> Tried to verify RC1 by running Nexmark on Dataflow but found it's broken
> (at least based commands from Running+Nexmark
> ). Will
> try to debug it and rerun the process.
>
>
> -Rui
>
> On Thu, Jul 25, 2019 at 2:39 PM Anton Kedin  wrote:
>
>> Hi everyone,
>> Please review and vote on the release candidate #1 for the version
>> 2.14.0, as follows:
>> [ ] +1, Approve the release
>> [ ] -1, Do not approve the release (please provide specific comments)
>>
>> The complete staging area is available for your review, which includes:
>> * JIRA release notes [1],
>> * the official Apache source release to be deployed to dist.apache.org
>> [2], which is signed with the key with fingerprint
>> 89E2FFCAE7E99CF6E6827CFEF7349F2310FFB193 [3],
>> * all artifacts to be deployed to the Maven Central Repository [4],
>> * source code tag "v2.14.0-RC1" [5], [6]
>> * website pull request listing the release [7], publishing the API
>> reference manual [8].
>> * Python artifacts are deployed along with the source release to the
>> dist.apache.org [2].
>> * Validation sheet with a tab for 2.14.0 release to help with validation
>> [9].
>>
>> The vote will be open for at least 72 hours. It is adopted by majority
>> approval, with at least 3 PMC affirmative votes.
>>
>> Thanks,
>> Anton
>>
>> [1]
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12319527=12345431
>> [2] https://dist.apache.org/repos/dist/dev/beam/2.14.0/
>> [3] https://dist.apache.org/repos/dist/release/beam/KEYS
>> [4]
>> https://repository.apache.org/content/repositories/orgapachebeam-1080/
>> [5] https://github.com/apache/beam/tree/v2.14.0-RC1
>> [6] https://github.com/apache/beam/tags
>> [7] https://github.com/apache/beam/pull/9157
>> [8] https://github.com/apache/beam-site/pull/591/
>> [9] https://s.apache.org/beam-release-validation#gid=1082148452
>>
>


Re: Sort Merge Bucket - Action Items

2019-07-26 Thread Kenneth Knowles
There is still considerable value in knowing data sources statically so you
can do things like fetch sizes and other metadata and adjust pipeline
shape. I would not expect to delete these, but to implement them on top of
SDF while still giving them a clear URN and payload so runners can know
that it is a statically-specified source.

Kenn

On Fri, Jul 26, 2019 at 3:23 AM Robert Bradshaw  wrote:

> On Thu, Jul 25, 2019 at 11:09 PM Eugene Kirpichov 
> wrote:
> >
> > Hi Gleb,
> >
> > Regarding the future of io.Read: ideally things would go as follows
> > - All runners support SDF at feature parity with Read (mostly this is
> just the Dataflow runner's liquid sharding and size estimation for bounded
> sources, and backlog for unbounded sources, but I recall that a couple of
> other runners also used size estimation)
> > - Bounded/UnboundedSource APIs are declared "deprecated" - it is
> forbidden to add any new implementations to SDK, and users shouldn't use
> them either (note: I believe it's already effectively forbidden to use them
> for cases where a DoFn/SDF at the current level of support will be
> sufficient)
> > - People one by one rewrite existing Bounded/UnboundedSource based
> PTransforms in the SDK to use SDFs instead
> > - Read.from() is rewritten to use a wrapper SDF over the given Source,
> and explicit support for Read is deleted from runners
> > - In the next major version of Beam - presumably 3.0 - the Read
> transform itself is deleted
> >
> > I don't know what's the current status of SDF/Read feature parity, maybe
> Luke or Cham can comment. An alternative path is offered in
> http://s.apache.org/sdf-via-source.
>
> Python supports initial splitting for SDF of all sources on portable
> runners. Dataflow support for batch SDF is undergoing testing, not yet
> rolled out. Dataflow support for streaming SDF is awaiting portable
> state/timer support.
>
> > On Thu, Jul 25, 2019 at 6:39 AM Gleb Kanterov  wrote:
> >>
> >> What is the long-term plan for org.apache.beam.sdk.io.Read? Is it going
> away in favor of SDF, or we are always going to have both?
> >>
> >> I was looking into AvroIO.read and AvroIO.readAll, both of them use
> AvroSource. AvroIO.readAll is using SDF, and it's implemented with
> ReadAllViaFileBasedSource that takes AvroSource as a parameter. Looking at
> ReadAllViaFileBasedSource I find it not necessary to use Source, it
> should be enough to have something like (KV,
> OutputReceiver), as we have discussed in this thread, and that should be
> fine for SMB as well. It would require duplicating code from AvroSource,
> but in the end, I don't see it as a problem if AvroSource is going away.
> >>
> >> I'm attaching a small diagram I put for myself to better understand the
> code.
> >>
> >> AvroIO.readAll :: PTransform> ->
> >>
> >> FileIO.matchAll :: PTransform,
> PCollection>
> >> FileIO.readMatches :: PTransform,
> PCollection>
> >> AvroIO.readFiles :: PTransform,
> PCollection> ->
> >>
> >> ReadAllViaFileBasedSource :: PTransform,
> PCollection> ->
> >>
> >> ParDo.of(SplitIntoRangesFn :: DoFn OffsetRange>>) (splittable do fn)
> >>
> >> Reshuffle.viaRandomKey()
> >>
> >> ParDo.of(ReadFileRangesFn(createSource) :: DoFn OffsetRange>, T>) where
> >>
> >> createSource :: String -> FileBasedSource
> >>
> >> createSource = AvroSource
> >>
> >>
> >> AvroIO.read without getHintMatchedManyFiles() :: PTransform PCollection> ->
> >>
> >> Read.Bounded.from(createSource) where
> >>
> >> createSource :: String -> FileBasedSource
> >>
> >> createSource = AvroSource
> >>
> >>
> >> Gleb
> >>
> >>
> >> On Thu, Jul 25, 2019 at 2:41 PM Robert Bradshaw 
> wrote:
> >>>
> >>> On Thu, Jul 25, 2019 at 12:35 AM Kenneth Knowles 
> wrote:
> >>> >
> >>> > From the peanut gallery, keeping a separate implementation for SMB
> seems fine. Dependencies are serious liabilities for both upstream and
> downstream. It seems like the reuse angle is generating extra work, and
> potentially making already-complex implementations more complex, instead of
> helping things.
> >>>
> >>> +1
> >>>
> >>> To be clear, what I care about is that WriteFiles(X) and
> >>> WriteSmbFiles(X) can share the same X, for X in {Avro, Parquet, Text,
> >>> TFRecord, ...}. In other words composability of the API (vs. manually
> >>> filling out the matrix). If WriteFiles and WriteSmbFiles find
> >>> opportunities for (easy, clean) implementation sharing, that'd be
> >>> nice, but not the primary goal.
> >>>
> >>> (Similarly for reading, though that's seem less obvious. Certainly
> >>> whatever T is useful for ReadSmb(T) could be useful for a
> >>> (non-liquid-shading) ReadAll(T) however.)
> >>>
> >>> > On Wed, Jul 24, 2019 at 11:59 AM Neville Li 
> wrote:
> >>> >>
> >>> >> I spoke too soon. Turns out for unsharded writes, numShards can't
> be determined until the last finalize transform, which is again different
> from the current SMB proposal (static number of buckets & shards).
> >>> >> I'll end up with more code specialized for SMB in order 
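
For readers less familiar with the composition being discussed, the
user-level equivalent of the readAll expansion described above is roughly the
following (a sketch; the file pattern and Avro schema are illustrative, and
"p" is an existing Pipeline):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.values.PCollection;

// Match file patterns -> expand matches into readable files -> read each
// file. Per the expansion above, AvroIO.readFiles is the step that is
// currently built on ReadAllViaFileBasedSource + AvroSource.
Schema schema = ...;  // Avro schema of the records; illustrative
PCollection<GenericRecord> records = p
    .apply(FileIO.match().filepattern("gs://my-bucket/events/*.avro"))
    .apply(FileIO.readMatches())
    .apply(AvroIO.readFilesGenericRecords(schema));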

Re: Collecting metrics in JobInvocation - BEAM-4775

2019-07-26 Thread Kenneth Knowles
Took a look at the code, too. It seems like a mismatch in a few ways

 - PipelineRunner::run is async already and returns while the job is still
running
 - PipelineResult is a legacy name - it is really meant to be a handle to a
running job
 - cancel() on a future is just not really related to cancel() in a job. I
would expect to cancel a job with PipelineResult::cancel and I would expect
JobInvocation::cancel to cancel the "start job" RPC/request/whatever. So I
would not expect metrics for a job which I decided to not even start.

Kenn

On Fri, Jul 26, 2019 at 8:48 AM Łukasz Gajowy  wrote:

> Hi all,
>
> I'm currently working on BEAM-4775
> . The goal here is to
> pass portable MetricResults over the RPC API to the PortableRunner (SDK)
> part and allow reading them there. The metrics can be collected from the
> pipeline result that is available in JobInvocation's callbacks. The
> callbacks are registered in *start()
> *
>  and
> *cancel()
> 
>  *methods
> of JobInvocation. This is the place where my problems begin:
>
> I want to access the pipeline result and get the MetricResults from it.
> This is possible *only in onSuccess(PipelineResult result) method* of the
> callbacks registered in *start() and* *cancel() *in JobInvocation. Now,
> when I cancel the job invocation, *invocationFuture.cancel()
> *
>  is
> called and will result in invoking *onFailure(Throwable throwable) *in
> case the pipeline is still running. *onFailure()* has no PipelineResult
> parameter, hence there currently is no possibility to collect the metrics
> there.
>
> My questions currently are:
>
>- Should we collect metrics after the job is canceled? So far I
>assumed that we should.
>- If so, does anyone have some other ideas on how to collect metrics
>so that we could collect them when canceling the job?
>
> PR I'm working on with more discussions on the topic: PR 9020
> 
> The current idea on how the metrics could be collected in JobInvocation:
> link
> 
>
> Thanks,
> Łukasz
>
>


Collecting metrics in JobInvocation - BEAM-4775

2019-07-26 Thread Łukasz Gajowy
Hi all,

I'm currently working on BEAM-4775
. The goal here is to pass
portable MetricResults over the RPC API to the PortableRunner (SDK) part
and allow reading them there. The metrics can be collected from the
pipeline result that is available in JobInvocation's callbacks. The
callbacks are registered in *start()
*
and
*cancel()

*methods
of JobInvocation. This is the place where my problems begin:

I want to access the pipeline result and get the MetricResults from it.
This is possible *only in onSuccess(PipelineResult result) method* of the
callbacks registered in *start() and* *cancel() *in JobInvocation. Now,
when I cancel the job invocation, *invocationFuture.cancel()
*
is
called and will result in invoking *onFailure(Throwable throwable) *in case
the pipeline is still running. *onFailure()* has no PipelineResult
parameter, hence there currently is no possibility to collect the metrics
there.

My questions currently are:

   - Should we collect metrics after the job is canceled? So far I assumed
   that we should.
   - If so, does anyone have some other ideas on how to collect metrics so
   that we could collect them when canceling the job?

PR I'm working on with more discussions on the topic: PR 9020

The current idea on how the metrics could be collected in JobInvocation:
link


Thanks,
Łukasz
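
For context, the callback registration in question looks roughly like this
sketch (not the actual JobInvocation code; plain Guava imports are used here
for readability even though Beam vendors Guava, and the executor and metric
handling are illustrative):

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricsFilter;

// A future that completes with the terminal PipelineResult of the invocation.
ListenableFuture<PipelineResult> invocationFuture = ...;  // assumed to exist

Futures.addCallback(
    invocationFuture,
    new FutureCallback<PipelineResult>() {
      @Override
      public void onSuccess(PipelineResult result) {
        // Metrics are only reachable here, because only onSuccess hands us
        // the PipelineResult.
        MetricQueryResults metrics =
            result.metrics().queryMetrics(MetricsFilter.builder().build());
        // ... convert to portable MetricResults and store them for the job API ...
      }

      @Override
      public void onFailure(Throwable t) {
        // Reached on failure or when the future is cancelled: there is no
        // PipelineResult and hence no metrics, which is the problem described
        // above.
      }
    },
    MoreExecutors.directExecutor());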


Re: Sort Merge Bucket - Action Items

2019-07-26 Thread Robert Bradshaw
On Thu, Jul 25, 2019 at 11:09 PM Eugene Kirpichov  wrote:
>
> Hi Gleb,
>
> Regarding the future of io.Read: ideally things would go as follows
> - All runners support SDF at feature parity with Read (mostly this is just 
> the Dataflow runner's liquid sharding and size estimation for bounded 
> sources, and backlog for unbounded sources, but I recall that a couple of 
> other runners also used size estimation)
> - Bounded/UnboundedSource APIs are declared "deprecated" - it is forbidden to 
> add any new implementations to SDK, and users shouldn't use them either 
> (note: I believe it's already effectively forbidden to use them for cases 
> where a DoFn/SDF at the current level of support will be sufficient)
> - People one by one rewrite existing Bounded/UnboundedSource based 
> PTransforms in the SDK to use SDFs instead
> - Read.from() is rewritten to use a wrapper SDF over the given Source, and 
> explicit support for Read is deleted from runners
> - In the next major version of Beam - presumably 3.0 - the Read transform 
> itself is deleted
>
> I don't know what's the current status of SDF/Read feature parity, maybe Luke 
> or Cham can comment. An alternative path is offered in 
> http://s.apache.org/sdf-via-source.

Python supports initial splitting for SDF of all sources on portable
runners. Dataflow support for batch SDF is undergoing testing, not yet
rolled out. Dataflow support for streaming SDF is awaiting portable
state/timer support.

> On Thu, Jul 25, 2019 at 6:39 AM Gleb Kanterov  wrote:
>>
>> What is the long-term plan for org.apache.beam.sdk.io.Read? Is it going away 
>> in favor of SDF, or we are always going to have both?
>>
>> I was looking into AvroIO.read and AvroIO.readAll, both of them use 
>> AvroSource. AvroIO.readAll is using SDF, and it's implemented with 
>> ReadAllViaFileBasedSource that takes AvroSource as a parameter. Looking at 
>> ReadAllViaFileBasedSource I find it not necessary to use Source, it 
>> should be enough to have something like (KV, 
>> OutputReceiver), as we have discussed in this thread, and that should be 
>> fine for SMB as well. It would require duplicating code from AvroSource, but 
>> in the end, I don't see it as a problem if AvroSource is going away.
>>
>> I'm attaching a small diagram I put for myself to better understand the code.
>>
>> AvroIO.readAll :: PTransform> ->
>>
>> FileIO.matchAll :: PTransform, 
>> PCollection>
>> FileIO.readMatches :: PTransform, 
>> PCollection>
>> AvroIO.readFiles :: PTransform, 
>> PCollection> ->
>>
>> ReadAllViaFileBasedSource :: PTransform, 
>> PCollection> ->
>>
>> ParDo.of(SplitIntoRangesFn :: DoFn> OffsetRange>>) (splittable do fn)
>>
>> Reshuffle.viaRandomKey()
>>
>> ParDo.of(ReadFileRangesFn(createSource) :: DoFn> OffsetRange>, T>) where
>>
>> createSource :: String -> FileBasedSource
>>
>> createSource = AvroSource
>>
>>
>> AvroIO.read without getHintMatchedManyFiles() :: PTransform> PCollection> ->
>>
>> Read.Bounded.from(createSource) where
>>
>> createSource :: String -> FileBasedSource
>>
>> createSource = AvroSource
>>
>>
>> Gleb
>>
>>
>> On Thu, Jul 25, 2019 at 2:41 PM Robert Bradshaw  wrote:
>>>
>>> On Thu, Jul 25, 2019 at 12:35 AM Kenneth Knowles  wrote:
>>> >
>>> > From the peanut gallery, keeping a separate implementation for SMB seems 
>>> > fine. Dependencies are serious liabilities for both upstream and 
>>> > downstream. It seems like the reuse angle is generating extra work, and 
>>> > potentially making already-complex implementations more complex, instead 
>>> > of helping things.
>>>
>>> +1
>>>
>>> To be clear, what I care about is that WriteFiles(X) and
>>> WriteSmbFiles(X) can share the same X, for X in {Avro, Parquet, Text,
>>> TFRecord, ...}. In other words composability of the API (vs. manually
>>> filling out the matrix). If WriteFiles and WriteSmbFiles find
>>> opportunities for (easy, clean) implementation sharing, that'd be
>>> nice, but not the primary goal.
>>>
>>> (Similarly for reading, though that's seem less obvious. Certainly
>>> whatever T is useful for ReadSmb(T) could be useful for a
>>> (non-liquid-shading) ReadAll(T) however.)
>>>
>>> > On Wed, Jul 24, 2019 at 11:59 AM Neville Li  wrote:
>>> >>
>>> >> I spoke too soon. Turns out for unsharded writes, numShards can't be 
>>> >> determined until the last finalize transform, which is again different 
>>> >> from the current SMB proposal (static number of buckets & shards).
>>> >> I'll end up with more code specialized for SMB in order to generalize 
>>> >> existing sink code, which I think we all want to avoid.
>>> >>
>>> >> Seems the only option is duplicating some logic like temp file handling, 
>>> >> which is exactly what we did in the original PR.
>>> >> I can reuse Compression & Sink for file level writes but that seems 
>>> >> about the most I can reuse right now.
>>> >>
>>> >> On Tue, Jul 23, 2019 at 6:36 PM Neville Li  wrote:
>>> >>>
>>> >>> So I spent one afternoon trying some ideas for reusing the last few 
>>>