Re: [DISCUSS] Structuring Java based DSLs

2018-11-30 Thread Jan Lukavský
I think that the fact that SQL uses some other internal dependency 
should remain a hidden implementation detail. I absolutely agree that the 
user-facing dependency should remain sdks-java-sql in all cases.


  Jan

On 12/1/18 12:54 AM, Robert Bradshaw wrote:

I suppose what I'm trying to say is that I see this module structure
as a tool for discoverability and enumerating end-user endpoints. In
other words, if one wants to use SQL, it would seem odd to have to
depend on sdks-java-euphoria-sql rather than just sdks-java-sql if
sdks-java-euphoria is also a DSL one might use. A sibling relationship
does not prohibit the layered approach to implementation that sounds
like it makes sense.

(As for merging Euphoria into core, my initial impression is that's
probably a good idea, and something we should consider for 3.0 at the
very least.)

On Fri, Nov 30, 2018 at 11:06 PM Jan Lukavský  wrote:

Hi Rui,

yes, there are optimizations that could be added by each layer. The purpose of 
the Euphoria layer is actually not to reorder or modify any user operators that are 
present in the pipeline (because it might not have enough information to do 
this), but it can for instance choose between various join implementations 
(shuffle join, broadcast join, ...) - so the optimizations it can do are more 
low level. But this plays nicely with the DSL hierarchy - each layer adds a few 
more restrictions, but can therefore do more optimizations. And I think 
that the layer between SDK and SQL wouldn't have to support SQL optimizations, 
it would only have to support a way for SQL to express these optimizations.

   Jan -- Original e-mail --
From: Rui Wang 
To: dev@beam.apache.org
Date: 30. 11. 2018 22:43:04
Subject: Re: [DISCUSS] Structuring Java based DSLs

SQL's optimization is another area to consider for integration. SQL 
optimization includes pushing down filters/projections, merging or removing or 
swapping plan nodes, and comparing plan costs to choose the best plan. Adding another 
layer between SQL and the Java core might require that layer to support SQL 
optimizations if there is a need.

I don't have a clear picture of what SQL needs from Euphoria for 
optimization (the best case is nothing). As those optimizations are happening or 
will happen, we might start to have a sense of it.

-Rui

On Fri, Nov 30, 2018 at 12:38 PM Robert Bradshaw  wrote:

I don't really see Euphoria as a subset of SQL or the other way
around, and I think it makes sense to use either without the other, so
by this criterion I favor keeping them as siblings rather than nesting one inside the other.

That said, I think it's really good to have a bunch of shared code,
e.g. a join library that could be used by both. One could even depend
on the other without having to abandon the sibling relationship.
Something like retractions belong in the core SDK itself. Deeper than
that, actually, it should be part of the model.

- Robert

On Fri, Nov 30, 2018 at 7:20 PM David Morávek  wrote:

Jan, we made Kryo optional recently (it is a separate module and is used only in tests). 
From a quick look it seems that we forgot to remove compile time dependency from 
euphoria's build.gradle. The only "strong" dependencies I'm aware of are the core SDK 
and guava. We'll probably be adding a sketching extension dependency soon.

D.

On Fri, Nov 30, 2018 at 7:08 PM Jan Lukavský  wrote:

Hi Anton,
reactions inline.

-- Original e-mail --
From: Anton Kedin 
To: dev@beam.apache.org
Date: 30. 11. 2018 18:17:06
Subject: Re: [DISCUSS] Structuring Java based DSLs

I think this approach makes sense in general, Euphoria can be the 
implementation detail of SQL, similar to Join Library or core SDK Schemas.

I wonder though whether it would be better to bring Euphoria closer to core SDK 
first, maybe even merge them together. If you look at Reuven's recent work 
around schemas it seems like there are already similarities between that and 
Euphoria's approach, unless I'm missing the point (e.g. Filter transforms, 
FullJoin vs CoGroup... see [2]). And we're already switching parts of SQL to 
those transforms (e.g. SQL Aggregation is now implemented by core SDK's 
Group[3]).



Yes, these transforms seem to be very similar to those Euphoria has. Whether or 
not to merge Euphoria with core is essentially just a decision of the community 
(in my point of view).



Adding explicit Schema support to Euphoria will bring it both closer to core 
SDK and make it natural to use for SQL. Can this be a first step towards this 
integration?



Euphoria currently operates on pure PCollections, so when PCollection has a 
schema, it will be accessible by Euphoria. It makes sense to make use of the 
schema in Euphoria - it seems natural on inputs to Euphoria operators, but it 
might be tricky (not saying impossible) to actually produce schema-aware 
PCollections as outputs from Euphoria operators (generally speaking, in special 
cases that might be possible). Regarding inputs, there is actually an intention to 
act on the type of PCollection 

Re: [SDF] Why do we need markDone (or an equivalent claim)?

2018-11-30 Thread Lukasz Cwik
On Fri, Nov 30, 2018 at 4:43 PM Robert Bradshaw  wrote:

> On Fri, Nov 30, 2018 at 10:28 PM Lukasz Cwik  wrote:
> >
> > On Fri, Nov 30, 2018 at 12:47 PM Robert Bradshaw 
> wrote:
> >>
> >> On Fri, Nov 30, 2018 at 7:10 PM Lukasz Cwik  wrote:
> >> >
> >> > Uh, I'm not sure what you're asking.
> >>
> >> I'm asking why we wanted a markDone in the first place.
> >
> > When looking at the byte key restriction tracker code, I found a couple
> of bugs around how ranges were being compared and how the byte key range
> was being claimed (we weren't computing the next key correctly). The usage
> of markDone seemed to be a crutch when attempting to correctly implement
> the tryClaim code. Also, all the framework code that powered SDF wasn't
> aware of markDone so it couldn't validate that the last claim failed. So I
> fixed the tryClaim code and then didn't need markDone and removed it since
> this was the only restriction tracker that had it.
> >
> >> > In the SDF API, a void return on processElement already means that a
> call to tryClaim must have returned false
> >>
> >> We could widen this to "or finished the restriction."
> >
> > Yes, having markDone could be added to the API. Is it a crutch for
> subtle bugs in tryClaim though?
>
> I'm proposing removing the requirement of having either a markDone or
> a tryClaim(EndKey).
>

Yes, I could see that.


> >> > while a non void return allows the caller to either return STOP
> (tryClaim must have returned false) or return RESUME (with a time of when
> to resume).
> >>
> >> We could also return STOP if tryClaim never returned false but the
> >> restriction was finished.
> >>
> >> > This allows the framework code to prevent user errors by ensuring the
> restriction has been completed.
> >>
> >> I don't think the framework can ensure this. (It can enforce the above
> >> constraints that on a STOP tryClaim did indeed return false on the
> >> last call, but I'm fuzzy on the value this actually provides when it
> >> just means the user must artificially force it to return a false value.
> >> It also means we can't make it an error to try claiming values outside
> >> the initial restriction. If we want to make things more explicit, we
> >> could require a STOP or RESUME return rather than allow a void
> >> return.)
> >
> > I don't think we want SDF authors to ensure that their values are in the
> initial range first before attempting to claim them as this is the purpose
> of tryClaim. The SDF code would then be checking that the range is valid
> twice.
> >
> > processElement() {
> >   readElement
> >   isElementInRange?
> >   if (!tryClaim) {
> > return
> >   }
> > }
> > (both isElementInRange and tryClaim are now doing the same bounds
> checking which can lead to subtle bounds checking errors).
>
> Generally code would be iterating over the range, and it would likely
> be a bug to check past it, but if we want to support code that ignores
> range.getEndPosition() and lets tryClaim do all the work I buy that as
> a good argument to allow arbitrary claim attempts.
>
> >> Maybe I'm just not clever enough to come up with a kind of source
> >> where this could be good at catching errors?
> >
> > I think the value is that we expect to implement a few restriction
> tracker classes which will be re-used across many SDF implementations. In
> this case, we could point out to the SDF author that they haven't claimed
> all that they said they would process. This would be true whether markDone
> existed or not.
>
> The general pattern is
>
> processRestriction() {
>   for (element : source[restriction]) {
> if (!tryClaim(element)) {
>   return STOP
> } else {
>   emit(element)
> }
>   }
>   tryClaim(everything)
>   return STOP  // or if CONTINUE is returned, omit the above line
> }
>
> and I'm having a hard time coming up with any bugs that would be
> caught if we didn't require the (seemingly boilerplate)
> tryClaim(everything) line. Maybe I'm not thinking of the right source?


I was looking at the RestrictionTracker
(https://github.com/apache/beam/blob/176851192bba449dba0b2bc7cc45a2342b587dbd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L869)
that is part of the Watch PTransform, and it is significantly more complicated
than the others, since the RestrictionTracker is responsible for "polling"
for new elements to process and expects the caller to process everything
that is part of the set. This may not be the best way this
RestrictionTracker could have been implemented, since it seemed like we were
misusing several of the RestrictionTracker concepts and significantly tying
the implementation to the implementation of Watch, but this could be the
example worth studying.

The other case I could see that is worth investigating is whether it helps
ensure restrictions are completed in the case of poor/incorrect exception
handling on the part of the SDF implementation.

>> > Also, "" is the byte key range, the code could have just passed in
> 

Re: [DISCUSS] SplittableDoFn Java SDK User Facing API

2018-11-30 Thread Robert Bradshaw
On Fri, Nov 30, 2018 at 10:14 PM Lukasz Cwik  wrote:
>
> On Fri, Nov 30, 2018 at 1:02 PM Robert Bradshaw  wrote:
>>
>> On Fri, Nov 30, 2018 at 6:38 PM Lukasz Cwik  wrote:
>> >
>> > Sorry, for some reason I thought I had answered these.
>>
>> No problem, thanks for your patience :).
>>
>> > On Fri, Nov 30, 2018 at 2:20 AM Robert Bradshaw  
>> > wrote:
>> >>
>> >> I still have outstanding questions (above) about
>> >>
>> >> 1) Why we need arbitrary precision for backlog, instead of just using
>> >> a (much simpler) double.
>> >
>> >
>> > Double lacks the precision for reporting backlogs for byte key ranges 
> >> > (HBase, Bigtable, ...). Scanning a key range such as ["a", "b") with a 
>> > large number of keys with a really long common prefix such as 
>> > "aab" and "aac", ... leads 
>> > to the backlog not changing even though we are making progress through the 
>> > key space. This also prevents splitting within such an area since the 
>> > double can't provide that necessary precision (without multiple rounds of 
>> > splitting which adds complexity).
>>
>> We'll have to support multiple rounds of splitting regardless. I can
>> see how this gives more information up front though.
>
> I agree that we will need to support multiple rounds of splitting from the 
> SDK side but this adds complexity from the runner side since it can only 
> increase the accuracy for a split by performing multiple rounds of splitting 
> at once.
>
>> (As an aside, I've been thinking about some ways of solving the dark
>> matter problem, and it might depend on knowing the actual key, using
>> the fact that character boundaries are likely cut-off points for
>> changes in density, which would get obscured by alternative
>> representations.)
>
> Every time I think about this issue, I can never get it to apply meaningfully 
> for unbounded sources such as a message queue like pubsub.

Yeah, neither can I.

> Also, having an infinitely precise backlog such as the decimal format would 
> still provide density information as the rate of change through the backlog 
> for a bounded source would change once a "cluster" was hit.

This is getting to somewhat of a tangential topic, but the key insight
is that although it's easy to find the start of a cluster, to split
ideally one would want to know where the end of the cluster is. For
keyspaces, this is likely to be at binary fractions, and in particular
looking at the longevity of common prefixes of length n one could make
heuristic guesses as to where this density dropoff may be. (This also
requires splitting at a key, not splitting relative to a current
position, which has its issues...)

> >> >> 2) Whether it's worth passing backlog back to split requests, rather
>> >> than (again) a double representing "portion of current remaining"
>> >> which may change over time. (The most common split request is into
>> >> even portions, and specifically half, which can't accurately be
>> >> requested from a stale backlog.)
>> >
>> > I see two scenarios here:
>> > * the fraction is exposed to the SDF author and then the SDF author needs 
> >> > to map from their restriction space to backlog and also map fractions onto 
>> > their restriction space meaning that they are required to write mappings 
>> > between three different models.
>> > * the fraction is not exposed to the SDF author and the framework code 
>> > multiplies the fraction against the backlog and provides the backlog to 
>> > the user (this solves the backlog skew issue but still has the limited 
>> > precision issue).
>>
>> Limited precision is not as much of an issue here because one can
>> express very small numbers to split close to the current position, and
> >> one doesn't need high precision for splitting further away.
>
> Agree. Would this also mean that skew when splitting at half doesn't really 
> matter?

Lots of times keyspaces have big pockets of low density. If one hits
one of these ranges between when the backlog is reported and when the
split is requested, the skew can get quite large. Basically using a
fraction means that a system does not have to be as concerned about
stale data, and can make reasonable choices without data at all (e.g.
imagine upscaling from 200 to 300 workers and asking for everyone to
just give 33% of their work back), and when it does make choices based
on actual backlog the discrepancy between what was ideal at the time
backlog was requested and what's ideal now is shared between the
primary and remainder(s) rather than one side or the other absorbing
this entire error.
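
(For concreteness, the arithmetic behind the "33%" figure above - illustrative
only, not code from any proposal:)

  // Each of the 200 original workers keeps 2/3 of its work and returns 1/3;
  // the returned work is spread evenly across the 100 new workers.
  double keptPerOldWorker = 2.0 / 3;                  // = 0.666...
  double givenPerNewWorker = (200 * (1.0 / 3)) / 100; // = 0.666...
  // Both come out to 2/3 of an original worker's load, so all 300 workers end
  // up evenly loaded without needing a fresh backlog measurement.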

This of course gets exacerbated with multiple splits, e.g. if the
measured backlog was 100 and you wanted to split the work in 10
pieces, asking for a split at 10 would only result in 9 splits if the
cursor advanced by 10 in the meantime, and if it advanced by 9 you'd
probably want to use fractions anyway to spread the error out and
produce (10, 9, 9, 9, 9, 9, 9, 9, 9, 9) rather than (10, 10, 
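
(Spelled out with the numbers above - illustrative arithmetic only:)

  // Reported backlog was 100 and the cursor has since advanced by 9, so 91
  // elements actually remain. Splitting the *remainder* by fraction shares the
  // staleness error across all pieces instead of leaving a runt piece.
  long remaining = 91;
  int pieces = 10;
  long start = 0;
  for (int i = 1; i <= pieces; i++) {
    long end = Math.round(remaining * (i / (double) pieces));
    System.out.println("piece " + i + ": " + (end - start) + " elements");
    start = end;
  }
  // Prints pieces of 9 or 10 elements summing to 91, rather than nine pieces
  // of 10 plus a leftover of 1 from fixed-size split requests against the
  // stale backlog.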

Re: [SDF] Why do we need markDone (or an equivalent claim)?

2018-11-30 Thread Robert Bradshaw
On Fri, Nov 30, 2018 at 10:28 PM Lukasz Cwik  wrote:
>
> On Fri, Nov 30, 2018 at 12:47 PM Robert Bradshaw  wrote:
>>
>> On Fri, Nov 30, 2018 at 7:10 PM Lukasz Cwik  wrote:
>> >
>> > Uh, I'm not sure what you're asking.
>>
>> I'm asking why we wanted a markDone in the first place.
>
> When looking at the byte key restriction tracker code, I found a couple of 
> bugs around how ranges were being compared and how the byte key range was 
> being claimed (we weren't computing the next key correctly). The usage of 
> markDone seemed to be a crutch when attempting to correctly implement the 
> tryClaim code. Also, all the framework code that powered SDF wasn't aware of 
> markDone so it couldn't validate that the last claim failed. So I fixed the 
> tryClaim code and then didn't need markDone and removed it since this was the 
> only restriction tracker that had it.
>
>> > In the SDF API, a void return on processElement already means that a call 
>> > to tryClaim must have returned false
>>
>> We could widen this to "or finished the restriction."
>
> Yes, having markDone could be added to the API. Is it a crutch for subtle 
> bugs in tryClaim though?

I'm proposing removing the requirement of having either a markDone or
a tryClaim(EndKey).

>> > while a non void return allows the caller to either return STOP (tryClaim 
>> > must have returned false) or return RESUME (with a time of when to resume).
>>
>> We could also return STOP if tryClaim never returned false but the
>> restriction was finished.
>>
>> > This allows the framework code to prevent user errors by ensuring the 
>> > restriction has been completed.
>>
>> I don't think the framework can ensure this. (It can enforce the above
>> constraints that on a STOP tryClaim did indeed return false on the
>> last call, but I'm fuzzy on the value this actually provides when it
>> just means the user must artificially force it to return a false value.
>> It also means we can't make it an error to try claiming values outside
>> the initial restriction. If we want to make things more explicit, we
>> could require a STOP or RESUME return rather than allow a void
>> return.)
>
> I don't think we want SDF authors to ensure that their values are in the 
> initial range first before attempting to claim them as this is the purpose of 
> tryClaim. The SDF code would then be checking that the range is valid twice.
>
> processElement() {
>   readElement
>   isElementInRange?
>   if (!tryClaim) {
> return
>   }
> }
> (both isElementInRange and tryClaim are now doing the same bounds checking 
> which can lead to subtle bounds checking errors).

Generally code would be iterating over the range, and it would likely
be a bug to check past it, but if we want to support code that ignores
range.getEndPosition() and lets tryClaim do all the work I buy that as
a good argument to allow arbitrary claim attempts.

>> Maybe I'm just not clever enough to come up with a kind of source
>> where this could be good at catching errors?
>
> I think the value is that we expect to implement a few restriction tracker 
> classes which will be re-used across many SDF implementations. In this case, 
> we could point out to the SDF author that they haven't claimed all that they 
> said they would process. This would be true whether markDone existed or not.

The general pattern is

processRestriction() {
  for (element : source[restriction]) {
if (!tryClaim(element)) {
  return STOP
} else {
  emit(element)
}
  }
  tryClaim(everything)
  return STOP  // or if CONTINUE is returned, omit the above line
}

and I'm having a hard time coming up with any bugs that would be
caught if we didn't require the (seemingly boilerplate)
tryClaim(everything) line. Maybe I'm not thinking of the right source?
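
(For reference, a hedged sketch of that pattern against the existing
OffsetRangeTracker - the DoFn itself is made up for illustration. When the
loop condition is the tryClaim, the final failed claim at the end of the
range comes for free; the tryClaim(everything) boilerplate only appears when
the loop ends for some other reason, e.g. end of data before end of
restriction:)

  // Uses org.apache.beam.sdk.io.range.OffsetRange and
  // org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker.
  class EmitOffsetsFn extends DoFn<OffsetRange, Long> {
    @GetInitialRestriction
    public OffsetRange getInitialRestriction(OffsetRange element) {
      return element;
    }

    @ProcessElement
    public void process(ProcessContext c, OffsetRangeTracker tracker) {
      // Claim each position before producing output; when i reaches the end of
      // the restriction, tryClaim returns false and the loop exits with the
      // "last claim failed" condition already satisfied.
      for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
        c.output(i);
      }
      // No tryClaim(Long.MAX_VALUE) / markDone needed here; it would only be
      // needed if we broke out of the loop early, e.g. on hitting end of data.
    }
  }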

>> > Also, "" is the byte key range, the code could have just passed in 
>> > range.getEndPosition() in to the final tryClaim, its just that "" is 
>> > shorthand and would be similar to passing in Long.MAX_VALUE for the file 
>> > offset range.
>>
>> Having to choose a value to pass depending on the restriction tracker
>> type is something that could simply be eliminated.
>>
>> > On Fri, Nov 30, 2018 at 2:45 AM Robert Bradshaw  
>> > wrote:
>> >>
>> >> In looking at the SDF examples, it seems error-prone to have to
>> >> remember to write
>> >>
>> >> tryClaim([fake-end-position])
>> >>
>> >> to indicate that a restriction is finished. IIRC, this was done to
>> >> decide whether the entire restriction had been processed on return in
>> >> the case that tryClaim never returned false. It seems preferable to
>> >> encode this into the return value (with a void return meaning iff
>> >> tryClaim returned false, and a non-void return being able to indicate
>> >> any hints as to when, if ever, process should be called again).
>> >>
>> >> Can someone jog my memory as to whether there was a case in which this 
>> >> wouldn't work?


Re: [DISCUSS] Structuring Java based DSLs

2018-11-30 Thread Robert Bradshaw
I suppose what I'm trying to say is that I see this module structure
as a tool for discoverability and enumerating end-user endpoints. In
other words, if one wants to use SQL, it would seem odd to have to
depend on sdks-java-euphoria-sql rather than just sdks-java-sql if
sdks-java-euphoria is also a DSL one might use. A sibling relationship
does not prohibit the layered approach to implementation that sounds
like it makes sense.

(As for merging Euphoria into core, my initial impression is that's
probably a good idea, and something we should consider for 3.0 at the
very least.)

On Fri, Nov 30, 2018 at 11:06 PM Jan Lukavský  wrote:
>
> Hi Rui,
>
> yes, there are optimizations that could be added by each layer. The purpose 
> of the Euphoria layer is actually not to reorder or modify any user operators 
> that are present in the pipeline (because it might not have enough 
> information to do this), but it can for instance choose between various join 
> implementations (shuffle join, broadcast join, ...) - so the optimizations it 
> can do are more low level. But this plays nicely with the DSL hierarchy - 
> each layer adds a few more restrictions, but can therefore do more 
> optimizations. And I think that the layer between SDK and SQL wouldn't have 
> to support SQL optimizations, it would only have to support a way for SQL to 
> express these optimizations.
>
>   Jan -- Original e-mail --
> From: Rui Wang 
> To: dev@beam.apache.org
> Date: 30. 11. 2018 22:43:04
> Subject: Re: [DISCUSS] Structuring Java based DSLs
>
> SQL's optimization is another area to consider for integration. SQL 
> optimization includes pushing down filters/projections, merging or removing 
> or swapping plan nodes, and comparing plan costs to choose the best plan. 
> Adding another layer between SQL and the Java core might require that layer to support SQL 
> optimizations if there is a need.
>
> I don't have a clear picture of what SQL needs from Euphoria for 
> optimization (the best case is nothing). As those optimizations are happening or 
> will happen, we might start to have a sense of it.
>
> -Rui
>
> On Fri, Nov 30, 2018 at 12:38 PM Robert Bradshaw  wrote:
>
> I don't really see Euphoria as a subset of SQL or the other way
> around, and I think it makes sense to use either without the other, so
> by this criterion I favor keeping them as siblings rather than nesting one inside the other.
>
> That said, I think it's really good to have a bunch of shared code,
> e.g. a join library that could be used by both. One could even depend
> on the other without having to abandon the sibling relationship.
> Something like retractions belong in the core SDK itself. Deeper than
> that, actually, it should be part of the model.
>
> - Robert
>
> On Fri, Nov 30, 2018 at 7:20 PM David Morávek  wrote:
> >
> > Jan, we made Kryo optional recently (it is a separate module and is used 
> > only in tests). From a quick look it seems that we forgot to remove compile 
> > time dependency from euphoria's build.gradle. The only "strong" dependencies 
> > I'm aware of are the core SDK and guava. We'll probably be adding a sketching 
> > extension dependency soon.
> >
> > D.
> >
> > On Fri, Nov 30, 2018 at 7:08 PM Jan Lukavský  wrote:
> >>
> >> Hi Anton,
> >> reactions inline.
> >>
> >> -- Original e-mail --
> >> From: Anton Kedin 
> >> To: dev@beam.apache.org
> >> Date: 30. 11. 2018 18:17:06
> >> Subject: Re: [DISCUSS] Structuring Java based DSLs
> >>
> >> I think this approach makes sense in general, Euphoria can be the 
> >> implementation detail of SQL, similar to Join Library or core SDK Schemas.
> >>
> >> I wonder though whether it would be better to bring Euphoria closer to 
> >> core SDK first, maybe even merge them together. If you look at Reuven's 
> >> recent work around schemas it seems like there are already similarities 
> >> between that and Euphoria's approach, unless I'm missing the point (e.g. 
> >> Filter transforms, FullJoin vs CoGroup... see [2]). And we're already 
> >> switching parts of SQL to those transforms (e.g. SQL Aggregation is now 
> >> implemented by core SDK's Group[3]).
> >>
> >>
> >>
> >> Yes, these transforms seem to be very similar to those Euphoria has. 
> >> Whether or not to merge Euphoria with core is essentially just a decision 
> >> of the community (in my point of view).
> >>
> >>
> >>
> >> Adding explicit Schema support to Euphoria will bring it both closer to 
> >> core SDK and make it natural to use for SQL. Can this be a first step 
> >> towards this integration?
> >>
> >>
> >>
> >> Euphoria currently operates on pure PCollections, so when PCollection has 
> >> a schema, it will be accessible by Euphoria. It makes sense to make use of 
> >> the schema in Euphoria - it seems natural on inputs to Euphoria operators, 
> >> but it might be tricky (not saying impossible) to actually produce 
> >> schema-aware PCollections as outputs from Euphoria operators (generally 
> >> speaking, in special cases that might be possible). Regarding inputs, 
> 

Re: What is Jenkins job "Portable_Python" in PreCommit?

2018-11-30 Thread Mark Liu
Thank you for improving the test coverage. That's very helpful!

On the other hand, I have a PR  in
review to make time-consuming integration tests run in parallel in
PreCommit. This could benefit tests like portable_python by letting them be
enabled in `:pythonPreCommit` without creating a Jenkins branch and running a
seed job for testing. However, if we have other reasons, then it may not apply.

Would love to hear thoughts on this.

Mark

On Fri, Nov 30, 2018 at 12:27 PM Thomas Weise  wrote:

> This is a very valuable addition! Given the execution times of Python and
> Java pre-commit, having this run in parallel should not be an issue.
>
> Thanks,
> Thomas
>
>
> On Fri, Nov 30, 2018 at 11:01 AM Ankur Goenka  wrote:
>
>> We added a new precommit which tests a wordcount pipeline on the portable
>> Flink runner.
>> Recently we missed catching some obvious issues which broke
>> portability and could have been caught by this test.
>> The current test is fairly lightweight and executes in ~5min, which seems
>> to be reasonable for a precommit test.
>> This precommit was added just yesterday, so we don't actively track it yet.
>> However, please let me know if you see any issues with this precommit.
>>
>> On Fri, Nov 30, 2018 at 3:49 AM Maximilian Michels 
>> wrote:
>>
>>> This was merged with https://github.com/apache/beam/pull/6954.
>>>
>>> Eventually we want to run a portable WordCount on PreCommit. We will do
>>> some more testing on Jenkins before it becomes an official PreCommit
>>> task.
>>>
>>> Thanks,
>>> Max
>>>
>>> On 29.11.18 19:03, Mark Liu wrote:
>>> > ah, thanks Boyuan! I probably created the PR at a bad time. Looks like
>>> > a new PR will fix it.
>>> >
>>> > On Thu, Nov 29, 2018 at 9:50 AM Boyuan Zhang  wrote:
>>> >
>>> > I believe it's in this pending PR:
>>> > https://github.com/apache/beam/pull/7157.
>>> >
>>> > On Thu, Nov 29, 2018 at 8:36 AM Mark Liu  wrote:
>>> >
>>> > Hi guys,
>>> >
>>> > I made some changes 
>>> > to the Python PreCommit Gradle setup and then Portable_Python was invoked
>>> > as a PreCommit test and failed. However, I can't find where
>>> > it's defined / generated in Gradle or the Jenkins groovy. Does
>>> > anyone know? My branch was synced to master yesterday.
>>> >
>>> > Thanks!
>>> > Mark
>>> >
>>>
>>


Re: [DISCUSS] Structuring Java based DSLs

2018-11-30 Thread Jan Lukavský

Hi Rui,

yes, there are optimizations that could be added by each layer. The purpose
of the Euphoria layer is actually not to reorder or modify any user operators 
that are present in the pipeline (because it might not have enough
information to do this), but it can for instance choose between various join
implementations (shuffle join, broadcast join, ...) - so the optimizations
it can do are more low level. But this plays nicely with the DSL hierarchy -
each layer adds a few more restrictions, but can therefore do more
optimizations. And I think that the layer between SDK and SQL wouldn't have
to support SQL optimizations, it would only have to support a way for SQL to
express these optimizations.
  Jan -- Original e-mail --
From: Rui Wang 
To: dev@beam.apache.org
Date: 30. 11. 2018 22:43:04
Subject: Re: [DISCUSS] Structuring Java based DSLs
"
SQL's optimization is another area to consider for integration. SQL
optimization includes pushing down filters/projections, merging or removing
or swapping plan nodes, and comparing plan costs to choose the best plan. Adding
another layer between SQL and the Java core might require that layer to support SQL
optimizations if there is a need.




I don't have a clear picture of what SQL needs from Euphoria for optimization
(the best case is nothing). As those optimizations are happening or will happen,
we might start to have a sense of it.




-Rui 




On Fri, Nov 30, 2018 at 12:38 PM Robert Bradshaw <rober...@google.com> wrote:

"I don't really see Euphoria as a subset of SQL or the other way
around, and I think it makes sense to use either without the other, so
by this criterion I favor keeping them as siblings rather than nesting one inside the other.

That said, I think it's really good to have a bunch of shared code,
e.g. a join library that could be used by both. One could even depend
on the other without having to abandon the sibling relationship.
Something like retractions belong in the core SDK itself. Deeper than
that, actually, it should be part of the model.

- Robert

On Fri, Nov 30, 2018 at 7:20 PM David Morávek <d...@apache.org> wrote:
>
> Jan, we made Kryo optional recently (it is a separate module and is used
only in tests). From a quick look it seems that we forgot to remove compile
time dependency from euphoria's build.gradle. The only "strong" dependencies I'm
aware of are the core SDK and guava. We'll probably be adding a sketching
extension dependency soon.
>
> D.
>
> On Fri, Nov 30, 2018 at 7:08 PM Jan Lukavský <je...@seznam.cz> wrote:
>>
>> Hi Anton,
>> reactions inline.
>>
>> -- Original e-mail --
>> From: Anton Kedin <ke...@google.com>
>> To: dev@beam.apache.org
>> Date: 30. 11. 2018 18:17:06
>> Subject: Re: [DISCUSS] Structuring Java based DSLs
>>
>> I think this approach makes sense in general, Euphoria can be the
implementation detail of SQL, similar to Join Library or core SDK Schemas.
>>
>> I wonder though whether it would be better to bring Euphoria closer to 
core SDK first, maybe even merge them together. If you look at Reuven's 
recent work around schemas it seems like there are already similarities 
between that and Euphoria's approach, unless I'm missing the point (e.g. 
Filter transforms, FullJoin vs CoGroup... see [2]). And we're already
switching parts of SQL to those transforms (e.g. SQL Aggregation is now 
implemented by core SDK's Group[3]).
>>
>>
>>
>> Yes, these transforms seem to be very similar to those Euphoria has. 
Whether or not to merge Euphoria with core is essentially just a decision of
the community (in my point of view).
>>
>>
>>
>> Adding explicit Schema support to Euphoria will bring it both closer to
core SDK and make it natural to use for SQL. Can this be a first step
towards this integration?
>>
>>
>>
>> Euphoria currently operates on pure PCollections, so when PCollection has
a schema, it will be accessible by Euphoria. It makes sense to make use of
the schema in Euphoria - it seems natural on inputs to Euphoria operators,
but it might be tricky (not saying impossible) to actually produce schema-
aware PCollections as outputs from Euphoria operators (generally speaking,
in special cases that might be possible). Regarding inputs, there is
actually an intention to act on the type of PCollection - e.g. when PCollection is
already of type KV, then it is possible to make key extractor and value 
extractor optional in Euphoria builders, so it feels natural to enable
changing the builders when a schema-aware PCollection is provided, and make use of
the provided schema. The rest of the Euphoria team might correct me if I'm wrong.
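
(A hedged sketch of the input-side idea: deriving a default key extractor from
a schema-aware input. The field name "user" is made up, and it assumes the
schema accessors on PCollection, i.e. hasSchema()/getToRowFunction():)

  static <T> SerializableFunction<T, String> keyFnFromSchema(PCollection<T> input) {
    if (!input.hasSchema()) {
      throw new IllegalArgumentException(
          "Input has no schema; an explicit key extractor is required");
    }
    // Convert each element to a Row via the schema's conversion function and
    // read the (illustrative) "user" field as the grouping key.
    SerializableFunction<T, Row> toRow = input.getToRowFunction();
    return element -> toRow.apply(element).getString("user");
  }
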
>>
>>
>>
>>
>> One question I have is, does Euphoria bring dependencies that are not 
needed by SQL, or does more or less only rely on the core SDK?
>>
>>
>>
>> I think the only relevant dependency that Euphoria has besides core SDK
is Kryo. It is the default coder when no coder is provided, but that could
be made optional - e.g. the default coder would be supported only if an 

Re: [DISCUSS] Structuring Java based DSLs

2018-11-30 Thread Jan Lukavský
Hi Robert,

Euphoria must be a superset of SQL for the proposed approach to work. And I 
think that it already is, or at least can be made so. There might be some 
subtleties missing or different, but that is the nice thing - by building
the DSLs bottom up, we can make sure that they are mutually consistent -
i.e. there are not multiple implementations of join semantics with slightly 
different behavior (due to multiple implementations). It is of course
possible to take some parts that are common and make a separate library, but
the way I see it, it should be possible to make this shared library Euphoria
itself, there are (currently) no known features that would imply
incompatibility between the two (which would force the approach you
propose).

 Jan -- Original e-mail --
From: Robert Bradshaw 
To: dev@beam.apache.org
Date: 30. 11. 2018 21:39:01
Subject: Re: [DISCUSS] Structuring Java based DSLs
"I don't really see Euphoria as a subset of SQL or the other way
around, and I think it makes sense to use either without the other, so
by this criterion I favor keeping them as siblings rather than nesting one inside the other.

That said, I think it's really good to have a bunch of shared code,
e.g. a join library that could be used by both. One could even depend
on the other without having to abandon the sibling relationship.
Something like retractions belong in the core SDK itself. Deeper than
that, actually, it should be part of the model.

- Robert

On Fri, Nov 30, 2018 at 7:20 PM David Morávek  wrote:
>
> Jan, we made Kryo optional recently (it is a separate module and is used
only in tests). From a quick look it seems that we forgot to remove compile
time dependency from euphoria's build.gradle. The only "strong" dependencies I'm
aware of are the core SDK and guava. We'll probably be adding a sketching
extension dependency soon.
>
> D.
>
> On Fri, Nov 30, 2018 at 7:08 PM Jan Lukavský  wrote:
>>
>> Hi Anton,
>> reactions inline.
>>
>> -- Original e-mail --
>> From: Anton Kedin 
>> To: dev@beam.apache.org
>> Date: 30. 11. 2018 18:17:06
>> Subject: Re: [DISCUSS] Structuring Java based DSLs
>>
>> I think this approach makes sense in general, Euphoria can be the
implementation detail of SQL, similar to Join Library or core SDK Schemas.
>>
>> I wonder though whether it would be better to bring Euphoria closer to 
core SDK first, maybe even merge them together. If you look at Reuven's 
recent work around schemas it seems like there are already similarities 
between that and Euphoria's approach, unless I'm missing the point (e.g. 
Filter transforms, FullJoin vs CoGroup... see [2]). And we're already
switching parts of SQL to those transforms (e.g. SQL Aggregation is now 
implemented by core SDK's Group[3]).
>>
>>
>>
>> Yes, these transforms seem to be very similar to those Euphoria has. 
Whether or not to merge Euphoria with core is essentially just a decision of
the community (in my point of view).
>>
>>
>>
>> Adding explicit Schema support to Euphoria will bring it both closer to
core SDK and make it natural to use for SQL. Can this be a first step
towards this integration?
>>
>>
>>
>> Euphoria currently operates on pure PCollections, so when PCollection has
a schema, it will be accessible by Euphoria. It makes sense to make use of
the schema in Euphoria - it seems natural on inputs to Euphoria operators,
but it might be tricky (not saying impossible) to actually produce schema-
aware PCollections as outputs from Euphoria operators (generally speaking,
in special cases that might be possible). Regarding inputs, there is
actually an intention to act on the type of PCollection - e.g. when PCollection is
already of type KV, then it is possible to make key extractor and value 
extractor optional in Euphoria builders, so it feels natural to enable
changing the builders when a schema-aware PCollection is provided, and make use of
the provided schema. The rest of the Euphoria team might correct me if I'm wrong.
>>
>>
>>
>>
>> One question I have is, does Euphoria bring dependencies that are not 
needed by SQL, or does more or less only rely on the core SDK?
>>
>>
>>
>> I think the only relevant dependency that Euphoria has besides core SDK
is Kryo. It is the default coder when no coder is provided, but that could
be made optional - e.g. the default coder would be supported only if an 
appropriate module would be available. That way I think that Euphoria has no
special dependencies.
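
(One hedged sketch of the "default coder only if the module is available"
idea - the provider class name below is illustrative, not a commitment to any
particular module layout:)

  // Register the Kryo-based fallback coder only when the kryo extension is on
  // the classpath; otherwise users must set coders explicitly.
  try {
    Class.forName("org.apache.beam.sdk.extensions.kryo.KryoCoderProvider");
    // ... register the fallback CoderProvider with the pipeline's CoderRegistry ...
  } catch (ClassNotFoundException e) {
    // Kryo module absent: Euphoria outputs need explicit coders.
  }
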
>>
>>
>>
>> [1] https://github.com/apache/beam/blob/f66eb5fe23b2500b396e6f711cdf4aeef6b31ab8/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java#L73
>> [2] https://github.com/apache/beam/tree/f66eb5fe23b2500b396e6f711cdf4aeef6b31ab8/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms
>> [3] https://github.com/apache/beam/blob/f66eb5fe23b2500b396e6f711cdf4aeef6b31ab8/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java#L179
>>
>>
>>
>> On Fri, Nov 30, 2018 at 6:29 AM Jan 

Re: [DISCUSS] Structuring Java based DSLs

2018-11-30 Thread Rui Wang
SQL's optimization is another area to consider for integration. SQL
optimization includes pushing down filters/projections, merging or removing
or swapping plan nodes, and comparing plan costs to choose the best plan. Adding
another layer between SQL and the Java core might require that layer to support SQL
optimizations if there is a need.

I don't have a clear picture of what SQL needs from Euphoria for
optimization (the best case is nothing). As those optimizations are happening or
will happen, we might start to have a sense of it.

-Rui

On Fri, Nov 30, 2018 at 12:38 PM Robert Bradshaw 
wrote:

> I don't really see Euphoria as a subset of SQL or the other way
> around, and I think it makes sense to use either without the other, so
> by this criterion I favor keeping them as siblings rather than nesting one inside the other.
>
> That said, I think it's really good to have a bunch of shared code,
> e.g. a join library that could be used by both. One could even depend
> on the other without having to abandon the sibling relationship.
> Something like retractions belong in the core SDK itself. Deeper than
> that, actually, it should be part of the model.
>
> - Robert
>
> On Fri, Nov 30, 2018 at 7:20 PM David Morávek  wrote:
> >
> > Jan, we made Kryo optional recently (it is a separate module and is used
> only in tests). From a quick look it seems that we forgot to remove compile
> time dependency from euphoria's build.gradle. The only "strong" dependencies
> I'm aware of are the core SDK and guava. We'll probably be adding a sketching
> extension dependency soon.
> >
> > D.
> >
> > On Fri, Nov 30, 2018 at 7:08 PM Jan Lukavský  wrote:
> >>
> >> Hi Anton,
> >> reactions inline.
> >>
> >> -- Original e-mail --
> >> From: Anton Kedin 
> >> To: dev@beam.apache.org
> >> Date: 30. 11. 2018 18:17:06
> >> Subject: Re: [DISCUSS] Structuring Java based DSLs
> >>
> >> I think this approach makes sense in general, Euphoria can be the
> implementation detail of SQL, similar to Join Library or core SDK Schemas.
> >>
> >> I wonder though whether it would be better to bring Euphoria closer to
> core SDK first, maybe even merge them together. If you look at Reuven's
> recent work around schemas it seems like there are already similarities
> between that and Euphoria's approach, unless I'm missing the point (e.g.
> Filter transforms, FullJoin vs CoGroup... see [2]). And we're already
> switching parts of SQL to those transforms (e.g. SQL Aggregation is now
> implemented by core SDK's Group[3]).
> >>
> >>
> >>
> >> Yes, these transforms seem to be very similar to those Euphoria has.
> Whether or not to merge Euphoria with core is essentially just a decision
> of the community (in my point of view).
> >>
> >>
> >>
> >> Adding explicit Schema support to Euphoria will bring it both closer to
> core SDK and make it natural to use for SQL. Can this be a first step
> towards this integration?
> >>
> >>
> >>
> >> Euphoria currently operates on pure PCollections, so when PCollection
> has a schema, it will be accessible by Euphoria. It makes sense to make use
> of the schema in Euphoria - it seems natural on inputs to Euphoria
> operators, but it might be tricky (not saying impossible) to actually
> produce schema-aware PCollections as outputs from Euphoria operators
> (generally speaking, in special cases that might be possible). Regarding
> inputs, there is actually an intention to act on the type of PCollection - e.g.
> when PCollection is already of type KV, then it is possible to make key
> extractor and value extractor optional in Euphoria builders, so it feels
> natural to enable changing the builders when a schema-aware PCollection is
> provided, and make use of the provided schema. The rest of the Euphoria team
> might correct me if I'm wrong.
> >>
> >>
> >>
> >>
> >> One question I have is, does Euphoria bring dependencies that are not
> needed by SQL, or does more or less only rely on the core SDK?
> >>
> >>
> >>
> >> I think the only relevant dependency that Euphoria has besides core SDK
> is Kryo. It is the default coder when no coder is provided, but that could
> be made optional - e.g. the default coder would be supported only if an
> appropriate module would be available. That way I think that Euphoria has
> no special dependencies.
> >>
> >>
> >>
> >> [1]
> https://github.com/apache/beam/blob/f66eb5fe23b2500b396e6f711cdf4aeef6b31ab8/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java#L73
> >> [2]
> https://github.com/apache/beam/tree/f66eb5fe23b2500b396e6f711cdf4aeef6b31ab8/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms
> >> [3]
> https://github.com/apache/beam/blob/f66eb5fe23b2500b396e6f711cdf4aeef6b31ab8/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java#L179
> >>
> >>
> >>
> >> On Fri, Nov 30, 2018 at 6:29 AM Jan Lukavský  wrote:
> >>
> >> Hi community,
> >>
> >> I'm part of Euphoria DSL team, and on behalf of this team, I'd like to
> >> discuss possible development of Java based 

Re: [SDF] Why do we need markDone (or an equivalent claim)?

2018-11-30 Thread Lukasz Cwik
On Fri, Nov 30, 2018 at 12:47 PM Robert Bradshaw 
wrote:

> On Fri, Nov 30, 2018 at 7:10 PM Lukasz Cwik  wrote:
> >
> > Uh, I'm not sure what you're asking.
>
> I'm asking why we wanted a markDone in the first place.
>

When looking at the byte key restriction tracker code, I found a couple of
bugs around how ranges were being compared and how the byte key range was
being claimed (we weren't computing the next key correctly). The usage of
markDone seemed to be a crutch when attempting to correctly implement the
tryClaim code. Also, all the framework code that powered SDF wasn't aware
of markDone so it couldn't validate that the last claim failed. So I fixed
the tryClaim code and then didn't need markDone and removed it since this
was the only restriction tracker that had it.


> > In the SDF API, a void return on processElement already means that a
> call to tryClaim must have returned false
>
> We could widen this to "or finished the restriction."
>

Yes, having markDone could be added to the API. Is it a crutch for subtle
bugs in tryClaim though?

> while a non void return allows the caller to either return STOP (tryClaim
> must have returned false) or return RESUME (with a time of when to resume).
>
> We could also return STOP if tryClaim never returned false but the
> restriction was finished.
>
> > This allows the framework code to prevent user errors by ensuring the
> restriction has been completed.
>
> I don't think the framework can ensure this. (It can enforce the above
> constraints that on a STOP tryClaim did indeed return false on the
> last call, but I'm fuzzy on the value this actually provides when it
> just means the user must artificially force it to return a false value.
> It also means we can't make it an error to try claiming values outside
> the initial restriction. If we want to make things more explicit, we
> could require a STOP or RESUME return rather than allow a void
> return.)


I don't think we want SDF authors to ensure that their values are in the
initial range first before attempting to claim them as this is the purpose
of tryClaim. The SDF code would then be checking that the range is valid
twice.

processElement() {
  readElement
  isElementInRange?
  if (!tryClaim) {
return
  }
}
(both isElementInRange and tryClaim are now doing the same bounds checking
which can lead to subtle bounds checking errors).


> Maybe I'm just not clever enough to come up with a kind of source
> where this could be good at catching errors?
>

I think the value is that we expect to implement a few restriction tracker
classes which will be re-used across many SDF implementations. In this
case, we could point out to the SDF author that they haven't claimed all
that they said they would process. This would be true whether markDone
existed or not.


> > Also, "" is the byte key range, the code could have just passed in
> range.getEndPosition() in to the final tryClaim, its just that "" is
> shorthand and would be similar to passing in Long.MAX_VALUE for the file
> offset range.
>
> Having to choose a value to pass depending on the restriction tracker
> type is something that could simply be eliminated.

> On Fri, Nov 30, 2018 at 2:45 AM Robert Bradshaw 
> wrote:
> >>
> >> In looking at the SDF examples, it seems error-prone to have to
> >> remember to write
> >>
> >> tryClaim([fake-end-position])
> >>
> >> to indicate that a restriction is finished. IIRC, this was done to
> >> decide whether the entire restriction had been processed on return in
> >> the case that tryClaim never returned false. It seems preferable to
> >> encode this into the return value (with a void return meaning iff
> >> tryClaim returned false, and a non-void return being able to indicate
> >> any hints as to when, if ever, process should be called again).
> >>
> >> Can someone jog my memory as to whether there was a case in which this
> wouldn't work?
>


Re: [DISCUSS] SplittableDoFn Java SDK User Facing API

2018-11-30 Thread Lukasz Cwik
On Fri, Nov 30, 2018 at 1:02 PM Robert Bradshaw  wrote:

> On Fri, Nov 30, 2018 at 6:38 PM Lukasz Cwik  wrote:
> >
> > Sorry, for some reason I thought I had answered these.
>
> No problem, thanks for your patience :).
>
> > On Fri, Nov 30, 2018 at 2:20 AM Robert Bradshaw 
> wrote:
> >>
> >> I still have outstanding questions (above) about
> >>
> >> 1) Why we need arbitrary precision for backlog, instead of just using
> >> a (much simpler) double.
> >
> >
> > Double lacks the precision for reporting backlogs for byte key ranges
> (HBase, Bigtable, ...). Scanning a key range such as ["a", "b") with a
> large number of keys with a really long common prefix such as
> "aab" and "aac", ... leads
> to the backlog not changing even though we are making progress through the
> key space. This also prevents splitting within such an area since the
> double can't provide that necessary precision (without multiple rounds of
> splitting which adds complexity).
>
> We'll have to support multiple rounds of splitting regardless. I can
> see how this gives more information up front though.
>

I agree that we will need to support multiple rounds of splitting from the
SDK side but this adds complexity from the runner side since it can only
increase the accuracy for a split by performing multiple rounds of
splitting at once.


> (As an aside, I've been thinking about some ways of solving the dark
> matter problem, and it might depend on knowing the actual key, using
> the fact that character boundaries are likely cut-off points for
> changes in density, which would get obscured by alternative
> representations.)
>

Every time I think about this issue, I can never get it to apply
meaningfully for unbounded sources such as a message queue like pubsub.
Also, having an infinitely precise backlog such as the decimal format would
still provide density information as the rate of change through the backlog
for a bounded source would change once a "cluster" was hit.


> >> 2) Whether it's worth passing backlog back to split requests, rather
> >> than (again) a double representing "portion of current remaining"
> >> which may change over time. (The most common split request is into
> >> even portions, and specifically half, which can't accurately be
> >> requested from a stale backlog.)
> >
> > I see two scenarios here:
> > * the fraction is exposed to the SDF author and then the SDF author
> needs to map from their restriction space to backlog and also map fractions
> onto their restriction space meaning that they are required to write
> mappings between three different models.
> > * the fraction is not exposed to the SDF author and the framework code
> multiplies the fraction against the backlog and provides the backlog to the
> user (this solves the backlog skew issue but still has the limited
> precision issue).
>
> Limited precision is not as much of an issue here because one can
> express very small numbers to split close to the current position, and
> one doesn't need high precision for splitting further away.
>

Agree. Would this also mean that skew when splitting at half doesn't really
matter?


> I also think it's nice that the space of possible splits is always
> (current position, restriction end) which a double always maps onto
> despite those both being moving targets. If you phrase things in terms
> of backlogs, you might ask for impossible things. I don't recall if
> the passed backlog is the amount that should be retained or the amount
> that should be returned, but if the latter, it'll be difficult to
> accurately split near the current position.
>

For the current proposal, it represents how much should be retained but as
was mentioned earlier, the semantics of returning multiple splits is still
up in the air.


> > I believe it is easier for an SDF author to write a two-way mapping from
> backlog to their position space than to write two different types of
> mappings. For example, when a person is reading a file that has 100 bytes
> to process and is asked to split at 60.3%, they have to map 60.3% onto 100
> bytes, figuring out that they are responsible for 60.3 bytes, which they
> round down to 60 bytes. In the scenario where the runner provides the
> backlog, 60.3 would have been sent across and the SDF author would only
> need to perform rounding.
>
> Yeah, that's something to mull on. Maybe with a set of concrete examples.
>
> >> There are also some questions about returning multiple remainders, and
> >> how that relates to/overlaps with the initial splitting, but those can
> >> probably be deferred.
> >
> >
> > Agree.
> >
> >>
> >> On Wed, Nov 28, 2018 at 2:23 AM Lukasz Cwik  wrote:
> >> >
> >> > I updated the PR addressing the last of Scott's comments and also
> migrated to use an integral fraction as Robert had recommended by using
> approach A for the proto representation and BigDecimal within the Java SDK:
> >> > A:
> >> > // Represents a non-negative decimal 

Re: [DISCUSS] SplittableDoFn Java SDK User Facing API

2018-11-30 Thread Robert Bradshaw
On Fri, Nov 30, 2018 at 6:38 PM Lukasz Cwik  wrote:
>
> Sorry, for some reason I thought I had answered these.

No problem, thanks for your patience :).

> On Fri, Nov 30, 2018 at 2:20 AM Robert Bradshaw  wrote:
>>
>> I still have outstanding questions (above) about
>>
>> 1) Why we need arbitrary precision for backlog, instead of just using
>> a (much simpler) double.
>
>
> Double lacks the precision for reporting backlogs for byte key ranges (HBase, 
> Bigtable, ...). Scanning a key range such as ["a", "b") with a large 
> number of keys with a really long common prefix such as 
> "aab" and "aac", ... leads to 
> the backlog not changing even though we are making progress through the key 
> space. This also prevents splitting within such an area since the double 
> can't provide that necessary precision (without multiple rounds of splitting 
> which adds complexity).

We'll have to support multiple rounds of splitting regardless. I can
see how this gives more information up front though.
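
(A quick self-contained illustration of the precision limit, mapping a byte
key to a double fraction of the keyspace - not Beam code:)

  static double keyToFraction(byte[] key) {
    double fraction = 0;
    double scale = 1.0 / 256;
    for (byte b : key) {
      fraction += (b & 0xff) * scale;
      scale /= 256;
    }
    return fraction;
  }
  // A double carries ~52 bits (roughly 6-7 bytes) of key, so once keys share a
  // longer common prefix the mapped values collapse and the backlog stops moving:
  //   keyToFraction("aaaaaaaaaaab".getBytes(UTF_8))
  //     == keyToFraction("aaaaaaaaaaac".getBytes(UTF_8))   // true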

(As an aside, I've been thinking about some ways of solving the dark
matter problem, and it might depend on knowing the actual key, using
the fact that character boundaries are likely cut-off points for
changes in density, which would get obscured by alternative
representations.)

>> 2) Whether it's worth passing backlog back to split requests, rather
>> than (again) a double representing "portion of current remaining"
>> which may change over time. (The most common split request is into
>> even portions, and specifically half, which can't accurately be
>> requested from a stale backlog.)
>
> I see two scenarios here:
> * the fraction is exposed to the SDF author and then the SDF author needs to 
> map from their restriction space to backlog and also map fractions onto their 
> restriction space meaning that they are required to write mappings between 
> three different models.
> * the fraction is not exposed to the SDF author and the framework code 
> multiplies the fraction against the backlog and provides the backlog to the 
> user (this solves the backlog skew issue but still has the limited precision 
> issue).

Limited precision is not as much of an issue here because one can
express very small numbers to split close to the current position, and
one doesn't need high precision for splitting further away.

I also think it's nice that the space of possible splits is always
(current position, restriction end) which a double always maps onto
despite those both being moving targets. If you phrase things in terms
of backlogs, you might ask for impossible things. I don't recall if
the passed backlog is the amount that should be retained or the amount
that should be returned, but if the latter, it'll be difficult to
accurately split near the current position.

> I believe it is easier for an SDF author to write a two-way mapping from 
> backlog to their position space than to write two different types of 
> mappings. For example, when a person is reading a file that has 100 bytes to 
> process and is asked to split at 60.3%, they have to map 60.3% onto 100 bytes, 
> figuring out that they are responsible for 60.3 bytes, which they round 
> down to 60 bytes. In the scenario where the runner provides the backlog, 60.3 
> would have been sent across and the SDF author would only need to perform 
> rounding.

Yeah, that's something to mull on. Maybe with a set of concrete examples.

>> There are also some questions about returning multiple remainders, and
>> how that relates to/overlaps with the initial splitting, but those can
>> probably be deferred.
>
>
> Agree.
>
>>
>> On Wed, Nov 28, 2018 at 2:23 AM Lukasz Cwik  wrote:
>> >
>> > I updated the PR addressing the last of Scott's comments and also migrated 
>> > to use an integral fraction as Robert had recommended by using approach A 
>> > for the proto representation and BigDecimal within the Java SDK:
>> > A:
>> > // Represents a non-negative decimal number: unscaled_value * 10^(-scale)
>> > message Decimal {
>> >   // Represents the unscaled value as a big endian unlimited precision 
>> > non-negative integer.
>> >   bytes unscaled_value = 1;
>> >   // Represents the scale
>> >   uint32 scale = 2;
>> > }
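
(A hedged sketch of how that maps onto java.math.BigDecimal - illustrative,
not code from the PR:)

  BigDecimal backlog = new BigDecimal("1234.5678");
  byte[] unscaledValue = backlog.unscaledValue().toByteArray(); // big-endian bytes
  int scale = backlog.scale();                                  // 4
  // Decoding: interpret the bytes as a non-negative big-endian integer.
  BigDecimal decoded = new BigDecimal(new BigInteger(1, unscaledValue), scale);
  // decoded.compareTo(backlog) == 0
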
>> >
>> > Ismael, I would like to defer the changes to improve the ByteBuddy 
>> > DoFnInvoker since that is parallelizable work and have filed BEAM-6142.
>> >
>> > I don't believe there are any other outstanding changes and would like to 
>> > get the PR merged so that people can start working on implementing support 
>> > for backlog reporting and splitting within the Java SDK harness, improving 
>> > the ByteBuddy DoFnInvoker, exposing the shared runner library parts, and 
>> > integrating this into ULR, Flink, Dataflow, ...
>> >
>> > On Mon, Nov 26, 2018 at 9:49 AM Lukasz Cwik  wrote:
>> >>
>> >>
>> >>
>> >> On Mon, Nov 26, 2018 at 9:09 AM Ismaël Mejía  wrote:
>> >>>
>> >>> > Bundle finalization is unrelated to backlogs 

Re: [SDF] Why do we need markDone (or an equivalent claim)?

2018-11-30 Thread Robert Bradshaw
On Fri, Nov 30, 2018 at 7:10 PM Lukasz Cwik  wrote:
>
> Uh, I'm not sure what you're asking.

I'm asking why we wanted a markDone in the first place.

> In the SDF API, a void return from processElement already means that a call to 
> tryClaim must have returned false

We could widen this to "or finished the restriction."

> while a non-void return allows the caller to either return STOP (tryClaim 
> must have returned false) or return RESUME (with a time of when to resume).

We could also return STOP if tryClaim never returned false but the
restriction was finished.

> This allows the framework code to prevent user errors by ensuring the 
> restriction has been completed.

I don't think the framework can ensure this. (It can enforce the above
constraint that on a STOP tryClaim did indeed return false on the
last call, but I'm fuzzy on the value this actually provides when it
just means the user must artificially force it to return a false value.
It also means we can't make it an error to try claiming values outside
the initial restriction. If we want to make things more explicit, we
could require a STOP or RESUME return rather than allow a void
return.)

Maybe I'm just not clever enough to come up with a kind of source
where this could be good at catching errors?

> Also, "" is the byte key range, the code could have just passed in 
> range.getEndPosition() into the final tryClaim, it's just that "" is 
> shorthand and would be similar to passing in Long.MAX_VALUE for the file 
> offset range.

Having to choose a value to pass depending on the restriction tracker
type is something that could simply be eliminated.
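
To make the two conventions concrete, a rough sketch of a byte-key-range reader
written both ways; the names used here (Record, readRecords, ByteKeyRangeTracker,
END_OF_RANGE_MARKER, ProcessContinuation) are illustrative placeholders, not
necessarily the exact SDK signatures.

// Style 1: void return. "tryClaim returned false" is the only proof of
// completion, so a reader that naturally runs out of records has to make one
// final, artificial claim (the "markDone" in question).
@ProcessElement
public void processVoid(ProcessContext c, ByteKeyRangeTracker tracker) {
  for (Record record : readRecords(tracker.currentRestriction())) {
    if (!tracker.tryClaim(record.key())) {
      return;  // a split/checkpoint was requested; stop here
    }
    c.output(record.value());
  }
  tracker.tryClaim(END_OF_RANGE_MARKER);  // artificial claim just to signal "done"
}

// Style 2: explicit continuation. Finishing the loop without a failed tryClaim
// can simply return STOP, so no fake claim is needed.
@ProcessElement
public ProcessContinuation processWithReturn(ProcessContext c, ByteKeyRangeTracker tracker) {
  for (Record record : readRecords(tracker.currentRestriction())) {
    if (!tracker.tryClaim(record.key())) {
      return ProcessContinuation.stop();
    }
    c.output(record.value());
  }
  return ProcessContinuation.stop();  // or resume() with a delay for a tailing read
}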

> On Fri, Nov 30, 2018 at 2:45 AM Robert Bradshaw  wrote:
>>
>> In looking at the SDF examples, it seems error-prone to have to
>> remember to write
>>
>> tryClaim([fake-end-position])
>>
>> to indicate that a restriction is finished. IIRC, this was done to
>> decide whether the entire restriction had been processed on return in
>> the case that tryClaim never returned false. It seems preferable to
>> encode this into the return value (with a void return meaning iff
>> tryClaim returned false, and a non-void return being able to indicate
>> any hints as to when, if ever, process should be called again).
>>
>> Can someone jog my memory as to if there was a case in which this wouldn't 
>> work?


Re: [DISCUSS] Structuring Java based DSLs

2018-11-30 Thread Robert Bradshaw
I don't really see Euphoria as a subset of SQL or the other way
around, and I think it makes sense to use either without the other, so
by this criterion I'd keep them as siblings rather than nesting them.

That said, I think it's really good to have a bunch of shared code,
e.g. a join library that could be used by both. One could even depend
on the other without having to abandon the sibling relationship.
Something like retractions belong in the core SDK itself. Deeper than
that, actually, it should be part of the model.

- Robert

On Fri, Nov 30, 2018 at 7:20 PM David Morávek  wrote:
>
> Jan, we made Kryo optional recently (it is a separate module and is used only 
> in tests). From a quick look it seems that we forgot to remove compile time 
> dependency from euphoria's build.gradle. Only "strong" dependencies I'm aware 
> of are core SDK and guava. We'll be probably adding sketching extension 
> dependency soon.
>
> D.
>
> On Fri, Nov 30, 2018 at 7:08 PM Jan Lukavský  wrote:
>>
>> Hi Anton,
>> reactions inline.
>>
>> -- Původní e-mail --
>> Od: Anton Kedin 
>> Komu: dev@beam.apache.org
>> Datum: 30. 11. 2018 18:17:06
>> Předmět: Re: [DISCUSS] Structuring Java based DSLs
>>
>> I think this approach makes sense in general, Euphoria can be the 
>> implementation detail of SQL, similar to Join Library or core SDK Schemas.
>>
>> I wonder though whether it would be better to bring Euphoria closer to core 
>> SDK first, maybe even merge them together. If you look at Reuven's recent 
>> work around schemas it seems like there are already similarities between 
>> that and Euphoria's approach, unless I'm missing the point (e.g. Filter 
>> transforms, FullJoin vs CoGroup... see [2]). And we're already switching 
>> parts of SQL to those transforms (e.g. SQL Aggregation is now implemented by 
>> core SDK's Group[3]).
>>
>>
>>
>> Yes, these transforms seem to be very similar to those Euphoria has. Whether 
>> or not to merge Euphoria with core is essentially just a decision of the 
>> community (in my point of view).
>>
>>
>>
>> Adding explicit Schema support to Euphoria will bring it both closer to core 
>> SDK and make it natural to use for SQL. Can this be a first step towards 
>> this integration?
>>
>>
>>
>> Euphoria currently operates on pure PCollections, so when PCollection has a 
>> schema, it will be accessible by Euphoria. It makes sense to make use of the 
>> schema in Euphoria - it seems natural on inputs to Euphoria operators, but 
>> it might be tricky (not saying impossible) to actually produce schema-aware 
>> PCollections as outputs from Euphoria operators (generally speaking, in 
>> special cases that might be possible). Regarding inputs, there is actually 
>> intention to act on type of PCollection - e.g. when PCollection is already 
>> of type KV, then it is possible to make key extractor and value extractor 
>> optional in Euphoria builders, so it feels natural to enable changing the 
>> builders when a schema-aware PCollection is provided, and make use of the provided 
>> schema. The rest of Euphoria team might correct me, if I'm wrong.
>>
>>
>>
>>
>> One question I have is, does Euphoria bring dependencies that are not needed 
>> by SQL, or does more or less only rely on the core SDK?
>>
>>
>>
>> I think the only relevant dependency that Euphoria has besides core SDK is 
>> Kryo. It is the default coder when no coder is provided, but that could be 
>> made optional - e.g. the default coder would be supported only if an 
>> appropriate module would be available. That way I think that Euphoria has no 
>> special dependencies.
>>
>>
>>
>> [1] 
>> https://github.com/apache/beam/blob/f66eb5fe23b2500b396e6f711cdf4aeef6b31ab8/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java#L73
>> [2] 
>> https://github.com/apache/beam/tree/f66eb5fe23b2500b396e6f711cdf4aeef6b31ab8/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms
>> [3] 
>> https://github.com/apache/beam/blob/f66eb5fe23b2500b396e6f711cdf4aeef6b31ab8/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java#L179
>>
>>
>>
>> On Fri, Nov 30, 2018 at 6:29 AM Jan Lukavský  wrote:
>>
>> Hi community,
>>
>> I'm part of Euphoria DSL team, and on behalf of this team, I'd like to
>> discuss possible development of Java based DSLs currently present in
>> Beam. In my knowledge, there are currently two DSLs based on Java SDK -
>> Euphoria and SQL. These DSLs currently share only the SDK itself,
>> although there might be room to share some more effort. We already know
>> that both Euphoria and SQL have need for retractions, but there are
>> probably many more features that these two could share.
>>
>> So, I'd like to open a discussion on what it would cost and what it
>> would possibly bring, if instead of the current structure
>>
>>Java SDK
>>
>>  |  SQL
>>
>>  |  Euphoria
>>
>> these DSLs would be structured as
>>
>>Java SDK ---> Euphoria ---> SQL

Re: What is Jenkins job "Portable_Python" in PreCommit?

2018-11-30 Thread Thomas Weise
This is a very valuable addition! Given the execution times of Python and
Java pre-commit, having this run in parallel should not be an issue.

Thanks,
Thomas


On Fri, Nov 30, 2018 at 11:01 AM Ankur Goenka  wrote:

> We added a new precommit which tests a WordCount pipeline on the portable
> Flink runner.
> Recently we missed catching some obvious issues which broke portability and
> could have been caught by this test.
> The current test is fairly lightweight and executes in ~5 min, which seems
> to be reasonable for a precommit test.
> This precommit was added just yesterday, so we don't actively track it yet.
> However, please let me know if you see any issues with this precommit.
>
> On Fri, Nov 30, 2018 at 3:49 AM Maximilian Michels  wrote:
>
>> This was merged with https://github.com/apache/beam/pull/6954.
>>
>> Eventually we want to run a portable WordCount on PreCommit. We will do
>> some more testing on Jenkins before it becomes an official PreCommit task.
>>
>> Thanks,
>> Max
>>
>> On 29.11.18 19:03, Mark Liu wrote:
>> > ah, thanks Boyuan! Probably I created the PR in a bad timing. Looks
>> like
>> > a new PR will fix it.
>> >
>> > On Thu, Nov 29, 2018 at 9:50 AM Boyuan Zhang > > > wrote:
>> >
>> > I believe it's in this pending PR:
>> > https://github.com/apache/beam/pull/7157.
>> >
>> > On Thu, Nov 29, 2018 at 8:36 AM Mark Liu > > > wrote:
>> >
>> > Hi guys,
>> >
>> > I made some changes 
>> > to Python PreCommit Gradle and then Portable_Python is invoked
>> > as a PreCommit test and failed. However, I can't find where
>> > it's defined / generated in Gradle or Jenkins groovy. Does
>> > anyone know? My branch is synced to master yesterday.
>> >
>> > Thanks!
>> > Mark
>> >
>>
>


Re: What is Jenkins job "Portable_Python" in PreCommit?

2018-11-30 Thread Ankur Goenka
We added a new precommit which tests a WordCount pipeline on the portable
Flink runner.
Recently we missed catching some obvious issues which broke portability and
could have been caught by this test.
The current test is fairly lightweight and executes in ~5 min, which seems
to be reasonable for a precommit test.
This precommit was added just yesterday, so we don't actively track it yet.
However, please let me know if you see any issues with this precommit.

On Fri, Nov 30, 2018 at 3:49 AM Maximilian Michels  wrote:

> This was merged with https://github.com/apache/beam/pull/6954.
>
> Eventually we want to run a portable WordCount on PreCommit. We will do
> some more testing on Jenkins before it becomes an official PreCommit task.
>
> Thanks,
> Max
>
> On 29.11.18 19:03, Mark Liu wrote:
> > ah, thanks Boyuan! Probably I created the PR in a bad timing. Looks like
> > a new PR will fix it.
> >
> > On Thu, Nov 29, 2018 at 9:50 AM Boyuan Zhang  > > wrote:
> >
> > I believe it's in this pending PR:
> > https://github.com/apache/beam/pull/7157.
> >
> > On Thu, Nov 29, 2018 at 8:36 AM Mark Liu  > > wrote:
> >
> > Hi guys,
> >
> > I made some changes 
> > to Python PreCommit Gradle and then Portable_Python is invoked
> > as a PreCommit test and failed. However, I can't find where
> > it's defined / generated in Gradle or Jenkins groovy. Does
> > anyone know? My branch is synced to master yesterday.
> >
> > Thanks!
> > Mark
> >
>


Re: Handling large values

2018-11-30 Thread Lukasz Cwik
On Thu, Nov 29, 2018 at 3:01 PM Robert Bradshaw  wrote:

> On Thu, Nov 29, 2018 at 7:08 PM Lukasz Cwik  wrote:
> >
> > On Thu, Nov 29, 2018 at 7:13 AM Robert Bradshaw 
> wrote:
> >>
> >> On Thu, Nov 29, 2018 at 2:18 AM Lukasz Cwik  wrote:
> >> >
> >> > I don't believe we would need to change any other coders since
> SeekableInputStream wouldn't change how a regular InputStream would work so
> coders that don't care about the implementation would still use it as a
> forward only input stream. Coders that care about seeking would use the new
> functionality.
> >>
> >> An API could be developed that makes this work, but the proposal of
> >>
> >> class SmartCoder {
> >>   public T decode(InputStream is) {
> >> if (is instanceof SeekableInputStream) {
> >>   return view((SeekableInputStream) is);
> >> }
> >> return decodeInternal(is);
> >>   }
> >> }
> >>
> >> would break it passed to (just as an example) the unmodified KV coder
> >>
> >> class KvCoder {
> >>   public Kv decode(InputStream is) {
> >> return Kv.of(keyCoder.decode(is), valueCoder.decode(is));
> >>   }
> >> }
> >>
> >> when invoked with an InputStream that happens to be a
> >> SeekableInputStream if either keyCoder or valueCoder were instances of
> >> SmartCoder unless SmartCoder.view() did something really clever about
> >> advancing the provided stream the right amount without actually
> >> consuming it. This is particularly expensive/tricky for iterables
> >> where it's most useful.
> >
> >
> > Thanks for walking through this with me Robert.
> >
> > The issue is that the view needs to advance the stream if it wants to
> decode the components separately; this works naturally for the iterable
> coder since all decoding is done in order so that advances the stream
> automatically and for any component coder where it also supports being a
> view. Any coder that isn't advancing the stream in order has to have an
> index as part of its encoding. Using the KV coder as the example, the two
> strategies would be as follows:
> >
> > decode method is the same for both strategies
> > public KV decode(InputStream is) {
> >   if (is instanceof SeekableInputStream) {
> > return KVView((SeekableInputStream) is, keyCoder, valueCoder);
> >   }
> >   return Kv.of(keyCoder.decode(is), valueCoder.decode(is));
> > }
> >
> > forward only view decoding:
> > class KVView extends KV {
> >   K getKey() {
> > if (!keyDecoded) {
> >   key = keyCoder.decode(is);
> > }
> > return key;
> >   }
> >
> >   V getValue() {
> > // ensures the input stream has advanced to the value position
> > getKey();
> >
> > if (!valueDecoded) {
> >  value = valueCoder.decode(is);
> > }
> > return value;
> > }
> >
> > index based decoding:
> > class KVView extends KV {
> >   KVView(SeekableInputStream is, Coder keyCoder, Coder valueCoder)
> {
> > valueOffset = readBigEndianInt(is);
> > // ...
> >   }
> >   K getKey() {
> > if (!keyDecoded) {
> >   is.seek(4);  // 4 bytes for big int index
> >   key = keyCoder.decode(is);
> > }
> > return key;
> >   }
> >
> >   V getValue() {
> > if (!valueDecoded) {
> >  is.seek(valueOffset);
> >  value = valueCoder.decode(is);
> > }
> > return value;
> > }
> >
> > I believe for the KV case and the iterable case we will find that our
> coders are typically KV, LengthPrefix> and
> Iterable> which would mean that a smart coder could
> inspect the component coder and if it's a length prefix coder, ask it to
> seek to the end of its value within the input stream which means that a
> smart coder could understand the length of its components.
>
> I understand how KV coder can be made smart. My concern is the
> difficulty of having dumb coders with smart coder components. E.g.
> imagine a row coder
>
> class DumbRowCoder {
>   Row decode(InputStream is) {
> List parts = ...
> for (Coder c : componentCoders) {
>   // Smart coders *must* advance the inputs stream in case the
>   // subsequent coder is also dumb.
>   // Efficient seek will require more than continuation tokens
> over the FnAPI.
>   // Important ones like iterable are likely to be lazily written,
> and so won't know
>   // their length when they start encoding, but iterating it to
> discover the length
>   // defeats much of the goal of being lazy.
>   parts.add(c.decode(is));
> }
>   }
> }
>

I agree that smart coders must advance the stream for interoperability with
dumb coders and also that efficient seek can't be built off of continuation
tokens.
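
To make that interplay concrete, here is a small self-contained sketch (plain
Java, not Beam's coder API) of a length-prefixed component being decoded
lazily while still advancing the stream, so that a "dumb" enclosing coder can
keep reading in order. A truly seekable stream would avoid the buffering; the
copy here only keeps the example runnable on its own.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;

public class LazyLengthPrefixExample {

  // Reads one length-prefixed UTF-8 value lazily: the raw bytes are remembered
  // but not decoded, and the stream is advanced past them so that whatever
  // follows can be decoded immediately by a dumb enclosing coder.
  static Supplier<String> readLazily(DataInputStream in) throws IOException {
    int length = in.readInt();
    byte[] raw = new byte[length];
    in.readFully(raw);  // always advance, as a dumb outer coder requires
    return () -> new String(raw, StandardCharsets.UTF_8);  // decode only on demand
  }

  public static void main(String[] args) throws IOException {
    // Encoding shaped like [len][key bytes][len][value bytes], as a
    // length-prefixing coder might produce for a KV.
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bos);
    byte[] key = "key".getBytes(StandardCharsets.UTF_8);
    byte[] value = "a rather large value".getBytes(StandardCharsets.UTF_8);
    out.writeInt(key.length);
    out.write(key);
    out.writeInt(value.length);
    out.write(value);

    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
    Supplier<String> lazyKey = readLazily(in);    // stream now positioned at the value
    Supplier<String> lazyValue = readLazily(in);  // stream fully consumed, in order
    System.out.println(lazyKey.get() + " -> " + lazyValue.get());
  }
}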


> >> > For the encoding portion, the state backed length prefix coder would
> send the small snippet of data that it received plus the state key without
> invoking the component coder to encode the value. The downstream receiving
> party would need to lookup the remote reference to get all the data.
> >>
> >> I'm trying to follow what you're saying here. Are you restricting to
> >> the case of only 

Re: [DISCUSS] Structuring Java based DSLs

2018-11-30 Thread David Morávek
Jan, we made Kryo optional recently (it is a separate module and is used
only in tests). From a quick look it seems that we forgot to remove compile
time dependency from euphoria's *build.gradle*. Only "strong" dependencies
I'm aware of are core SDK and guava. We'll be probably adding sketching
extension dependency soon.

D.

On Fri, Nov 30, 2018 at 7:08 PM Jan Lukavský  wrote:

> Hi Anton,
> reactions inline.
>
> -- Původní e-mail --
> Od: Anton Kedin 
> Komu: dev@beam.apache.org
> Datum: 30. 11. 2018 18:17:06
> Předmět: Re: [DISCUSS] Structuring Java based DSLs
>
> I think this approach makes sense in general, Euphoria can be the
> implementation detail of SQL, similar to Join Library or core SDK Schemas.
>
> I wonder though whether it would be better to bring Euphoria closer to
> core SDK first, maybe even merge them together. If you look at Reuven's
> recent work around schemas it seems like there are already similarities
> between that and Euphoria's approach, unless I'm missing the point (e.g.
> Filter transforms, FullJoin vs CoGroup... see [2]). And we're already
> switching parts of SQL to those transforms (e.g. SQL Aggregation is now
> implemented by core SDK's Group[3]).
>
>
>
> Yes, these transforms seem to be very similar to those Euphoria has.
> Whether or not to merge Euphoria with core is essentially just a decision
> of the community (in my point of view).
>
>
>
> Adding explicit Schema support to Euphoria will bring it both closer to
> core SDK and make it natural to use for SQL. Can this be a first step
> towards this integration?
>
>
>
> Euphoria currently operates on pure PCollections, so when PCollection has
> a schema, it will be accessible by Euphoria. It makes sense to make use of
> the schema in Euphoria - it seems natural on inputs to Euphoria operators,
> but it might be tricky (not saying impossible) to actually produce
> schema-aware PCollections as outputs from Euphoria operators (generally
> speaking, in special cases that might be possible). Regarding inputs, there
> is actually intention to act on type of PCollection - e.g. when PCollection
> is already of type KV, then it is possible to make key extractor and value
> extractor optional in Euphoria builders, so it feels natural to enable
> changing the builders when a schema-aware PCollection is provided, and make use of the
> provided schema. The rest of Euphoria team might correct me, if I'm wrong.
>
>
>
>
> One question I have is, does Euphoria bring dependencies that are not
> needed by SQL, or does more or less only rely on the core SDK?
>
>
>
> I think the only relevant dependency that Euphoria has besides core SDK is
> Kryo. It is the default coder when no coder is provided, but that could be
> made optional - e.g. the default coder would be supported only if an
> appropriate module would be available. That way I think that Euphoria has
> no special dependencies.
>
>
>
> [1]
> https://github.com/apache/beam/blob/f66eb5fe23b2500b396e6f711cdf4aeef6b31ab8/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java#L73
> [2]
> https://github.com/apache/beam/tree/f66eb5fe23b2500b396e6f711cdf4aeef6b31ab8/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms
> [3]
> https://github.com/apache/beam/blob/f66eb5fe23b2500b396e6f711cdf4aeef6b31ab8/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java#L179
>
>
>
> On Fri, Nov 30, 2018 at 6:29 AM Jan Lukavský  wrote:
>
> Hi community,
>
> I'm part of Euphoria DSL team, and on behalf of this team, I'd like to
> discuss possible development of Java based DSLs currently present in
> Beam. In my knowledge, there are currently two DSLs based on Java SDK -
> Euphoria and SQL. These DSLs currently share only the SDK itself,
> although there might be room to share some more effort. We already know
> that both Euphoria and SQL have need for retractions, but there are
> probably many more features that these two could share.
>
> So, I'd like to open a discussion on what it would cost and what it
> would possibly bring, if instead of the current structure
>
>Java SDK
>
>  |  SQL
>
>  |  Euphoria
>
> these DSLs would be structured as
>
>Java SDK ---> Euphoria ---> SQL
>
> I'm absolutely sure that this would be a great investment and a huge
> change, but I'd like to gather some opinions and general feelings of the
> community about this. Some points to start the discussion from my side
> would be, that structuring DSLs like this has internal logical
> consistency, because each API layer further narrows completeness, but
> brings simpler API for simpler tasks, while adding additional high-level
> view of the data processing pipeline and thus enabling more
> optimizations. On Euphoria side, these are various implementations joins
> (most effective implementation depends on data), pipeline sampling and
> more. Some (or maybe most) of these optimizations would have to be
> implemented 

Re: [SDF] Why do we need markDone (or an equivalent claim)?

2018-11-30 Thread Lukasz Cwik
Uh, I'm not sure what you're asking. In the SDF API, a void return from
processElement already means that a call to tryClaim must have returned false, while
a non-void return allows the caller to either return STOP (tryClaim must
have returned false) or return RESUME (with a time of when to resume). This
allows the framework code to prevent user errors by ensuring the
restriction has been completed.

Also, "" is the byte key range, the code could have just passed in
range.getEndPosition() into the final tryClaim, it's just that "" is
shorthand and would be similar to passing in Long.MAX_VALUE for the file
offset range.



On Fri, Nov 30, 2018 at 2:45 AM Robert Bradshaw  wrote:

> In looking at the SDF examples, it seems error-prone to have to
> remember to write
>
> tryClaim([fake-end-position])
>
> to indicate that a restriction is finished. IIRC, this was done to
> decide whether the entire restriction had been processed on return in
> the case that tryClaim never returned false. It seems preferable to
> encode this into the return value (with a void return meaning iff
> tryClaim returned false, and a non-void return being able to indicate
> any hints as to when, if ever, process should be called again).
>
> Can someone jog my memory as to if there was a case in which this wouldn't
> work?
>


Re: [DISCUSS] Structuring Java based DSLs

2018-11-30 Thread Jan Lukavský

Hi Anton,

reactions inline.



-- Původní e-mail --
Od: Anton Kedin 
Komu: dev@beam.apache.org
Datum: 30. 11. 2018 18:17:06
Předmět: Re: [DISCUSS] Structuring Java based DSLs
"



I think this approach makes sense in general, Euphoria can be the
implementation detail of SQL, similar to Join Library or core SDK Schemas.



I wonder though whether it would be better to bring Euphoria closer to core
SDK first, maybe even merge them together. If you look at Reuven's recent 
work around schemas it seems like there are already similarities between 
that and Euphoria's approach, unless I'm missing the point (e.g. Filter 
transforms, FullJoin vs CoGroup... see [2]). And we're already switching 
parts of SQL to those transforms (e.g. SQL Aggregation is now implemented by
core SDK's Group[3]).




"
 
"









"
Yes, these transforms seem to be very similar to those Euphoria has. Whether
or not to merge Euphoria with core is essentially just a decision of the 
community (in my point of view).




"









Adding explicit Schema support to Euphoria will bring it both closer to core
SDK and make it natural to use for SQL. Can this be a first step towards 
this integration?




"
 
"









"
Euphoria currently operates on pure PCollections, so when PCollection has a
schema, it will be accessible by Euphoria. It makes sense to make use of the
schema in Euphoria - it seems natural on inputs to Euphoria operators, but
it might be tricky (not saying impossible) to actually produce schema-aware
PCollections as outputs from Euphoria operators (generally speaking, in 
special cases that might be possible). Regarding inputs, there is actually
intention to act on type of PCollection - e.g. when PCollection is already
of type KV, then it is possible to make key extractor and value extractor 
optional in Euphoria builders, so it feels natural to enable changing the 
builders when a schema-aware PCollection, and make use of the provided
schema. The rest of Euphoria team might correct me, if I'm wrong.


 
"









One question I have is, does Euphoria bring dependencies that are not needed
by SQL, or does more or less only rely on the core SDK?




"
 
"









"
I think the only relevant dependency that Euphoria has besides core SDK is
Kryo. It is the default coder when no coder is provided, but that could be
made optional - e.g. the default coder would be supported only if an
appropriate module would be available. That way I think that Euphoria has no
special dependencies.



"









[1] https://github.com/apache/beam/blob/f66eb5fe23b2500b396e6f711cdf4aeef6b
31ab8/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/
Group.java#L73
(https://github.com/apache/beam/blob/f66eb5fe23b2500b396e6f711cdf4aeef6b31ab8/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java#L73)

[2] https://github.com/apache/beam/tree/f66eb5fe23b2500b396e6f711cdf4aeef6b
31ab8/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms
(https://github.com/apache/beam/tree/f66eb5fe23b2500b396e6f711cdf4aeef6b31ab8/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms)

[3] https://github.com/apache/beam/blob/f66eb5fe23b2500b396e6f711cdf4aeef6b
31ab8/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/
sql/impl/rel/BeamAggregationRel.java#L179
(https://github.com/apache/beam/blob/f66eb5fe23b2500b396e6f711cdf4aeef6b31ab8/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java#L179)














On Fri, Nov 30, 2018 at 6:29 AM Jan Lukavský mailto:je...@seznam.cz)> wrote:

"Hi community,

I'm part of Euphoria DSL team, and on behalf of this team, I'd like to
discuss possible development of Java based DSLs currently present in
Beam. In my knowledge, there are currently two DSLs based on Java SDK - 
Euphoria and SQL. These DSLs currently share only the SDK itself,
although there might be room to share some more effort. We already know 
that both Euphoria and SQL have need for retractions, but there are
probably many more features that these two could share.

So, I'd like to open a discussion on what it would cost and what it
would possibly bring, if instead of the current structure

   Java SDK

     |  SQL

 |  Euphoria

these DSLs would be structured as

   Java SDK ---> Euphoria ---> SQL

I'm absolutely sure that this would be a great investment and a huge
change, but I'd like to gather some opinions and general feelings of the 
community about this. Some points to start the discussion from my side
would be, that structuring DSLs like this has internal logical
consistency, because each API layer further narrows completeness, but
brings simpler API for simpler tasks, while adding additional high-level 
view of the data processing pipeline and thus enabling more
optimizations. On Euphoria side, these are various implementations joins 
(most effective implementation depends on data), pipeline 

Re: [DISCUSS] SplittableDoFn Java SDK User Facing API

2018-11-30 Thread Lukasz Cwik
Note I have merged the PR but will continue to iterate based upon the
feedback provided in this thread as it has been quite useful.

On Fri, Nov 30, 2018 at 9:37 AM Lukasz Cwik  wrote:

> Sorry, for some reason I thought I had answered these.
>
> On Fri, Nov 30, 2018 at 2:20 AM Robert Bradshaw 
> wrote:
>
>> I still have outstanding questions (above) about
>>
>> 1) Why we need arbitrary precision for backlog, instead of just using
>> a (much simpler) double.
>>
>
> Double lacks the precision for reporting backlogs for byte key ranges
> (HBase, Bigtable, ...). Scanning a key range such as ["a", "b") and with a
> large number of keys with a really long common prefix such as
> "aab" and "aac", ... leads
> to the backlog not changing even though we are making progress through the
> key space. This also prevents splitting within such an area since the
> double can't provide that necessary precision (without multiple rounds of
> splitting which adds complexity).
>
>
>> 2) Whether it's worth passing backlog back to split requests, rather
>> than (again) a double representing "portion of current remaining"
>> which may change over time. (The most common split request is into
>> even portions, and specifically half, which can't accurately be
>> requested from a stale backlog.)
>>
>
> I see two scenarios here:
> * the fraction is exposed to the SDF author and then the SDF author needs
> to map from their restriction space to backlog and also map fractions onto
> their restriction space meaning that they are required to write mappings
> between three different models.
> * the fraction is not exposed to the SDF author and the framework code
> multiplies the fraction against the backlog and provides the backlog to the
> user (this solves the backlog skew issue but still has the limited
> precision issue).
>
> I believe it is easier for an SDF author to write a two way mapping from
> backlog to their position space than to write two different types of
> mappings. For example, when a person is reading a file that has 100 bytes
> to process and is asked to split at 60.3%, they have to map 60.3% onto 100
> bytes figuring out that they are responsible for 60.3 bytes in which they
> round down to 60 bytes. In the scenario where the runner provides the
> backlog, 60.3 would have been sent across and the SDF author would only
> need to perform rounding.
>
>
>> There are also some questions about returning multiple remainders, and
>> how that relates to/overlaps with the initial splitting, but those can
>> probably be deferred.
>>
>
> Agree.
>
>
>> On Wed, Nov 28, 2018 at 2:23 AM Lukasz Cwik  wrote:
>> >
>> > I updated the PR addressing the last of Scott's comments and also
>> migrated to use an integral fraction as Robert had recommended by using
>> approach A for the proto representation and BigDecimal within the Java SDK:
>> > A:
>> > // Represents a non-negative decimal number: unscaled_value *
>> 10^(-scale)
>> > message Decimal {
>> >   // Represents the unscaled value as a big endian unlimited precision
>> non-negative integer.
>> >   bytes unscaled_value = 1;
>> >   // Represents the scale
>> >   uint32 scale = 2;
>> > }
>> >
>> > Ismael, I would like to defer the changes to improve the ByteBuddy
>> DoFnInvoker since that is parallelizable work and have filed BEAM-6142.
>> >
>> > I don't believe there are any other outstanding changes and would like
>> to get the PR merged so that people can start working on implementing
>> support for backlog reporting and splitting within the Java SDK harness,
>> improving the ByteBuddy DoFnInvoker, exposing the shared runner library
>> parts, and integrating this into ULR, Flink, Dataflow, ...
>> >
>> > On Mon, Nov 26, 2018 at 9:49 AM Lukasz Cwik  wrote:
>> >>
>> >>
>> >>
>> >> On Mon, Nov 26, 2018 at 9:09 AM Ismaël Mejía 
>> wrote:
>> >>>
>> >>> > Bundle finalization is unrelated to backlogs but is needed since
>> there is a class of data stores which need acknowledgement that says I have
>> successfully received your data and am now responsible for it such as
>> acking a message from a message queue.
>> >>>
>> >>> Currently ack is done by IOs as part of checkpointing. How this will
>> >>> be different? Can you please clarify how should be done in this case,
>> >>> or is this totally independent?
>> >>
>> >>
>> >> The flow for finalization and checkpointing is similar:
>> >> Checkpointing:
>> >> 1) Process a bundle
>> >> 2) Checkpoint bundle containing acks that need to be done
>> >> 3) When checkpoint resumes, acknowledge messages
>> >>
>> >> Finalization:
>> >> 1) Process a bundle
>> >> 2) Request bundle finalization when bundle completes
>> >> 3) SDK is asked to finalize bundle
>> >>
>> >> The difference between the two is that bundle finalization always goes
>> back to the same machine instance that processed the bundle while
>> checkpointing can be scheduled on another machine. Many message queue like
>> systems expose clients 

Re: [DISCUSS] SplittableDoFn Java SDK User Facing API

2018-11-30 Thread Lukasz Cwik
Sorry, for some reason I thought I had answered these.

On Fri, Nov 30, 2018 at 2:20 AM Robert Bradshaw  wrote:

> I still have outstanding questions (above) about
>
> 1) Why we need arbitrary precision for backlog, instead of just using
> a (much simpler) double.
>

Double lacks the precision for reporting backlogs for byte key ranges
(HBase, Bigtable, ...). Scanning a key range such as ["a", "b") and with a
large number of keys with a really long common prefix such as
"aab" and "aac", ... leads
to the backlog not changing even though we are making progress through the
key space. This also prevents splitting within such an area since the
double can't provide that necessary precision (without multiple rounds of
splitting which adds complexity).
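
To illustrate (a self-contained sketch, not Beam code): two keys inside
["a", "b") that share a 12-byte common prefix collapse to the same value when
the key space is mapped onto a double fraction, but remain distinguishable
with an arbitrary-precision representation.

import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;

public class KeySpacePrecisionExample {

  // Interpret the key's bytes as base-256 digits after the decimal point.
  static double doubleFraction(byte[] key) {
    double fraction = 0;
    double scale = 1.0 / 256;
    for (byte b : key) {
      fraction += (b & 0xff) * scale;
      scale /= 256;
    }
    return fraction;
  }

  static BigDecimal exactFraction(byte[] key) {
    BigDecimal fraction = BigDecimal.ZERO;
    BigDecimal scale = BigDecimal.ONE.divide(BigDecimal.valueOf(256));
    for (byte b : key) {
      fraction = fraction.add(scale.multiply(BigDecimal.valueOf(b & 0xff)));
      scale = scale.divide(BigDecimal.valueOf(256));  // exact: 256 is a power of 2
    }
    return fraction;
  }

  public static void main(String[] args) {
    byte[] k1 = "aaaaaaaaaaaab".getBytes(StandardCharsets.US_ASCII);
    byte[] k2 = "aaaaaaaaaaaac".getBytes(StandardCharsets.US_ASCII);
    System.out.println(doubleFraction(k1) == doubleFraction(k2));    // true: indistinguishable
    System.out.println(exactFraction(k1).equals(exactFraction(k2))); // false: still distinct
  }
}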


> 2) Whether it's worth passing backlog back to split requests, rather
> than (again) a double representing "portion of current remaining"
> which may change over time. (The most common split request is into
> even portions, and specifically half, which can't accurately be
> requested from a stale backlog.)
>

I see two scenarios here:
* the fraction is exposed to the SDF author and then the SDF author needs
to map from their restriction space to backlog and also map fractions onto
their restriction space meaning that they are required to write mappings
between three different models.
* the fraction is not exposed to the SDF author and the framework code
multiplies the fraction against the backlog and provides the backlog to the
user (this solves the backlog skew issue but still has the limited
precision issue).

I believe it is easier for an SDF author to write a two way mapping from
backlog to their position space than to write two different types of
mappings. For example, when a person is reading a file that has 100 bytes
to process and is asked to split at 60.3%, they have to map 60.3% onto 100
bytes figuring out that they are responsible for 60.3 bytes in which they
round down to 60 bytes. In the scenario where the runner provides the
backlog, 60.3 would have been sent across and the SDF author would only
need to perform rounding.


> There are also some questions about returning multiple remainders, and
> how that relates to/overlaps with the initial splitting, but those can
> probably be deferred.
>

Agree.


> On Wed, Nov 28, 2018 at 2:23 AM Lukasz Cwik  wrote:
> >
> > I updated the PR addressing the last of Scott's comments and also
> migrated to use an integral fraction as Robert had recommended by using
> approach A for the proto representation and BigDecimal within the Java SDK:
> > A:
> > // Represents a non-negative decimal number: unscaled_value * 10^(-scale)
> > message Decimal {
> >   // Represents the unscaled value as a big endian unlimited precision
> non-negative integer.
> >   bytes unscaled_value = 1;
> >   // Represents the scale
> >   uint32 scale = 2;
> > }
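
As an aside, a tiny sketch of how a Java BigDecimal maps onto those two fields;
this only illustrates the encoding described in the message comments above and
is not generated proto code.

import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Arrays;

public class DecimalFieldsExample {
  public static void main(String[] args) {
    BigDecimal backlog = new BigDecimal("60.3");   // i.e. 603 * 10^-1

    // unscaled_value: big endian, unlimited precision, non-negative.
    // BigInteger.toByteArray() is big-endian two's complement, which for a
    // non-negative value is a valid encoding (possibly with a leading zero byte).
    byte[] unscaledValue = backlog.unscaledValue().toByteArray();
    int scale = backlog.scale();

    System.out.println(Arrays.toString(unscaledValue) + ", scale=" + scale);  // [2, 91], scale=1

    // Decoding side: reconstruct the same value.
    BigDecimal decoded = new BigDecimal(new BigInteger(1, unscaledValue), scale);
    System.out.println(decoded);  // 60.3
  }
}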
> >
> > Ismael, I would like to defer the changes to improve the ByteBuddy
> DoFnInvoker since that is parallelizable work and have filed BEAM-6142.
> >
> > I don't believe there are any other outstanding changes and would like
> to get the PR merged so that people can start working on implementing
> support for backlog reporting and splitting within the Java SDK harness,
> improving the ByteBuddy DoFnInvoker, exposing the shared runner library
> parts, and integrating this into ULR, Flink, Dataflow, ...
> >
> > On Mon, Nov 26, 2018 at 9:49 AM Lukasz Cwik  wrote:
> >>
> >>
> >>
> >> On Mon, Nov 26, 2018 at 9:09 AM Ismaël Mejía  wrote:
> >>>
> >>> > Bundle finalization is unrelated to backlogs but is needed since
> there is a class of data stores which need acknowledgement that says I have
> successfully received your data and am now responsible for it such as
> acking a message from a message queue.
> >>>
> >>> Currently ack is done by IOs as part of checkpointing. How this will
> >>> be different? Can you please clarify how should be done in this case,
> >>> or is this totally independent?
> >>
> >>
> >> The flow for finalization and checkpointing is similar:
> >> Checkpointing:
> >> 1) Process a bundle
> >> 2) Checkpoint bundle containing acks that need to be done
> >> 3) When checkpoint resumes, acknowledge messages
> >>
> >> Finalization:
> >> 1) Process a bundle
> >> 2) Request bundle finalization when bundle completes
> >> 3) SDK is asked to finalize bundle
> >>
> >> The difference between the two is that bundle finalization always goes
> back to the same machine instance that processed the bundle while
> checkpointing can be scheduled on another machine. Many message queue like
> systems expose clients which store in memory state and can't ack from
> another machine. You could solve the problem with checkpointing but would
> require each machine to be able to tell another machine that it got a
> checkpoint with acks that it is responsible for but this won't work
> everywhere and isn't as clean.
> >>
> >>>
> >>> > 

Re: Stand at FOSDEM 2019

2018-11-30 Thread Alexey Romanenko
I’m going to visit FOSDEM this year as well and will be glad to help as much as 
I can.  

> On 30 Nov 2018, at 13:00, Maximilian Michels  wrote:
> 
> Thank you for all your reactions. Looks like we will have a great presence at 
> FOSDEM :)
> 
> @Matthias: Yes, I'm planning to go.
> @Wout: Locals are perfect :)
> @Gris: Thanks for helping out with the merch!
> @JB: Are you also around?
> 
> I'll try to book something for Saturday afternoon.
> 
> Thanks,
> Max
> 
> On 30.11.18 09:15, Wout Scheepers wrote:
>> I’m based in Brussels and happy to help out.
>> Wout
>> *From: *Griselda Cuevas 
>> *Reply-To: *"dev@beam.apache.org" 
>> *Date: *Thursday, 29 November 2018 at 21:44
>> *To: *"dev@beam.apache.org" 
>> *Subject: *Re: Stand at FOSDEM 2019
>> +1 -- I'm happy to help with the merch, I'll be attending and will help 
>> staff the booth :)
>> G
>> On Thu, 29 Nov 2018 at 05:46, Suneel Marthi > > wrote:
>>+1
>>On Thu, Nov 29, 2018 at 6:14 AM Matthias Baetens
>>mailto:baetensmatth...@gmail.com>> wrote:
>>Hey Max,
>>Great idea. I'd be very keen to join. I'll look at my calendar
>>over the weekend to see if this would work.
>>Are you going yourself?
>>Cheers,
>>Matthias
>>On Thu, 29 Nov 2018 at 11:06 Maximilian Michels >> wrote:
>>Hi,
>>For everyone who might be attending FOSDEM19: What do you
>>think about
>>taking a slot for Beam at the Apache stand?
>>A slot is 2-3 hours. It is a great way to spread the word
>>about Beam. We
>>wouldn't have to prepare much, just bring some merch.
>>There is still plenty of space:
>>https://cwiki.apache.org/confluence/display/COMDEV/FOSDEM+2019
>>Cheers,
>>Max
>>PS: FOSDEM is an open-source conference in Brussels, Feb
>>2-3, 2019
>>-- 



Re: [DISCUSS] Structuring Java based DSLs

2018-11-30 Thread Anton Kedin
I think this approach makes sense in general, Euphoria can be the
implementation detail of SQL, similar to Join Library or core SDK Schemas.

I wonder though whether it would be better to bring Euphoria closer to core
SDK first, maybe even merge them together. If you look at Reuven's recent
work around schemas it seems like there are already similarities between
that and Euphoria's approach, unless I'm missing the point (e.g. Filter
transforms, FullJoin vs CoGroup... see [2]). And we're already switching
parts of SQL to those transforms (e.g. SQL Aggregation is now implemented
by core SDK's Group[3]).

Adding explicit Schema support to Euphoria will bring it both closer to
core SDK and make it natural to use for SQL. Can this be a first step
towards this integration?

One question I have is, does Euphoria bring dependencies that are not
needed by SQL, or does more or less only rely on the core SDK?

[1]
https://github.com/apache/beam/blob/f66eb5fe23b2500b396e6f711cdf4aeef6b31ab8/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java#L73
[2]
https://github.com/apache/beam/tree/f66eb5fe23b2500b396e6f711cdf4aeef6b31ab8/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms
[3]
https://github.com/apache/beam/blob/f66eb5fe23b2500b396e6f711cdf4aeef6b31ab8/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java#L179



On Fri, Nov 30, 2018 at 6:29 AM Jan Lukavský  wrote:

> Hi community,
>
> I'm part of Euphoria DSL team, and on behalf of this team, I'd like to
> discuss possible development of Java based DSLs currently present in
> Beam. In my knowledge, there are currently two DSLs based on Java SDK -
> Euphoria and SQL. These DSLs currently share only the SDK itself,
> although there might be room to share some more effort. We already know
> that both Euphoria and SQL have need for retractions, but there are
> probably many more features that these two could share.
>
> So, I'd like to open a discussion on what it would cost and what it
> would possibly bring, if instead of the current structure
>
>Java SDK
>
>  |  SQL
>
>  |  Euphoria
>
> these DSLs would be structured as
>
>Java SDK ---> Euphoria ---> SQL
>
> I'm absolutely sure that this would be a great investment and a huge
> change, but I'd like to gather some opinions and general feelings of the
> community about this. Some points to start the discussion from my side
> would be, that structuring DSLs like this has internal logical
> consistency, because each API layer further narrows completeness, but
> brings simpler API for simpler tasks, while adding additional high-level
> view of the data processing pipeline and thus enabling more
> optimizations. On Euphoria side, these are various implementations joins
> (most effective implementation depends on data), pipeline sampling and
> more. Some (or maybe most) of these optimizations would have to be
> implemented in both DSLs, so implementing them once is beneficial.
> Another benefit is that this would bring Euphoria "closer" to Beam core
> development (which would be good, it is part of the project anyway,
> right? :)) and help better drive features, that although currently
> needed mostly by SQL, might be needed by other Java users anyway.
>
> Thanks for discussion and looking forward to any opinions.
>
>Jan
>
>


Re: org.apache.beam.runners.flink.PortableTimersExecutionTest is very flakey

2018-11-30 Thread Maximilian Michels
This turned out to be a tricky bug. Robert and I had a joint debugging 
session and managed to find the culprit.


PR pending: https://github.com/apache/beam/pull/7171

On 27.11.18 19:35, Kenneth Knowles wrote:
I actually didn't look at this one. I filed a bunch more adjacent flake 
bugs. I didn't find your bug but I do see that test flaking at the same 
time as the others. FWIW here is the list of flakes and sickbayed tests: 
https://issues.apache.org/jira/issues/?filter=12343195


Kenn

On Tue, Nov 27, 2018 at 10:25 AM Alex Amato > wrote:


+Ken,

Did you happen to look into this test? I heard that you may have
been looking into this.

On Mon, Nov 26, 2018 at 3:36 PM Maximilian Michels mailto:m...@apache.org>> wrote:

Hi Alex,

Thanks for your help! I'm quite used to debugging
concurrent/distributed
problems. But this one is quite tricky, especially with regards
to GRPC
threads. I try to provide more information in the following.

There are two observations:

1) The problem is specifically related to how the cleanup is
performed
for the EmbeddedEnvironmentFactory. The environment is shut down when the
SDK Harness exits, but the GRPC threads continue to linger for
some time
and may stall state processing on the next test.

If you do _not_ close DefaultJobBundleFactory, which happens during
close() or dispose() in the FlinkExecutableStageFunction or
ExecutableStageDoFnOperator respectively, the tests run just
fine. I ran
1000 test runs without a single failure.

The EmbeddedEnvironment uses direct channels which are marked
experimental in GRPC. We may have to convert them to regular socket
communication.

2) Try setting a conditional breakpoint in GrpcStateService
which will
never break, e.g. "false". Set it here:

https://github.com/apache/beam/blob/6da9aa5594f96c0201d497f6dce4797c4984a2fd/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java#L134

The tests will never fail. The SDK harness is always shutdown
correctly
at the end of the test.

Thanks,
Max

On 26.11.18 19:15, Alex Amato wrote:
 > Thanks Maximilian, let me know if you need any help. Usually
I debug
 > this sort of thing by pausing the IntelliJ debugger to see
all the
 > different threads which are waiting on various conditions. If
you find
 > any insights from that, please post them here and we can try
to figure
 > out the source of the stuckness. Perhaps it may be some
concurrency
 > issue leading to deadlock?
 >
 > On Thu, Nov 22, 2018 at 12:57 PM Maximilian Michels
mailto:m...@apache.org>
 > >> wrote:
 >
 >     I couldn't fix it thus far. The issue does not seem to be
in the Flink
 >     Runner but in the way the tests utilizes the EMBEDDED
environment to
 >     run
 >     multiple portable jobs in a row.
 >
 >     When it gets stuck it is in RemoteBundle#close and it is
independent of
 >     the test type (batch and streaming have different
implementations).
 >
 >     Will give it another look tomorrow.
 >
 >     Thanks,
 >     Max
 >
 >     On 22.11.18 13:07, Maximilian Michels wrote:
 >      > Hi Alex,
 >      >
 >      > The test seems to have gotten flaky after we merged
support for
 >     portable
 >      > timers in Flink's batch mode.
 >      >
 >      > Looking into this now.
 >      >
 >      > Thanks,
 >      > Max
 >      >
 >      > On 21.11.18 23:56, Alex Amato wrote:
 >      >> Hello, I have noticed
 >      >>
that org.apache.beam.runners.flink.PortableTimersExecutionTest
 >     is very
 >      >> flakey, and repro'd this test timeout on the master
branch in
 >     40/50 runs.
 >      >>
 >      >> I filed a JIRA issue: BEAM-6111
 >      >> . I
was just
 >      >> wondering if anyone knew why this may be occurring,
and to check if
 >      >> anyone else has been experiencing this.
 >      >>
 >      >> Thanks,
 >      >> Alex
 >



[DISCUSS] Structuring Java based DSLs

2018-11-30 Thread Jan Lukavský

Hi community,

I'm part of Euphoria DSL team, and on behalf of this team, I'd like to 
discuss possible development of Java based DSLs currently present in 
Beam. To my knowledge, there are currently two DSLs based on Java SDK - 
Euphoria and SQL. These DSLs currently share only the SDK itself, 
although there might be room to share some more effort. We already know 
that both Euphoria and SQL have need for retractions, but there are 
probably many more features that these two could share.


So, I'd like to open a discussion on what it would cost and what it 
would possibly bring, if instead of the current structure


  Java SDK

    |  SQL

    |  Euphoria

these DSLs would be structured as

  Java SDK ---> Euphoria ---> SQL

I'm absolutely sure that this would be a great investment and a huge 
change, but I'd like to gather some opinions and general feelings of the 
community about this. Some points to start the discussion from my side 
would be, that structuring DSLs like this has internal logical 
consistency, because each API layer further narrows completeness, but 
brings simpler API for simpler tasks, while adding additional high-level 
view of the data processing pipeline and thus enabling more 
optimizations. On the Euphoria side, these are various join implementations 
(the most effective implementation depends on the data), pipeline sampling and 
more. Some (or maybe most) of these optimizations would have to be 
implemented in both DSLs, so implementing them once is beneficial. 
Another benefit is that this would bring Euphoria "closer" to Beam core 
development (which would be good, it is part of the project anyway, 
right? :)) and help better drive features, that although currently 
needed mostly by SQL, might be needed by other Java users anyway.


Thanks for discussion and looking forward to any opinions.

  Jan



Re: Beam Metrics questions

2018-11-30 Thread Etienne Chauchot
Hi Phil,
Thanks for using MetricsPusher and Beam in general ! 
- MetricsHttpSink works as follows: it filters out committed metrics from the
json output when committed metrics are not supported. I checked, the Flink
runner still does not support committed metrics, so there should be no
committed metrics values in the output json. There might be a bug. I'll open
a ticket: thanks for pointing it out! You tested on Flink and Spark, right?
And both output committed metrics values, right?
- there is no default mechanism to fall back from committed metric values to
attempted ones
- Apache Flink does not make Flink Accumulators available in detached mode,
so indeed, metrics are not available in this mode.
CCing dev list.
Etienne
Le lundi 26 novembre 2018 à 15:57 -0600, Phil Franklin a écrit :
> All of the discussion I’ve seen says that Flink and Spark only provided 
> attempted metric values, but when I use
> MetricsHttpSink and look at the JSON it has both attempted and committed 
> values (albeit, both the same for my simple
> testing).  Has the metrics processing been updated recently, and I’m just 
> missing the change updates?  Or are the
> committed values being defaulted to the attempted values? 
> 
> Also, I’ve seen it mentioned that Flink doesn’t report metrics when in 
> detached mode.  Is this still the case?
> 
> 
> Thanks for your help!


Re: [FEEDBACK REQUEST] Re: [ANNOUNCEMENT] Nexmark included to the CI

2018-11-30 Thread Etienne Chauchot
No problem, always glad to help

Etienne

Le jeudi 29 novembre 2018 à 09:20 -0800, Alex Amato a écrit :
> Thanks Etienne, appreciate the info. This will help me a lot :)
> On Wed, Nov 28, 2018 at 1:02 AM Etienne Chauchot  wrote:
> > Hi Alex,
> > Exporting results to the dashboards is as easy as writing to a BigQuery 
> > table and then configure the dashboard SQL
> > request to display it. Here is an example:
> > - exporting: 
> > https://github.com/apache/beam/blob/ad150c1d654aac5720975727d8c6981c5382b449/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/Main.java#L163
> > - displaying:
> > 
> > SELECT
> > DATE(timestamp) as date,
> > runtimeSec
> > FROM
> > [apache-beam-testing:nexmark.nexmark_0_DirectRunner_batch]
> > WHERE
> > timestamp >= TIMESTAMP_TO_SEC(DATE_ADD(CURRENT_TIMESTAMP(), -2, 
> > "WEEK")) 
> > ORDER BY
> > date;
> > 
> > Best
> > Etienne
> > 
> > Le mardi 27 novembre 2018 à 17:34 -0800, Alex Amato a écrit :
> > > It would be great to add some lower level benchmark tests for the java 
> > > SDK. I was thinking of using open census
> > > for collecting benchmarks, which looks easy to use and should be license 
> > > compatible. I'm just not sure about how to
> > > export the results so that we can display them on the perfkit dashboard 
> > > for everyone to see.
> > > 
> > > Is there an example PR for this part? Can we write to this data store for 
> > > this perfkit dashboard easily?
> > > 
> > > https://github.com/census-instrumentation/opencensus-java
> > > https://github.com/census-instrumentation/opencensus-java/tree/master/exporters/trace/zipkin#quickstart
> > > 
> > > 
> > > 
> > > 
> > > On Thu, Jul 19, 2018 at 1:28 PM Andrew Pilloud  
> > > wrote:
> > > > The doc changes look good to me, I'll add Dataflow once it is ready. 
> > > > Thanks for opening the issue on the
> > > > DirectRunner. I'll try to get some progress on a dedicated perf node 
> > > > while you are gone, we can talk about
> > > > increasing the size of the nexmark input collection for the runs once 
> > > > we know what the utilization on that looks
> > > > like.
> > > > Enjoy your time off!
> > > > 
> > > > Andrew
> > > > On Thu, Jul 19, 2018 at 9:00 AM Etienne Chauchot  
> > > > wrote:
> > > > > Hi guys,As suggested by Anton bellow, I opened a PR on the website to 
> > > > > reference the Nexmark dashboards. As I
> > > > > did not want users to take them for proper neutral benchmarks of the 
> > > > > runners / engines,  but more for a CI
> > > > > piece of software, I added a disclaimer.
> > > > > Please:- tell if you agree on  the publication of such performance 
> > > > > results- comment on the PR for the
> > > > > disclaimer.
> > > > > PR: https://github.com/apache/beam-site/pull/500
> > > > > 
> > > > > Thanks
> > > > > Etienne
> > > > > 
> > > > > Le jeudi 19 juillet 2018 à 12:30 +0200, Etienne Chauchot a écrit :
> > > > > > Hi Anton, 
> > > > > > Yes, good idea, I'll update nexmark website page
> > > > > > Etienne
> > > > > > Le mercredi 18 juillet 2018 à 10:17 -0700, Anton Kedin a écrit :
> > > > > > > These dashboards look great!
> > > > > > > 
> > > > > > > Can publish the links to the dashboards somewhere, for better 
> > > > > > > visibility? E.g. in the jenkins website /
> > > > > > > emails, or the wiki.
> > > > > > > 
> > > > > > > Regards,Anton
> > > > > > > On Wed, Jul 18, 2018 at 10:08 AM Andrew Pilloud 
> > > > > > >  wrote:
> > > > > > > > Hi Etienne,
> > > > > > > > 
> > > > > > > > I've been asking around and it sounds like we should be able to 
> > > > > > > > get a dedicated Jenkins node for
> > > > > > > > performance tests. Another thing that might help is making the 
> > > > > > > > runs a few times longer. They are
> > > > > > > > currently running around 2 seconds each, so the total time of 
> > > > > > > > the build probably exceeds testing.
> > > > > > > > Internally at Google we are running them with 2000x as many 
> > > > > > > > events on Dataflow, but a job of that size
> > > > > > > > won't even complete on the Direct Runner.
> > > > > > > > I didn't see the query 3 issues, but now that you point it out 
> > > > > > > > it looks like a bug to me too.
> > > > > > > > 
> > > > > > > > Andrew
> > > > > > > > On Wed, Jul 18, 2018 at 1:13 AM Etienne Chauchot 
> > > > > > > >  wrote:
> > > > > > > > > Hi Andrew,
> > > > > > > > > Yes I saw that, except dedicating jenkins nodes to nexmark, I 
> > > > > > > > > see no other way.
> > > > > > > > > Also, did you see query 3 output size on direct runner? 
> > > > > > > > > Should be a straight line and it is not, I'm
> > > > > > > > > wondering if there is a problem with sate and timers impl in 
> > > > > > > > > direct runner.
> > > > > > > > > Etienne
> > > > > > > > > Le mardi 17 juillet 2018 à 11:38 -0700, Andrew Pilloud a 
> > > > > > > > > écrit :
> > > > > > > > > > I'm noticing the graphs are really noisy. It looks like we 
> > > > > > > > > > are running these on shared Jenkins
> > > > > > > > 

Re: Stand at FOSDEM 2019

2018-11-30 Thread Maximilian Michels
Thank you for all your reactions. Looks like we will have a great 
presence at FOSDEM :)


@Matthias: Yes, I'm planning to go.
@Wout: Locals are perfect :)
@Gris: Thanks for helping out with the merch!
@JB: Are you also around?

I'll try to book something for Saturday afternoon.

Thanks,
Max

On 30.11.18 09:15, Wout Scheepers wrote:

I’m based in Brussels and happy to help out.

Wout

*From: *Griselda Cuevas 
*Reply-To: *"dev@beam.apache.org" 
*Date: *Thursday, 29 November 2018 at 21:44
*To: *"dev@beam.apache.org" 
*Subject: *Re: Stand at FOSDEM 2019

+1 -- I'm happy to help with the merch, I'll be attending and will help 
staff the booth :)


G

On Thu, 29 Nov 2018 at 05:46, Suneel Marthi > wrote:


+1

On Thu, Nov 29, 2018 at 6:14 AM Matthias Baetens
mailto:baetensmatth...@gmail.com>> wrote:

Hey Max,

Great idea. I'd be very keen to join. I'll look at my calendar
over the weekend to see if this would work.

Are you going yourself?

Cheers,

Matthias

On Thu, 29 Nov 2018 at 11:06 Maximilian Michels mailto:m...@apache.org>> wrote:

Hi,

For everyone who might be attending FOSDEM19: What do you
think about
taking a slot for Beam at the Apache stand?

A slot is 2-3 hours. It is a great way to spread the word
about Beam. We
wouldn't have to prepare much, just bring some merch.

There is still plenty of space:
https://cwiki.apache.org/confluence/display/COMDEV/FOSDEM+2019

Cheers,
Max

PS: FOSDEM is an open-source conference in Brussels, Feb
2-3, 2019

-- 



Re: What is Jenkins job "Portable_Python" in PreCommit?

2018-11-30 Thread Maximilian Michels

This was merged with https://github.com/apache/beam/pull/6954.

Eventually we want to run a portable WordCount on PreCommit. We will do 
some more testing on Jenkins before it becomes an official PreCommit task.


Thanks,
Max

On 29.11.18 19:03, Mark Liu wrote:
ah, thanks Boyuan! Probably I created the PR in a bad timing. Looks like 
a new PR will fix it.


On Thu, Nov 29, 2018 at 9:50 AM Boyuan Zhang > wrote:


I believe it's in this pending PR:
https://github.com/apache/beam/pull/7157.

On Thu, Nov 29, 2018 at 8:36 AM Mark Liu mailto:mark...@google.com>> wrote:

Hi guys,

I made some changes to the Python PreCommit Gradle build, and then 
Portable_Python was invoked
as a PreCommit test and failed. However, I can't find where
it's defined / generated in Gradle or Jenkins groovy. Does
anyone know? My branch was synced to master yesterday.

Thanks!
Mark



[SDF] Why do we need markDone (or an equivalent claim)?

2018-11-30 Thread Robert Bradshaw
In looking at the SDF examples, it seems error-prone to have to
remember to write

tryClaim([fake-end-position])

to indicate that a restriction is finished. IIRC, this was done to
decide whether the entire restriction had been processed on return in
the case that tryClaim never returned false. It seems preferable to
encode this into the return value (with a void return meaning the
restriction is done unless tryClaim returned false, and a non-void
return being able to indicate any hints as to when, if ever, process
should be called again).
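
For concreteness, a minimal sketch of the pattern in question, assuming an
offset-based restriction; the queue client and Message type are hypothetical,
and the rest of the SDF (initial restriction, tracker creation, ...) is elided:

  @ProcessElement
  public void process(ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) {
    while (true) {
      Message m = queue.poll();  // hypothetical client; null means "no more data"
      if (m == null) {
        // tryClaim never returned false, so today we have to "claim" a fake
        // end-of-range position just to say the restriction is finished.
        tracker.tryClaim(tracker.currentRestriction().getTo());
        return;
      }
      if (!tracker.tryClaim(m.offset())) {
        return;  // claim failed (e.g. a runner-initiated split): stop here
      }
      c.output(m.payload());
    }
  }

With done-ness encoded in the return value, the final sentinel tryClaim would
be unnecessary.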

Can someone jog my memory as to whether there was a case in which this wouldn't work?


Re: [DISCUSS] SplittableDoFn Java SDK User Facing API

2018-11-30 Thread Robert Bradshaw
I still have outstanding questions (above) about

1) Why we need arbitrary precision for backlog, instead of just using
a (much simpler) double.
2) Whether it's worth passing backlog back to split requests, rather 
than (again) a double representing "portion of current remaining"
which may change over time. (The most common split request is into
even portions, and specifically half, which can't accurately be
requested from a stale backlog.)
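
(To make the staleness point concrete with illustrative numbers: if the SDK
reported a backlog of 1000 elements and the runner, wanting an even split,
asks to split at a backlog of 500, but 300 elements have been consumed by the
time the request is applied, only 700 remain; the primary keeps 500 and only
200 is split off, rather than 350/350. A relative request of 0.5 of whatever
currently remains would still yield an even split.)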

There are also some questions about returning multiple remainders, and
how that relates to/overlaps with the initial splitting, but those can
probably be deferred.

On Wed, Nov 28, 2018 at 2:23 AM Lukasz Cwik  wrote:
>
> I updated the PR addressing the last of Scott's comments and also migrated to 
> use an integral fraction as Robert had recommended by using approach A for 
> the proto representation and BigDecimal within the Java SDK:
> A:
> // Represents a non-negative decimal number: unscaled_value * 10^(-scale)
> message Decimal {
>   // Represents the unscaled value as a big endian unlimited precision 
> non-negative integer.
>   bytes unscaled_value = 1;
>   // Represents the scale
>   uint32 scale = 2;
> }
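
For reference, a minimal sketch of how a Decimal message like the one above
could be turned into a java.math.BigDecimal on the SDK side (the accessor
names are assumed from the proto field names; not necessarily the exact code
in the PR):

  import java.math.BigDecimal;
  import java.math.BigInteger;

  static BigDecimal toBigDecimal(Decimal d) {
    // unscaled_value holds a big-endian, non-negative (sign-less) integer
    BigInteger unscaled = new BigInteger(1, d.getUnscaledValue().toByteArray());
    // value = unscaled_value * 10^(-scale)
    return new BigDecimal(unscaled, d.getScale());
  }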
>
> Ismael, I would like to defer the changes to improve the ByteBuddy 
> DoFnInvoker since that is parallelizable work and have filed BEAM-6142.
>
> I don't believe there are any other outstanding changes and would like to get 
> the PR merged so that people can start working on implementing support for 
> backlog reporting and splitting within the Java SDK harness, improving the 
> ByteBuddy DoFnInvoker, exposing the shared runner library parts, and 
> integrating this into ULR, Flink, Dataflow, ...
>
> On Mon, Nov 26, 2018 at 9:49 AM Lukasz Cwik  wrote:
>>
>>
>>
>> On Mon, Nov 26, 2018 at 9:09 AM Ismaël Mejía  wrote:
>>>
>>> > Bundle finalization is unrelated to backlogs but is needed since there is 
>>> > a class of data stores which need acknowledgement that says I have 
>>> > successfully received your data and am now responsible for it such as 
>>> > acking a message from a message queue.
>>>
>>> Currently, acking is done by IOs as part of checkpointing. How will this
>>> be different? Can you please clarify how it should be done in this case,
>>> or is this totally independent?
>>
>>
>> The flow for finalization and checkpointing is similar:
>> Checkpointing:
>> 1) Process a bundle
>> 2) Checkpoint bundle containing acks that need to be done
>> 3) When checkpoint resumes, acknowledge messages
>>
>> Finalization:
>> 1) Process a bundle
>> 2) Request bundle finalization when bundle completes
>> 3) SDK is asked to finalize bundle
>>
>> The difference between the two is that bundle finalization always goes back 
>> to the same machine instance that processed the bundle while checkpointing 
>> can be scheduled on another machine. Many message queue like systems expose 
>> clients which store in-memory state and can't ack from another machine. You 
>> could solve the problem with checkpointing, but that would require each machine 
>> to be able to tell another machine that it got a checkpoint with acks that it 
>> is responsible for; this won't work everywhere and isn't as clean.
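
To make the finalization flow concrete, a rough sketch of what the user-facing
hook could look like; the finalizer parameter and callback are illustrative of
the proposal rather than a settled API, and the queue client is hypothetical:

  @ProcessElement
  public void process(ProcessContext c, BundleFinalizer finalizer) {
    List<String> receipts = new ArrayList<>();
    for (Message m : queueClient.pullBatch()) {  // hypothetical client
      receipts.add(m.receiptHandle());
      c.output(m.payload());
    }
    // The callback runs on the same worker that processed the bundle, and only
    // after the runner has durably committed the bundle's output; acking any
    // earlier could lose data.
    finalizer.afterBundleCommit(
        Instant.now().plus(Duration.standardMinutes(5)),  // callback deadline
        () -> queueClient.ack(receipts));
  }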
>>
>>>
>>> > UnboundedPerElement/BoundedPerElement tells us during pipeline 
>>> > construction time what type of PCollection we will be creating since we 
>>> > may have a bounded PCollection goto an UnboundedPerElement DoFn and that 
>>> > will produce an unbounded PCollection and similarly we could have an 
>>> > unbounded PCollection goto a BoundedPerElement DoFn and that will produce 
>>> > an unbounded PCollection. Restrictions.IsBounded is used during pipeline 
>>> > execution to inform the runner whether a restriction being returned is 
>>> > bounded or not since unbounded restrictions can return bounded 
>>> > restrictions during splitting. So in the above example using the message 
>>> > queue, the first 7 restrictions that only read 1250 messages would be 
>>> > marked with the Restrictions.IsBounded interface while the last one would 
>>> > not be. This could also be a method on restrictions such as "IsBounded 
>>> > isBounded()" on PCollections.
>>>
>>> Thanks for the explanation about Restrictions.IsBounded. Since this is
>>> information for the runner, what is the runner expected to do
>>> differently when IsUnbounded? (I assume that IsBounded is the default
>>> behavior and nothing changes.)
>>
>>
>> Knowing whether a restriction is bounded or unbounded is important, one 
>> example use case would be for the limited depth splitting proposal 
>> (https://docs.google.com/document/d/1cKOB9ToasfYs1kLWQgffzvIbJx2Smy4svlodPRhFrk4/edit#heading=h.wkwslng744mv)
>>  since you want to keep the unbounded restrictions at level 0 and only pass 
>> the bounded restrictions to the other levels. The reasoning behind this is 
>> that you don't want to end up in a state where all your unbounded 
>> restrictions are at the highest level preventing you from splitting any 
>> further.
>>
>>>
>>> > 

Re: Stand at FOSDEM 2019

2018-11-30 Thread Wout Scheepers
I’m based in Brussels and happy to help out.

Wout

From: Griselda Cuevas 
Reply-To: "dev@beam.apache.org" 
Date: Thursday, 29 November 2018 at 21:44
To: "dev@beam.apache.org" 
Subject: Re: Stand at FOSDEM 2019

+1 -- I'm happy to help with the merch, I'll be attending and will help staff 
the booth :)

G

On Thu, 29 Nov 2018 at 05:46, Suneel Marthi <smar...@apache.org> wrote:
+1

On Thu, Nov 29, 2018 at 6:14 AM Matthias Baetens <baetensmatth...@gmail.com> wrote:
Hey Max,

Great idea. I'd be very keen to join. I'll look at my calendar over the weekend 
to see if this would work.
Are you going yourself?

Cheers,
Matthias

On Thu, 29 Nov 2018 at 11:06 Maximilian Michels <m...@apache.org> wrote:
Hi,

For everyone who might be attending FOSDEM19: What do you think about
taking a slot for Beam at the Apache stand?

A slot is 2-3 hours. It is a great way to spread the word about Beam. We
wouldn't have to prepare much, just bring some merch.

There is still plenty of space:
https://cwiki.apache.org/confluence/display/COMDEV/FOSDEM+2019

Cheers,
Max

PS: FOSDEM is an open-source conference in Brussels, Feb 2-3, 2019
--