Re: [ANNOUNCE] New committer: Brian Hulette

2019-11-14 Thread Gleb Kanterov
Congratulations!

On Fri, Nov 15, 2019 at 5:44 AM Valentyn Tymofieiev 
wrote:

> Congratulations, Brian!
>
> On Thu, Nov 14, 2019 at 6:25 PM jincheng sun 
> wrote:
>
>> Congratulations, Brian!
>>
>> Best,
>> Jincheng
>>
>> Kyle Weaver wrote on Friday, November 15, 2019 at 7:19 AM:
>>
>>> Thanks for your contributions and congrats Brian!
>>>
>>> On Thu, Nov 14, 2019 at 3:14 PM Kenneth Knowles  wrote:
>>>
 Hi all,

 Please join me and the rest of the Beam PMC in welcoming a new
 committer: Brian Hulette

 Brian introduced himself to dev@ earlier this year and has been
 contributing since then. His contributions to Beam include explorations of
 integration with Arrow, standardizing coders, portability for schemas, and
 presentations at Beam events.

 In consideration of Brian's contributions, the Beam PMC trusts him with
 the responsibilities of a Beam committer [1].

 Thank you, Brian, for your contributions and looking forward to many
 more!

 Kenn, on behalf of the Apache Beam PMC

 [1]
 https://beam.apache.org/contribute/become-a-committer/#an-apache-beam-committer

>>>


Re: On processing event streams

2019-11-14 Thread Kenneth Knowles
On Tue, Nov 12, 2019 at 1:36 AM Jan Lukavský  wrote:

> Hi,
>
> this is a follow-up of multiple threads covering the topic of how to (in a
> unified way) process event streams. Event streams can be characterized
> by a common property: ordering of events matters.


1. events are ordered (hence timestamps)
2. most operators do not depend on order / operators that depend on some
order do not depend on the total order
3. real-world data is inherently likely to have a distribution of disorder
that has an unboundedly long tail


> The processing
> (usually) looks something like
>
>unordered stream -> buffer (per key) -> ordered stream -> stateful
> logic (DoFn)
>

This is a pre-watermark, pre-Beam approach to processing data. It drops
more data and/or introduces more latency, but can lead to simpler or more
efficient operator implementations (but not always).
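
To make the pattern concrete, here is a minimal sketch of that buffer step using the
existing state & timers API (class and type names are illustrative; it glosses over
allowed lateness and watermark holds, which is exactly the hard part):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;

// Buffers elements per key in a bag, then sorts and re-emits them when an
// event-time timer fires. It flushes the whole buffer at once, so it can still
// emit elements ahead of the watermark; a real expansion would only release
// elements at or before the watermark and re-set the timer.
class TimeSortingFn<K, V> extends DoFn<KV<K, V>, KV<K, V>> {

  @StateId("buffer")
  private final StateSpec<BagState<TimestampedValue<KV<K, V>>>> bufferSpec = StateSpecs.bag();

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      ProcessContext ctx,
      @StateId("buffer") BagState<TimestampedValue<KV<K, V>>> buffer,
      @TimerId("flush") Timer flush) {
    buffer.add(TimestampedValue.of(ctx.element(), ctx.timestamp()));
    // Wake up once the watermark passes this element's timestamp.
    flush.set(ctx.timestamp());
  }

  @OnTimer("flush")
  public void flush(
      OnTimerContext ctx, @StateId("buffer") BagState<TimestampedValue<KV<K, V>>> buffer) {
    List<TimestampedValue<KV<K, V>>> sorted = new ArrayList<>();
    buffer.read().forEach(sorted::add);
    sorted.sort(Comparator.comparing(TimestampedValue::getTimestamp));
    for (TimestampedValue<KV<K, V>> tv : sorted) {
      ctx.outputWithTimestamp(tv.getValue(), tv.getTimestamp());
    }
    buffer.clear();
  }

  @Override
  public Duration getAllowedTimestampSkew() {
    // Buffered elements can be older than the firing timer's timestamp; a real
    // implementation would hold the output watermark instead of allowing skew.
    return Duration.millis(Long.MAX_VALUE);
  }
}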

I do think it seems OK to add this to Beam in some form as a convenience
when the user knows something about their data and/or DoFn. The risk of
course is that users go for this when they shouldn't, thinking it is
simpler without considering the complexity of how to avoid dropping too
much data.

This thread seems to be a continuation of the other thread I just responded
to. It would be good to try to keep them tied together to avoid duplicate
responses.

Kenn

> This is perfectly fine and can be solved by the current tools Beam offers
> (state & timers), but *only for the streaming case*. The batch case is
> essentially broken, because:
>
>   a) out-of-orderness is essentially *unbounded* (as opposed to the input
> being bounded; strangely, that is not a contradiction), while out-of-orderness
> in the streaming case is *bounded*, because the watermark can fall behind
> only a limited amount of time (sooner or later, nobody would actually care
> about results from a streaming pipeline being months or years late, right?)
>
>   b) with unbounded out-of-orderness, the spatial requirements of state
> grow with O(N), worst case, where N is the size of the whole input
>
>   c) moreover, many runners restrict the size of state per key to fit in
> memory (spark, flink)
>
> Now, solutions to these problems seem to be:
>
>   1) refine the model guarantees for batch stateful processing, so that
> we limit the out-of-orderness (the source of issues here) - the only
> reasonable way to do that is to enforce sorting before all stateful
> dofns in the batch case (perhaps with an opt-out for that), or
>
>   2) define a way to mark stateful dofn as requiring the sorting (e.g.
> @RequiresTimeSortedInput) - note this has to be done for both batch and
> streaming case, as opposed to 1), or
>
>   3) define a different URN for "ordered stateful dofn", with default
> expansion using state as buffer (for both batch and streaming case) -
> that way this can be overridden in batch runners that can get into
> trouble otherwise (and could be regarded as sort of natural extension of
> the current approach).
>
> I still think that the best solution is 1), for multiple reasons going
> from being internally logically consistent to being practical and easily
> implemented (a few lines of code in flink's case for instance). On the
> other hand, if this is really not what we want to do, then I'd like to
> know the community's opinion on the two other options (or, if there
> maybe is some other option I didn't cover).
>
> Many thanks for opinions and help with fixing what is (sort of) broken
> right now.
>
> Jan
>
>


slides?

2019-11-14 Thread Austin Bennett
Hi Dev and User,

Wondering if people would find a benefit from collecting slides from
Meetups/Talks?

Seems that this could be appropriate on the website, for instance.  Not
sure whether this has been asked previously, so bringing it to the group.

Cheers,
Austin


Re: Why is Pipeline not Serializable and can it be changed to be Serializable

2019-11-14 Thread Pulasthi Supun Wickramasinghe
Hi Luke,

That is the approach I am currently taking to handle the functions. I might
have to do the same for Coders as well, since some coders have the same
issue of not having default constructors.

I also initially considered converting the pipeline into a JSON format and
sending that over to the workers. I will take a look at the option you have
mentioned, since we do plan to implement a portable pipeline runner for
Twister2 as well. Thanks for the information.
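
For reference, the proto round trip being suggested would look roughly like the
sketch below (it assumes the PipelineTranslation helper from
runners-core-construction-java; the wrapper class is invented for illustration):

import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.sdk.Pipeline;

// Rough sketch: ship the portable proto instead of the Pipeline object itself.
class PipelineShipping {

  // On the driver: translate the (non-serializable) Pipeline into its portable
  // proto form and serialize that.
  static byte[] toBytes(Pipeline pipeline) {
    RunnerApi.Pipeline proto = PipelineTranslation.toProto(pipeline);
    return proto.toByteArray();
  }

  // On each worker: parse the proto back and hand it to a portable translator
  // (something shaped like FlinkBatchPortablePipelineTranslator referenced above).
  static RunnerApi.Pipeline fromBytes(byte[] bytes) throws Exception {
    return RunnerApi.Pipeline.parseFrom(bytes);
  }
}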

Best Regards,
Pulasthi


On Thu, Nov 14, 2019 at 2:30 PM Luke Cwik  wrote:

> You should create placeholders inside of your Twister2/OpenMPI
> implementation that represent these functions and then instantiate actual
> instances of them on the workers if you want to write your own pipeline
> representation and format for OpenMPI/Twister2.
>
> Or consider converting the pipeline to its proto representation and
> building a portable pipeline runner. This way you could run Go and Python
> pipelines as well. The best example of this is the current Flink
> integration[1]
>
> 1:
> https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
>
> On Wed, Nov 13, 2019 at 7:44 PM Pulasthi Supun Wickramasinghe <
> pulasthi...@gmail.com> wrote:
>
>> Hi Dev's
>>
>> Currently, the Pipeline class in Beam is not Serializable. This is not a
>> problem for the current runners since the pipeline is translated and
>> submitted through a centralized, driver-like model. However, if the runner
>> has a decentralized model similar to OpenMPI (MPI), which is also the case
>> with Twister2, for which I am currently developing a runner, it would be
>> better if the pipeline itself were Serializable.
>>
>> Currently, I am trying to transform the Pipeline into a Twister2 graph
>> and then send it over to the workers; however, since there are some functions
>> such as "SystemReduceFn" that are not serializable, this is also somewhat
>> troublesome.
>>
>> Was the decision to make Pipelines not Serializable made due to some
>> specific reason or because all the current use cases did not present any
>> valid requirement to make them Serializable?
>>
>> Best Regards,
>> Pulasthi
>> --
>> Pulasthi S. Wickramasinghe
>> PhD Candidate  | Research Assistant
>> School of Informatics and Computing | Digital Science Center
>> Indiana University, Bloomington
>> cell: 224-386-9035
>>
>

-- 
Pulasthi S. Wickramasinghe
PhD Candidate  | Research Assistant
School of Informatics and Computing | Digital Science Center
Indiana University, Bloomington
cell: 224-386-9035


Re: Proposal: @RequiresTimeSortedInput

2019-11-14 Thread Kenneth Knowles
Hi Jan,

Sorry for the very slow reply.

Your proposed feature is sensitive to all data that is not in timestamp
order, which is not the same as late. In Beam "late" is defined as
"assigned to a window where the watermark has passed the end of the window
and a 'final' aggregate has been produced". Your proposal is not really
sensitive to this form of late data.

I think there is some published work that will help you particularly in
addressing out-of-order data. Note that this is not the normal notion of
late. Trill has a high-watermark-driven sorting buffer that sends
elements, in order, to stateful operators. It is similar to your sketched
algorithm for emitting elements as the watermark passes. I believe Gearpump
also uses a sorting buffer and processes in order, and we do have a
Gearpump runner still here in our repo.
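
To make that concrete, the user-facing shape of the proposal is roughly the sketch
below; the annotation name and placement are taken from the design doc and may still
change, and the DoFn itself is an invented example:

import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Invented example: emits the delta between consecutive readings per key, which
// is only meaningful if elements arrive in event-time order.
class DeltaFn extends DoFn<KV<String, Long>, Long> {

  @StateId("last")
  private final StateSpec<ValueState<Long>> lastSpec = StateSpecs.value();

  // Proposed annotation: ask the runner to deliver input time-sorted per key.
  @RequiresTimeSortedInput
  @ProcessElement
  public void process(ProcessContext ctx, @StateId("last") ValueState<Long> last) {
    long current = ctx.element().getValue();
    Long previous = last.read();
    ctx.output(previous == null ? 0L : current - previous);
    last.write(current);
  }
}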

Kenn

On Mon, Nov 4, 2019 at 3:54 AM Jan Lukavský  wrote:

> Hi,
>
> there has been some development around this [1], which essentially
> concludes that currently this feature can be safely supported only by
> direct runner, flink runner (both batch and streaming, non-portable
> only) and spark (batch, legacy only). This is due to the fact that time
> sorting relies heavily on timers to be strictly ordered. Failing to do
> so might result in unpredictable data loss, due to window-cleanup of
> state occurring prior to all elements being emitted (note that this
> generally might happen even to current user pipelines!). I can link
> issues [2], [3] and [4] to [5], but the question is, with only so few
> runners being able to support this, what should be the best way to
> incorporate this into any upcoming release (I'm assuming that this will
> pass a vote, which is not known yet)? I'd say that the best way would be
> for the affected runners to fail to execute the pipeline until the
> respective issues are resolved. Another option would be to block this
> until the issues are resolved in runners, but that might delay the
> availability of this feature for some unknown time.
>
> Thanks for any opinions,
>
> Jan
>
> [1]
>
> https://lists.apache.org/thread.html/71a8f48ca518f1f2e6e9b1284114624670884775d209b0097f68264b@%3Cdev.beam.apache.org%3E
>
> [2] https://issues.apache.org/jira/browse/BEAM-8459
>
> [3] https://issues.apache.org/jira/browse/BEAM-8460
>
> [4] https://issues.apache.org/jira/browse/BEAM-8543.
>
> [5] https://issues.apache.org/jira/browse/BEAM-8550
>
> On 10/31/19 2:59 PM, Jan Lukavský wrote:
> > Hi,
> >
> > as a follow-up from previous design draft, I'd like to promote the
> > document [1] and associated PR [2] to proposal.
> >
> > The PR contains working implementation for:
> >
> >  - non-portable batch flink and batch spark (legacy)
> >
> >  - all non-portable streaming runners that use StatefulDoFnRunner
> > (direct, samza, dataflow)
> >
> >  - portable flink (batch, streaming)
> >
> > There are still some unresolved issues:
> >
> >  a) no way to specify allowed lateness (currently is simply zero, late
> > data should be dropped)
> >
> >  b) need a way to specify user UDF for extracting timestamp (according
> > to [3] it would be useful to have that option)
> >
> >  c) need to add more tests (e.g. late data)
> >
> > The plan is to postpone resolution of issues a) and b) after the
> > proposal is merged. I'd like to gather some more feedback on the
> > proposal, iterate over that again, add more tests and then pass this
> > to a vote.
> >
> > Unrelated - during implementation a bug [4] in Samza runner was found.
> >
> > Looking forward to any comments!
> >
> > Jan
> >
> > [1]
> >
> https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/
> >
> >
> > [2] https://github.com/apache/beam/pull/8774
> >
> > [3]
> >
> https://lists.apache.org/thread.html/813429e78b895a336d4f5507e3b2330282e2904fa25d52d6d441741a@%3Cdev.beam.apache.org%3E
> >
> > [4] https://issues.apache.org/jira/browse/BEAM-8529
> >
> >
> > On 5/23/19 4:10 PM, Jan Lukavský wrote:
> >> Hi,
> >>
> >> I have written a very brief draft of how it might be possible to
> >> implement @RequiresTimeSortedInput discussed in [1]. I see the
> >> document [2] as a starting point for a discussion. There are several
> >> open questions, which I believe can be resolved by this great
> >> community. :-)
> >>
> >> Jan
> >>
> >> [1]
> >>
> https://lists.apache.org/thread.html/4609a1bb1662690d67950e76d2f1108b51327b8feaf9580de659552e@%3Cdev.beam.apache.org%3E
> >>
> >> [2]
> >>
> https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/
> >>
>


Re: [ANNOUNCE] New committer: Brian Hulette

2019-11-14 Thread Valentyn Tymofieiev
Congratulations, Brian!

On Thu, Nov 14, 2019 at 6:25 PM jincheng sun 
wrote:

> Congratulations, Brian!
>
> Best,
> Jincheng
>
> Kyle Weaver wrote on Friday, November 15, 2019 at 7:19 AM:
>
>> Thanks for your contributions and congrats Brian!
>>
>> On Thu, Nov 14, 2019 at 3:14 PM Kenneth Knowles  wrote:
>>
>>> Hi all,
>>>
>>> Please join me and the rest of the Beam PMC in welcoming a new
>>> committer: Brian Hulette
>>>
>>> Brian introduced himself to dev@ earlier this year and has been
>>> contributing since then. His contributions to Beam include explorations of
>>> integration with Arrow, standardizing coders, portability for schemas, and
>>> presentations at Beam events.
>>>
>>> In consideration of Brian's contributions, the Beam PMC trusts him with
>>> the responsibilities of a Beam committer [1].
>>>
>>> Thank you, Brian, for your contributions and looking forward to many
>>> more!
>>>
>>> Kenn, on behalf of the Apache Beam PMC
>>>
>>> [1]
>>> https://beam.apache.org/contribute/become-a-committer/#an-apache-beam-committer
>>>
>>


Re: [ANNOUNCE] New committer: Brian Hulette

2019-11-14 Thread jincheng sun
Congratulations, Brian!

Best,
Jincheng

Kyle Weaver wrote on Friday, November 15, 2019 at 7:19 AM:

> Thanks for your contributions and congrats Brian!
>
> On Thu, Nov 14, 2019 at 3:14 PM Kenneth Knowles  wrote:
>
>> Hi all,
>>
>> Please join me and the rest of the Beam PMC in welcoming a new committer:
>> Brian Hulette
>>
>> Brian introduced himself to dev@ earlier this year and has been
>> contributing since then. His contributions to Beam include explorations of
>> integration with Arrow, standardizing coders, portability for schemas, and
>> presentations at Beam events.
>>
>> In consideration of Brian's contributions, the Beam PMC trusts him with
>> the responsibilities of a Beam committer [1].
>>
>> Thank you, Brian, for your contributions and looking forward to many more!
>>
>> Kenn, on behalf of the Apache Beam PMC
>>
>> [1]
>> https://beam.apache.org/contribute/become-a-committer/#an-apache-beam-committer
>>
>


Re: Cleaning up Approximate Algorithms in Beam

2019-11-14 Thread Robert Bradshaw
On Thu, Nov 14, 2019 at 1:06 AM Kenneth Knowles  wrote:

> Wow. Nice summary, yes. Major calls to action:
>
> 0. Never allow a combiner that does not make the format of its state
> clear in its name/URN. The "update compatibility" problem makes their
> internal accumulator state essentially part of their public API. Combiners
> named for what they do are an inherent risk, since we might have a new way
> to do the same operation with different implementation-detail state.
>

It seems this will make for a worse user experience, motivated solely by
limitations in our implementation. I think we can do better. Hypothetical
idea: what if upgrade required access to the original graph (or at least
metadata about it) during construction? In this case an ApproximateDistinct
could look at what was used last time and try to do the same, but be free
to do something better when unconstrained. Another approach would be to
encode several alternative expansions in the Beam graph and let the runner
do the picking (based on prior submission). (Making the CombineFn, as
opposed to the composite, have several alternatives seems harder to reason
about, but maybe worth pursuing as well).

This is not unique to Combiners, but applies to any stateful DoFn, or composite
operations with non-trivial internal structure (and coders). This has been
discussed a lot, perhaps there are some ideas there we could borrow?

And they will match search terms better, which is a major problem.
>

I'm not following this--by naming things after their implementation rather
than their intent I think they will be harder to search for.


> 1. Point users to HllCount. This seems to be the best of the three. Does
> it have a name that is clear enough about the format of its state? Noting
> that its Java package name includes zetasketch, perhaps.
> 2. Deprecate the others, at least. And remove them from e.g. Javadoc.
>

+1
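
To make that concrete, the HllCount entry points in the Java SDK's zetasketch
extension look roughly like this (a hedged sketch; method names from memory, so
double-check them against the Javadoc):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.zetasketch.HllCount;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;

class HllCountSketchExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<String> userIds = p.apply(Create.of("alice", "bob", "alice"));

    // Pre-aggregate into HLL++ sketches; the byte[] sketches can be stored and
    // merged later (the OLAP use case discussed in this thread).
    PCollection<byte[]> sketches = userIds.apply(HllCount.Init.forStrings().globally());

    // Extract the approximate distinct count from the sketches.
    PCollection<Long> approxDistinct = sketches.apply(HllCount.Extract.globally());

    p.run().waitUntilFinish();
  }
}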


> On Wed, Nov 13, 2019 at 10:01 AM Reuven Lax  wrote:
>
>>
>>
>> On Wed, Nov 13, 2019 at 9:58 AM Ahmet Altay  wrote:
>>
>>> Thank you for writing this summary.
>>>
>>> On Tue, Nov 12, 2019 at 6:35 PM Reza Rokni  wrote:
>>>
 Hi everyone;

 TL;DR: Discussion on Beam's various Approximate Distinct Count
 algorithms.

 Today there are several options for Approximate Algorithms in Apache
 Beam 2.16, with HLLCount being the most recently added. Would like to canvass
 opinions here on the possibility of rationalizing these APIs by removing
 obsolete / less efficient implementations.
 The current situation:

 There are three options available to users: ApproximateUnique.java,
 ApproximateDistinct.java, and HllCount.java. A quick summary of these APIs
 as I understand them:

 HllCount.java: Marked as @Experimental

 PTransforms to compute HyperLogLogPlusPlus (HLL++) sketches on data
 streams based on the ZetaSketch implementation. For the detailed design of
 this class, see https://s.apache.org/hll-in-beam.

 ApproximateUnique.java: Not marked as @Experimental

 This API does not expose the ability to create sketches so it's not
 suitable for the OLAP use case that HLL++ is geared towards (do
 pre-aggregation into sketches to allow interactive query speeds). It's also
 less precise for the same amount of memory used: the error bounds in the
 doc comments give:

 /* The error is about {@code 2 / sqrt(sampleSize)} */

 Compared to the default HLLCount sketch size, its error is 10X larger
 than the HLL++ error.

>>>
>>> FWIW, there is a Python implementation only for this version:
>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/stats.py#L38
>>>
>>>
>>>
 ApproximateDistinct.java: Marked with @Experimental

 This is a re-implementation of the HLL++ algorithm, based on the paper
 published in 2013. It exposes sketches via a HyperLogLogPlusCoder. We
 have not run any benchm

Re: Python Precommit duration pushing 2 hours

2019-11-14 Thread Robert Bradshaw
On Thu, Nov 14, 2019 at 2:58 PM Ahmet Altay  wrote:
>
> On Thu, Nov 14, 2019 at 2:55 PM Mikhail Gryzykhin  wrote:
>>
>> Hi Everyone,
>>
>> The Python precommit phase times out for (roughly) 80% of the jobs at 2 hours.
>> This also blocks release branch validation. I suggest bumping the timeout to
>> 3 hours while we work on a proper solution. This way many people can
>> get unblocked.
>>
>> I believe the change can be rather small: 
>> https://github.com/apache/beam/pull/10121
>
> +1 to unblock the current state.

+1 from me as well. Who (if anyone) is looking into the root cause? I
don't want to bump this up and forget it--2 hours is painful enough as
is.

>> On Mon, Nov 11, 2019 at 5:24 PM Ning Kang  wrote:
>>>
>>> I'm removing the additional interactive test env + suite and add 
>>> [interactive] dependencies as extra dependencies in tests_require: 
>>> https://github.com/apache/beam/pull/10068
>>>
>>> On Mon, Nov 11, 2019 at 2:15 PM Robert Bradshaw  wrote:

 On Fri, Nov 8, 2019 at 5:45 PM Ahmet Altay  wrote:
 >
 > I looked at the log but I could not figure out what is causing the timeout
 > because the gradle scan links are missing. I sampled a few of the 
 > successful jobs, It seems like python 3.7 and python 2 are running 3 
 > tests in serial {interactive, py37cython, py37gcp} and {docs, 
 > py27cython, py27gcp} respectively. These two versions are pushing the 
 > total time because other variants are now only running {cython, gcp} 
 > versions.
 >
 > I suggest breaking up docs, and interactive into 2 separate suites of 
 > their own. docs is actually faster than interactive,just separating that 
 > out to a new suite might help.
 >
 > Interactive was recently added 
 > (https://github.com/apache/beam/pull/9741). +Ning Kang could you 
 > separate interactive to new suite?

 I would ask why interactive is a separate tox configuration at all; I
 don't think there's a need to run every test again with a couple of
 extra dependencies (adding ~30 minutes to every presubmit). I think it
 would be much more valuable to run the (presumably relatively small)
 set of interactive tests in all modes.

 (The other suites are to guarantee the tests specifically run
 *without* installing gcp and *without* compiling with Cython.)

 > On Fri, Nov 8, 2019 at 11:09 AM Robert Bradshaw  
 > wrote:
 >>
 >> Just saw another 2-hour timeout:
 >> https://builds.apache.org/job/beam_PreCommit_Python_Commit/9440/ , so
 >> perhaps we're not out of the woods yet (though in general things have
 >> been a lot better).
 >>
 >> On Tue, Nov 5, 2019 at 10:52 AM Ahmet Altay  wrote:
 >> >
 >> > GCP tests are already on separate locations. IO related tests are 
 >> > under /sdks/python/apache_beam/io/gcp and Dataflow related tests are 
 >> > under sdks/python/apache_beam/runners/dataflow. It should be a matter 
 >> > of changing gradle files to run either one of the base tests or GCP 
 >> > tests depending on the types of changes. I do not expect this to have 
 >> > any material impact on the precommit times because these two test 
 >> > suites take about exactly the same time to complete.
 >> >
 >> > #9985 is merged now. Precommit times on master branch dropped to ~1h 
 >> > 20 for the last 5 runs.
 >> >
 >> > On Tue, Nov 5, 2019 at 10:12 AM David Cavazos  
 >> > wrote:
 >> >>
 >> >> +1 to moving the GCP tests outside of core. If there are issues that 
 >> >> only show up on GCP tests but not in core, it might be an indication 
 >> >> that there needs to be another test in core covering that, but I 
 >> >> think that should be pretty rare.
 >> >>
 >> >> On Mon, Nov 4, 2019 at 8:33 PM Kenneth Knowles  
 >> >> wrote:
 >> >>>
 >> >>> +1 to moving forward with this
 >> >>>
 >> >>> Could we move GCP tests outside the core? Then only code changes 
 >> >>> touches/affecting GCP would cause them to run in precommit. Could 
 >> >>> still run them in postcommit in their own suite. If the core has 
 >> >>> reasonably stable abstractions that the connectors are built on, 
 >> >>> this should not change coverage much.
 >> >>>
 >> >>> Kenn
 >> >>>
 >> >>> On Mon, Nov 4, 2019 at 1:55 PM Ahmet Altay  wrote:
 >> 
 >>  PR for the proposed change: 
 >>  https://github.com/apache/beam/pull/9985
 >> 
 >>  On Mon, Nov 4, 2019 at 1:35 PM Udi Meiri  wrote:
 >> >
 >> > +1
 >> >
 >> > On Mon, Nov 4, 2019 at 12:09 PM Robert Bradshaw 
 >> >  wrote:
 >> >>
 >> >> +1, this seems like a good step with a clear win.
 >> >>
 >> >> On Mon, Nov 4, 2019 at 12:06 PM Ahmet Altay  
 >> >> wrote:
 >> >> >
 >> >> > Python precommits are still timing out on #9925.

Re: Date/Time Ranges & Protobuf

2019-11-14 Thread Luke Cwik
The timestamps flow both ways since:
* IO authors are responsible for saying what the watermark timestamp is and
stateful DoFns also allow for users to set timers in relative and
processing time domains.
* Runner authors need to understand and merge these timestamps together to
compute what the global watermark is for a PCollection.

On Thu, Nov 14, 2019 at 3:15 PM Sam Rohde  wrote:

> My two cents: we just need a proto representation for timestamps and
> durations that includes units. The underlying library can then determine
> what to do with it. Then further, we can have a standard across Beam SDKs
> and Runners of how to interpret the proto. Using a raw int64 for timestamps
> and durations is confusing and *very, very* bug prone (as we have seen in
> the past).
>
> I don't know if this is relevant, but does Apache Beam have any standards
> surrounding leap years or seconds? If we were to make our own timestamp
> format, would we have to worry about that? Or is the timestamp supplied to
> Beam a property of the underlying system giving Beam the timestamp? If it
> is, then there may be some interop problems between sources.
>
> On Wed, Nov 13, 2019 at 10:35 AM Luke Cwik  wrote:
>
>> I do agree that Apache Beam can represent dates and times with arbitrary
>> precision and can do it many different ways.
>>
>> My argument has always been should around whether we restrict this range
>> to a common standard to increase interoperability across other systems. For
>> example, SQL database servers have varying degrees as to what ranges they
>> support:
>> * Oracle 10[1]: 0001-01-01 to 9999-12-31
>> * Oracle 11g[2]: Julian era, ranging from January 1, 4712 BCE through
>> December 31, 9999 CE (Common Era, or 'AD'), unless BCE ('BC' in the format
>> mask)
>> * MySQL[3]: '1000-01-01 00:00:00' to '9999-12-31 23:59:59'
>> * Microsoft SQL: January 1, 1753, through December 31, 9999 for
>> datetime[4] and January 1, 1 CE through December 31, 9999 CE for datetime2[5]
>>
>> The common case of the global window containing timestamps that are
>> before and after all of these supported ranges above means that our users
>> can't represent a global window within a database using its common data
>> types.
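
To make the mismatch concrete, a small illustration using the Java SDK's window
constants (a hedged sketch, nothing more):

import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;

// Prints Beam's minimum timestamp and the global window's maximum timestamp;
// both fall far outside every SQL range listed above.
class WindowBounds {
  public static void main(String[] args) {
    System.out.println("TIMESTAMP_MIN_VALUE       = " + BoundedWindow.TIMESTAMP_MIN_VALUE);
    System.out.println("GlobalWindow maxTimestamp = " + GlobalWindow.INSTANCE.maxTimestamp());
  }
}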
>>
>> 1: https://docs.oracle.com/javadb/10.8.3.0/ref/rrefdttlimits.html
>> 2:
>> https://docs.oracle.com/cd/B28359_01/server.111/b28318/datatype.htm#CNCPT413
>> 3: https://dev.mysql.com/doc/refman/8.0/en/datetime.html
>> 4:
>> https://docs.microsoft.com/en-us/sql/t-sql/data-types/datetime-transact-sql?view=sql-server-ver15
>> 5:
>> https://docs.microsoft.com/en-us/sql/t-sql/data-types/datetime2-transact-sql?view=sql-server-ver15
>>
>> On Wed, Nov 13, 2019 at 3:28 AM Jan Lukavský  wrote:
>>
>>> Hi,
>>>
>>> just an idea on these related topics that appear these days - it might
>>> help to realize that what we actually need is not full arithmetic on
>>> timestamps (the Beam model IMHO doesn't need to know the exact
>>> difference between two events). What we actually need is a slightly simplified
>>> algebra. Given two timestamps T1 and T2 and a "duration" (a different type
>>> from timestamp), we need operations (not 100% sure that this is exhaustive,
>>> but seems to be):
>>>
>>>  - is_preceding(T1, T2): bool
>>>
>>>- important !is_preceding(T1, T2) does NOT imply that
>>> is_preceding(T2, T1) - !is_preceding(T1, T2) && !is_preceding(T2, T1) would
>>> mean events are _concurrent_
>>>
>>>- this relation has to be also antisymmetric
>>>
>>>- given this function we can construct a comparator, where multiple
>>> distinct timestamps can be "equal" (or with no particular ordering, which
>>> is natural property of time)
>>>
>>>  - min_timestamp_following(T1, duration): T2
>>>
>>>- that would return a timestamp for which is_preceding(T1 + duration,
>>> T2) would return true and no other timestamp X would exist for which
>>> is_preceding(T1 + duration, X) && is_preceding(X, T2) would be true
>>>
>>>- actually, this function would serve as the definition for the
>>> duration object
>>>
>>> If we can supply this algebra, it seems that we can use any
>>> representation of timestamps and intervals. It might be (probably) even
>>> possible to let user specify his own type used as timestamps and durations,
>>> which could solve the issues of not currently being able to correctly
>>> represent timestamps lower than Long.MIN_VALUE (although we can get data
>>> for that low timestamps - cosmic microwave background being one example
>>> :)). Specifying this algebra actually probably boils down to proposal (3)
>>> in Robert's thread [1].
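
For concreteness, the algebra above could be captured in a tiny interface like the
sketch below (the interface and method names are invented, not something from the
thread or the SDK):

// Sketch of the proposed timestamp algebra. T is the user-chosen timestamp type,
// D the user-chosen duration type; neither needs to be a fixed-range int64.
interface EventTimeAlgebra<T, D> {

  // True iff t1 is strictly before t2. If neither isPreceding(a, b) nor
  // isPreceding(b, a) holds, the two timestamps are "concurrent".
  boolean isPreceding(T t1, T t2);

  // The smallest timestamp t2 such that "t1 + duration" precedes t2, with no other
  // timestamp strictly between them; this effectively defines what a duration is.
  T minTimestampFollowing(T t1, D duration);
}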
>>>
>>> Just my 2 cents.
>>>
>>> Jan
>>>
>>> [1]
>>> https://lists.apache.org/thread.html/1672898393cb0d54a77a879be0fb5725902289a3e5063d0f9ec36fe1@%3Cdev.beam.apache.org%3E
>>> On 11/13/19 10:11 AM, jincheng sun wrote:
>>>
>>> Thanks for bringing up this discussion @Luke.
>>>
>>> As @Kenn mentioned, in Beam we have defined the constant values for the
>>> min/max/end of global window. 

Re: [ANNOUNCE] New committer: Brian Hulette

2019-11-14 Thread Kyle Weaver
Thanks for your contributions and congrats Brian!

On Thu, Nov 14, 2019 at 3:14 PM Kenneth Knowles  wrote:

> Hi all,
>
> Please join me and the rest of the Beam PMC in welcoming a new committer:
> Brian Hulette
>
> Brian introduced himself to dev@ earlier this year and has been
> contributing since then. His contributions to Beam include explorations of
> integration with Arrow, standardizing coders, portability for schemas, and
> presentations at Beam events.
>
> In consideration of Brian's contributions, the Beam PMC trusts him with
> the responsibilities of a Beam committer [1].
>
> Thank you, Brian, for your contributions and looking forward to many more!
>
> Kenn, on behalf of the Apache Beam PMC
>
> [1]
> https://beam.apache.org/contribute/become-a-committer/#an-apache-beam-committer
>


Re: Date/Time Ranges & Protobuf

2019-11-14 Thread Sam Rohde
My two cents: we just need a proto representation for timestamps and
durations that includes units. The underlying library can then determine
what to do with it. Then further, we can have a standard across Beam SDKs
and Runners of how to interpret the proto. Using a raw int64 for timestamps
and durations is confusing and *very, very* bug prone (as we have seen in
the past).

I don't know if this is relevant, but does Apache Beam have any standards
surrounding leap years or seconds? If we were to make our own timestamp
format, would we have to worry about that? Or is the timestamp supplied to
Beam a property of the underlying system giving Beam the timestamp? If it
is, then there may be some interop problems between sources.

On Wed, Nov 13, 2019 at 10:35 AM Luke Cwik  wrote:

> I do agree that Apache Beam can represent dates and times with arbitrary
> precision and can do it many different ways.
>
> My argument has always been should around whether we restrict this range
> to a common standard to increase interoperability across other systems. For
> example, SQL database servers have varying degrees as to what ranges they
> support:
> * Oracle 10[1]: 0001-01-01 to 9999-12-31
> * Oracle 11g[2]: Julian era, ranging from January 1, 4712 BCE through
> December 31, 9999 CE (Common Era, or 'AD'), unless BCE ('BC' in the format
> mask)
> * MySQL[3]: '1000-01-01 00:00:00' to '9999-12-31 23:59:59'
> * Microsoft SQL: January 1, 1753, through December 31, 9999 for
> datetime[4] and January 1, 1 CE through December 31, 9999 CE for datetime2[5]
>
> The common case of the global window containing timestamps that are before
> and after all of these supported ranges above means that our users can't
> represent a global window within a database using its common data types.
>
> 1: https://docs.oracle.com/javadb/10.8.3.0/ref/rrefdttlimits.html
> 2:
> https://docs.oracle.com/cd/B28359_01/server.111/b28318/datatype.htm#CNCPT413
> 3: https://dev.mysql.com/doc/refman/8.0/en/datetime.html
> 4:
> https://docs.microsoft.com/en-us/sql/t-sql/data-types/datetime-transact-sql?view=sql-server-ver15
> 5:
> https://docs.microsoft.com/en-us/sql/t-sql/data-types/datetime2-transact-sql?view=sql-server-ver15
>
> On Wed, Nov 13, 2019 at 3:28 AM Jan Lukavský  wrote:
>
>> Hi,
>>
>> just an idea on these related topics that appear these days - it might
>> help to realize that what we actually need is not full arithmetic on
>> timestamps (the Beam model IMHO doesn't need to know the exact
>> difference between two events). What we actually need is a slightly simplified
>> algebra. Given two timestamps T1 and T2 and a "duration" (a different type
>> from timestamp), we need operations (not 100% sure that this is exhaustive,
>> but seems to be):
>>
>>  - is_preceding(T1, T2): bool
>>
>>- important !is_preceding(T1, T2) does NOT imply that is_preceding(T2,
>> T1) - !is_preceding(T1, T2) && !is_preceding(T2, T1) would mean events are
>> _concurrent_
>>
>>- this relation has to be also antisymmetric
>>
>>- given this function we can construct a comparator, where multiple
>> distinct timestamps can be "equal" (or with no particular ordering, which
>> is natural property of time)
>>
>>  - min_timestamp_following(T1, duration): T2
>>
>>- that would return a timestamp for which is_preceding(T1 + duration,
>> T2) would return true and no other timestamp X would exist for which
>> is_preceding(T1 + duration, X) && is_preceding(X, T2) would be true
>>
>>- actually, this function would serve as the definition for the
>> duration object
>>
>> If we can supply this algebra, it seems that we can use any
>> representation of timestamps and intervals. It might be (probably) even
>> possible to let user specify his own type used as timestamps and durations,
>> which could solve the issues of not currently being able to correctly
>> represent timestamps lower than Long.MIN_VALUE (although we can get data
>> for that low timestamps - cosmic microwave background being one example
>> :)). Specifying this algebra actually probably boils down to proposal (3)
>> in Robert's thread [1].
>>
>> Just my 2 cents.
>>
>> Jan
>>
>> [1]
>> https://lists.apache.org/thread.html/1672898393cb0d54a77a879be0fb5725902289a3e5063d0f9ec36fe1@%3Cdev.beam.apache.org%3E
>> On 11/13/19 10:11 AM, jincheng sun wrote:
>>
>> Thanks for bringing up this discussion @Luke.
>>
>> As @Kenn mentioned, in Beam we have defined the constant values for the
>> min/max/end of global window. I noticed that
>> google.protobuf.Timestamp/Duration is only used in window definitions,
>> such as FixedWindowsPayload, SlidingWindowsPayload, SessionsPayload, etc.
>>
>> I think that both RFC 3339 and Beam's current implementation are big
>> enough to express common window definitions. But users can really
>> define a window size that is outside the scope of RFC 3339. Conceptually,
>> we should not limit the time range for windows (although I think the range of
>> RFC 3339 is big enough in m

[ANNOUNCE] New committer: Brian Hulette

2019-11-14 Thread Kenneth Knowles
Hi all,

Please join me and the rest of the Beam PMC in welcoming a new committer:
Brian Hulette

Brian introduced himself to dev@ earlier this year and has been
contributing since then. His contributions to Beam include explorations of
integration with Arrow, standardizing coders, portability for schemas, and
presentations at Beam events.

In consideration of Brian's contributions, the Beam PMC trusts him with the
responsibilities of a Beam committer [1].

Thank you, Brian, for your contributions and looking forward to many more!

Kenn, on behalf of the Apache Beam PMC

[1]
https://beam.apache.org/contribute/become-a-committer/#an-apache-beam-committer


Re: Python Precommit duration pushing 2 hours

2019-11-14 Thread Ahmet Altay
On Thu, Nov 14, 2019 at 2:55 PM Mikhail Gryzykhin  wrote:

> Hi Everyone,
>
> The Python precommit phase times out for (roughly) 80% of the jobs at 2 hours.
> This also blocks release branch validation. I suggest bumping the timeout
> to 3 hours while we work on a proper solution. This way many people
> can get unblocked.
>
> I believe the change can be rather small:
> https://github.com/apache/beam/pull/10121
>

+1 to unblock the current state.


>
> --Mikhail
>
>
>
> On Mon, Nov 11, 2019 at 5:24 PM Ning Kang  wrote:
>
>> I'm removing the additional interactive test env + suite and add
>> [interactive] dependencies as extra dependencies in tests_require:
>> https://github.com/apache/beam/pull/10068
>>
>> On Mon, Nov 11, 2019 at 2:15 PM Robert Bradshaw 
>> wrote:
>>
>>> On Fri, Nov 8, 2019 at 5:45 PM Ahmet Altay  wrote:
>>> >
>>> > I looked at the log but I could not figure out what is causing the timeout
>>> because the gradle scan links are missing. I sampled a few of the
>>> successful jobs, It seems like python 3.7 and python 2 are running 3 tests
>>> in serial {interactive, py37cython, py37gcp} and {docs, py27cython,
>>> py27gcp} respectively. These two versions are pushing the total time
>>> because other variants are now only running {cython, gcp} versions.
>>> >
>>> > I suggest breaking up docs, and interactive into 2 separate suites of
>>> their own. docs is actually faster than interactive, just separating that
>>> out to a new suite might help.
>>> >
>>> > Interactive was recently added (
>>> https://github.com/apache/beam/pull/9741). +Ning Kang could you
>>> separate interactive to new suite?
>>>
>>> I would ask why interactive is a separate tox configuration at all; I
>>> don't think there's a need to run every test again with a couple of
>>> extra dependencies (adding ~30 minutes to every presubmit). I think it
>>> would be much more valuable to run the (presumably relatively small)
>>> set of interactive tests in all modes.
>>>
>>> (The other suites are to guarantee the tests specifically run
>>> *without* installing gcp and *without* compiling with Cython.)
>>>
>>> > On Fri, Nov 8, 2019 at 11:09 AM Robert Bradshaw 
>>> wrote:
>>> >>
>>> >> Just saw another 2-hour timeout:
>>> >> https://builds.apache.org/job/beam_PreCommit_Python_Commit/9440/ , so
>>> >> perhaps we're not out of the woods yet (though in general things have
>>> >> been a lot better).
>>> >>
>>> >> On Tue, Nov 5, 2019 at 10:52 AM Ahmet Altay  wrote:
>>> >> >
>>> >> > GCP tests are already on separate locations. IO related tests are
>>> under /sdks/python/apache_beam/io/gcp and Dataflow related tests are under
>>> sdks/python/apache_beam/runners/dataflow. It should be a matter of changing
>>> gradle files to run either one of the base tests or GCP tests depending on
>>> the types of changes. I do not expect this to have any material impact on
>>> the precommit times because these two test suites take about exactly the
>>> same time to complete.
>>> >> >
>>> >> > #9985 is merged now. Precommit times on master branch dropped to
>>> ~1h 20 for the last 5 runs.
>>> >> >
>>> >> > On Tue, Nov 5, 2019 at 10:12 AM David Cavazos 
>>> wrote:
>>> >> >>
>>> >> >> +1 to moving the GCP tests outside of core. If there are issues
>>> that only show up on GCP tests but not in core, it might be an indication
>>> that there needs to be another test in core covering that, but I think that
>>> should be pretty rare.
>>> >> >>
>>> >> >> On Mon, Nov 4, 2019 at 8:33 PM Kenneth Knowles 
>>> wrote:
>>> >> >>>
>>> >> >>> +1 to moving forward with this
>>> >> >>>
>>> >> >>> Could we move GCP tests outside the core? Then only code changes
>>> touches/affecting GCP would cause them to run in precommit. Could still run
>>> them in postcommit in their own suite. If the core has reasonably stable
>>> abstractions that the connectors are built on, this should not change
>>> coverage much.
>>> >> >>>
>>> >> >>> Kenn
>>> >> >>>
>>> >> >>> On Mon, Nov 4, 2019 at 1:55 PM Ahmet Altay 
>>> wrote:
>>> >> 
>>> >>  PR for the proposed change:
>>> https://github.com/apache/beam/pull/9985
>>> >> 
>>> >>  On Mon, Nov 4, 2019 at 1:35 PM Udi Meiri 
>>> wrote:
>>> >> >
>>> >> > +1
>>> >> >
>>> >> > On Mon, Nov 4, 2019 at 12:09 PM Robert Bradshaw <
>>> rober...@google.com> wrote:
>>> >> >>
>>> >> >> +1, this seems like a good step with a clear win.
>>> >> >>
>>> >> >> On Mon, Nov 4, 2019 at 12:06 PM Ahmet Altay 
>>> wrote:
>>> >> >> >
>>> >> >> > Python precommits are still timing out on #9925. I am
>>> guessing that means this change would not be enough.
>>> >> >> >
>>> >> >> > I am proposing cutting down the number of test variants we
>>> run in precommits. Currently for each version we ran the following variants
>>> serially:
>>> >> >> > - base: Runs all unit tests with tox
>>> >> >> > - Cython: Installs cython and runs all unit tests as base
>>> version. The original purpose was to ensure that tests pas

Re: Python Precommit duration pushing 2 hours

2019-11-14 Thread Mikhail Gryzykhin
Hi Everyone,

The Python precommit phase times out for (roughly) 80% of the jobs at 2 hours.
This also blocks release branch validation. I suggest bumping the timeout
to 3 hours while we work on a proper solution. This way many people
can get unblocked.

I believe the change can be rather small:
https://github.com/apache/beam/pull/10121

--Mikhail



On Mon, Nov 11, 2019 at 5:24 PM Ning Kang  wrote:

> I'm removing the additional interactive test env + suite and add
> [interactive] dependencies as extra dependencies in tests_require:
> https://github.com/apache/beam/pull/10068
>
> On Mon, Nov 11, 2019 at 2:15 PM Robert Bradshaw 
> wrote:
>
>> On Fri, Nov 8, 2019 at 5:45 PM Ahmet Altay  wrote:
>> >
>> > I looked at the log but I could not figure out what is causing the timeout
>> because the gradle scan links are missing. I sampled a few of the
>> successful jobs, It seems like python 3.7 and python 2 are running 3 tests
>> in serial {interactive, py37cython, py37gcp} and {docs, py27cython,
>> py27gcp} respectively. These two versions are pushing the total time
>> because other variants are now only running {cython, gcp} versions.
>> >
>> > I suggest breaking up docs, and interactive into 2 separate suites of
>> their own. docs is actually faster than interactive, just separating that
>> out to a new suite might help.
>> >
>> > Interactive was recently added (
>> https://github.com/apache/beam/pull/9741). +Ning Kang could you separate
>> interactive to new suite?
>>
>> I would ask why interactive is a separate tox configuration at all; I
>> don't think there's a need to run every test again with a couple of
>> extra dependencies (adding ~30 minutes to every presubmit). I think it
>> would be much more valuable to run the (presumably relatively small)
>> set of interactive tests in all modes.
>>
>> (The other suites are to guarantee the tests specifically run
>> *without* installing gcp and *without* compiling with Cython.)
>>
>> > On Fri, Nov 8, 2019 at 11:09 AM Robert Bradshaw 
>> wrote:
>> >>
>> >> Just saw another 2-hour timeout:
>> >> https://builds.apache.org/job/beam_PreCommit_Python_Commit/9440/ , so
>> >> perhaps we're not out of the woods yet (though in general things have
>> >> been a lot better).
>> >>
>> >> On Tue, Nov 5, 2019 at 10:52 AM Ahmet Altay  wrote:
>> >> >
>> >> > GCP tests are already on separate locations. IO related tests are
>> under /sdks/python/apache_beam/io/gcp and Dataflow related tests are under
>> sdks/python/apache_beam/runners/dataflow. It should be a matter of changing
>> gradle files to run either one of the base tests or GCP tests depending on
>> the types of changes. I do not expect this to have any material impact on
>> the precommit times because these two test suites take about exactly the
>> same time to complete.
>> >> >
>> >> > #9985 is merged now. Precommit times on master branch dropped to ~1h
>> 20 for the last 5 runs.
>> >> >
>> >> > On Tue, Nov 5, 2019 at 10:12 AM David Cavazos 
>> wrote:
>> >> >>
>> >> >> +1 to moving the GCP tests outside of core. If there are issues
>> that only show up on GCP tests but not in core, it might be an indication
>> that there needs to be another test in core covering that, but I think that
>> should be pretty rare.
>> >> >>
>> >> >> On Mon, Nov 4, 2019 at 8:33 PM Kenneth Knowles 
>> wrote:
>> >> >>>
>> >> >>> +1 to moving forward with this
>> >> >>>
>> >> >>> Could we move GCP tests outside the core? Then only code changes
>> touches/affecting GCP would cause them to run in precommit. Could still run
>> them in postcommit in their own suite. If the core has reasonably stable
>> abstractions that the connectors are built on, this should not change
>> coverage much.
>> >> >>>
>> >> >>> Kenn
>> >> >>>
>> >> >>> On Mon, Nov 4, 2019 at 1:55 PM Ahmet Altay 
>> wrote:
>> >> 
>> >>  PR for the proposed change:
>> https://github.com/apache/beam/pull/9985
>> >> 
>> >>  On Mon, Nov 4, 2019 at 1:35 PM Udi Meiri 
>> wrote:
>> >> >
>> >> > +1
>> >> >
>> >> > On Mon, Nov 4, 2019 at 12:09 PM Robert Bradshaw <
>> rober...@google.com> wrote:
>> >> >>
>> >> >> +1, this seems like a good step with a clear win.
>> >> >>
>> >> >> On Mon, Nov 4, 2019 at 12:06 PM Ahmet Altay 
>> wrote:
>> >> >> >
>> >> >> > Python precommits are still timing out on #9925. I am
>> guessing that means this change would not be enough.
>> >> >> >
>> >> >> > I am proposing cutting down the number of test variants we
>> run in precommits. Currently for each version we ran the following variants
>> serially:
>> >> >> > - base: Runs all unit tests with tox
>> >> >> > - Cython: Installs cython and runs all unit tests as base
>> version. The original purpose was to ensure that tests pass with or without
>> cython. There is probably a huge overlap with base. (IIRC only a few coders
>> have different slow vs fast tests.)
>> >> >> > - GCP: Installs GCP dependencies and tests all base +
>> additional gcp sp

Re: RabbitMQ and CheckpointMark feasibility

2019-11-14 Thread Jan Lukavský

On 11/14/19 9:50 PM, Daniel Robert wrote:

Alright, thanks everybody. I'm really appreciative of the conversation 
here. I think I see where my disconnect is and how this might all work 
together for me. There are some bugs in the current rabbit 
implementation that I think have confused my understanding of the 
intended semantics. I'm coming around to seeing how such a system with 
rabbit's restrictions can work properly in Beam (I'd totally forgotten 
about 'dedupe' support in Beam) but I want to clarify some 
implementation questions after pulling everyone's notes together.


RabbitMQ reader should not bother accepting an existing CheckpointMark 
in its constructor (in 'ack-based' systems this is unnecessary per 
Eugene's original reply). It should construct its own CheckpointMark 
at construction time and use it throughout its lifecycle.


At some point later, the CheckpointMark will be 'finalized'. If this 
CheckpointMark has been Serialized (via Coder or otherwise) or its 
underlying connection has been severed, this step will fail. This 
would mean at some point the messages are redelivered to Beam on some 
other Reader, so no data loss. If it has not been serialized, the acks 
will take place just fine, even if much later.


If the system is using processing-time as event-time, however, the 
redelivery of these messages would effectively change the ordering and 
potentially the window they arrived in. I *believe* that Beam deduping 
seems to be managed per-window so if 'finalizeCheckpoint' is attempted 
(and fails) would these messages appear in a new window?


This is very likely to happen with any source that would assign
something like *now* as the event time. That is ill-defined, and if the
source cannot provide some retry-persistent estimate of the real event time,
then I'd suggest forcing the user to specify a UDF to extract the event time
from the payload. Everything else would probably break (at least if any
timestamp-related windowing is used in the pipeline).


Perhaps my questions are now:
- how should a CheckpointMark communicate failure to Beam

An exception thrown should fail the checkpoint and therefore retry 
everything from the last checkpoint.


- how does Beam handle a CheckpointMark.finalizeCheckpoint failure, if 
the API dictates such a thing?



See above.


- is there a provision that would need to be made for processing-time 
sources that can fail a checkpointmark.finalizeCheckpoint call? (I'm 
nervous redelivered messages would appear in another window)


You are nervous for a reason. :) I strongly believe a processing-time
source should be considered an anti-pattern, at least in situations where
there is any time manipulation downstream (time-windows, stateful 
processing, ...).


- What is the relationship lifecycle-wise between a CheckpointMark and 
a Reader? My understanding is a CheckpointMark may outlive a Reader, 
is that correct?


Definitely. But the same instance bound to the lifecycle of the reader 
would be used to finalizeCheckpoint (if that ever happens).


Thanks for bearing with me everyone. It feels a bit unfortunate my 
first foray into beam is reliant on this rabbit connector but I'm 
learning a lot and I'm very grateful for the help. PRs pending once I 
get this all straightened out in my head.


-Danny

On 11/14/19 2:35 PM, Eugene Kirpichov wrote:

Hi Daniel,


On Wed, Nov 13, 2019 at 8:26 PM Daniel Robert wrote:


I believe I've nailed down a situation that happens in practice
that causes Beam and Rabbit to be incompatible. It seems that
runners can and do make assumptions about the serializability
(via Coder) of a CheckpointMark.

To start, these are the semantics of RabbitMQ:

- the client establishes a connection to the server
- client opens a channel on the connection
- messages are either pulled or pushed to the client from the
server along this channel
- when messages are done processing, they are acknowledged
*client-side* and must be acknowledged on the *same channel* that
originally received the message.

Since a channel (or any open connection) is non-serializable, it
means that a CheckpointMark that has been serialized cannot ever
be used to acknowledge these messages and correctly 'finalize'
the checkpoint. It also, as previously discussed in this thread,
implies a rabbit Reader cannot accept an existing CheckpointMark
at all; the Reader and the CheckpointMark must share the same
connection to the rabbit server ("channel").

This is correct.

Next, I've found how DirectRunner (and presumably others) can
attempt to serialize a CheckpointMark that has not been
finalized. In

https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L150,
the DirectRunner applies a probability and if it hits, it sets
the current reader 

Re: RabbitMQ and CheckpointMark feasibility

2019-11-14 Thread Reuven Lax
Immediately after a source, the window is the Global window, which means
you will get global deduplication.

On Thu, Nov 14, 2019 at 12:50 PM Daniel Robert 
wrote:

> Alright, thanks everybody. I'm really appreciative of the conversation
> here. I think I see where my disconnect is and how this might all work
> together for me. There are some bugs in the current rabbit implementation
> that I think have confused my understanding of the intended semantics. I'm
> coming around to seeing how such a system with rabbit's restrictions can
> work properly in Beam (I'd totally forgotten about 'dedupe' support in
> Beam) but I want to clarify some implementation questions after pulling
> everyone's notes together.
>
> RabbitMQ reader should not bother accepting an existing CheckpointMark in
> its constructor (in 'ack-based' systems this is unnecessary per Eugene's
> original reply). It should construct its own CheckpointMark at construction
> time and use it throughout its lifecycle.
>
> At some point later, the CheckpointMark will be 'finalized'. If this
> CheckpointMark has been Serialized (via Coder or otherwise) or its
> underlying connection has been severed, this step will fail. This would
> mean at some point the messages are redelivered to Beam on some other
> Reader, so no data loss. If it has not been serialized, the acks will take
> place just fine, even if much later.
>
> If the system is using processing-time as event-time, however, the
> redelivery of these messages would effectively change the ordering and
> potentially the window they arrived in. I *believe* that Beam deduping
> seems to be managed per-window so if 'finalizeCheckpoint' is attempted (and
> fails) would these messages appear in a new window?
>
> Perhaps my questions are now:
> - how should a CheckpointMark communicate failure to Beam
> - how does Beam handle a CheckpointMark.finalizeCheckpoint failure, if the
> API dictates such a thing?
> - is there a provision that would need to be made for processing-time
> sources that can fail a checkpointmark.finalizeCheckpoint call? (I'm
> nervous redelivered messages would appear in another window)
> - What is the relationship lifecycle-wise between a CheckpointMark and a
> Reader? My understanding is a CheckpointMark may outlive a Reader, is that
> correct?
>
> Thanks for bearing with me everyone. It feels a bit unfortunate my first
> foray into beam is reliant on this rabbit connector but I'm learning a lot
> and I'm very grateful for the help. PRs pending once I get this all
> straightened out in my head.
>
> -Danny
> On 11/14/19 2:35 PM, Eugene Kirpichov wrote:
>
> Hi Daniel,
>
>
> On Wed, Nov 13, 2019 at 8:26 PM Daniel Robert 
> wrote:
>
>> I believe I've nailed down a situation that happens in practice that
>> causes Beam and Rabbit to be incompatible. It seems that runners can and do
>> make assumptions about the serializability (via Coder) of a CheckpointMark.
>>
>> To start, these are the semantics of RabbitMQ:
>>
>> - the client establishes a connection to the server
>> - client opens a channel on the connection
>> - messages are either pulled or pushed to the client from the server
>> along this channel
>> - when messages are done processing, they are acknowledged *client-side*
>> and must be acknowledged on the *same channel* that originally received the
>> message.
>>
>> Since a channel (or any open connection) is non-serializable, it means
>> that a CheckpointMark that has been serialized cannot ever be used to
>> acknowledge these messages and correctly 'finalize' the checkpoint. It
>> also, as previously discussed in this thread, implies a rabbit Reader
>> cannot accept an existing CheckpointMark at all; the Reader and the
>> CheckpointMark must share the same connection to the rabbit server
>> ("channel").
>>
> This is correct.
>
>
>> Next, I've found how DirectRunner (and presumably others) can attempt to
>> serialize a CheckpointMark that has not been finalized. In
>> https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L150,
>> the DirectRunner applies a probability and if it hits, it sets the current
>> reader to 'null' but retains the existing CheckpointMark, which it then
>> attempts to pass to a new reader via a Coder.
>>
> Correct, this simulates a failure scenario:
> - Runner was reading the source and, after finalizing a bunch of previous
> CheckpointMarks, obtained a new one and serialized it so things can be
> restored in case of failure
> - A failure happened before the current CheckpointMark could be finalized,
> which means Beam was not able to guarantee that elements after the
> last-finalized mark have been durably processed, so we may need to re-read
> them, so runner recreates a reader from the current mark.
>
>
>> This puts the shard, the runner, and the reader with differing views of
>> the world. In UnboundedReadEvaluatorFactory's processElement function, a

Re: RabbitMQ and CheckpointMark feasibility

2019-11-14 Thread Daniel Robert
Alright, thanks everybody. I'm really appreciative of the conversation 
here. I think I see where my disconnect is and how this might all work 
together for me. There are some bugs in the current rabbit 
implementation that I think have confused my understanding of the 
intended semantics. I'm coming around to seeing how such a system with 
rabbit's restrictions can work properly in Beam (I'd totally forgotten 
about 'dedupe' support in Beam) but I want to clarify some 
implementation questions after pulling everyone's notes together.


RabbitMQ reader should not bother accepting an existing CheckpointMark 
in its constructor (in 'ack-based' systems this is unnecessary per 
Eugene's original reply). It should construct its own CheckpointMark at 
construction time and use it throughout its lifecycle.


At some point later, the CheckpointMark will be 'finalized'. If this 
CheckpointMark has been Serialized (via Coder or otherwise) or its 
underlying connection has been severed, this step will fail. This would 
mean at some point the messages are redelivered to Beam on some other 
Reader, so no data loss. If it has not been serialized, the acks will 
take place just fine, even if much later.
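
To pin this down, here is a hedged sketch of such an ack-based mark (not the actual
RabbitMqIO code; the class and field names are invented):

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import com.rabbitmq.client.Channel;
import org.apache.beam.sdk.io.UnboundedSource;

// The channel is transient and shared with the reader that created this mark, so
// finalization only works while that reader's connection is still alive.
class RabbitCheckpointMark implements UnboundedSource.CheckpointMark, Serializable {

  private final List<Long> deliveryTags = new ArrayList<>();
  private transient Channel channel; // not serializable; lost if the mark is encoded

  RabbitCheckpointMark(Channel channel) {
    this.channel = channel;
  }

  void add(long deliveryTag) {
    deliveryTags.add(deliveryTag);
  }

  @Override
  public void finalizeCheckpoint() throws IOException {
    if (channel == null || !channel.isOpen()) {
      // The mark was serialized or the connection died: the broker will redeliver
      // these messages, so signal failure instead of silently dropping the acks.
      throw new IOException("Channel is gone; messages will be redelivered");
    }
    for (long tag : deliveryTags) {
      channel.basicAck(tag, false);
    }
    deliveryTags.clear();
  }
}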


If the system is using processing-time as event-time, however, the 
redelivery of these messages would effectively change the ordering and 
potentially the window they arrived in. I *believe* that Beam deduping 
seems to be managed per-window so if 'finalizeCheckpoint' is attempted 
(and fails) would these messages appear in a new window?


Perhaps my questions are now:
- how should a CheckpointMark communicate failure to Beam
- how does Beam handle a CheckpointMark.finalizeCheckpoint failure, if 
the API dictates such a thing?
- is there a provision that would need to be made for processing-time 
sources that can fail a checkpointmark.finalizeCheckpoint call? (I'm 
nervous redelivered messages would appear in another window)
- What is the relationship lifecycle-wise between a CheckpointMark and a 
Reader? My understanding is a CheckpointMark may outlive a Reader, is 
that correct?


Thanks for bearing with me, everyone. It feels a bit unfortunate that my 
first foray into Beam relies on this rabbit connector, but I'm learning a 
lot and I'm very grateful for the help. PRs pending once I get this all 
straightened out in my head.


-Danny

On 11/14/19 2:35 PM, Eugene Kirpichov wrote:

Hi Daniel,


On Wed, Nov 13, 2019 at 8:26 PM Daniel Robert > wrote:


I believe I've nailed down a situation that happens in practice
that causes Beam and Rabbit to be incompatible. It seems that
runners can and do make assumptions about the serializability (via
Coder) of a CheckpointMark.

To start, these are the semantics of RabbitMQ:

- the client establishes a connection to the server
- client opens a channel on the connection
- messages are either pulled or pushed to the client from the
server along this channel
- when messages are done processing, they are acknowledged
*client-side* and must be acknowledged on the *same channel* that
originally received the message.

Since a channel (or any open connection) is non-serializable, it
means that a CheckpointMark that has been serialized cannot ever
be used to acknowledge these messages and correctly 'finalize' the
checkpoint. It also, as previously discussed in this thread,
implies a rabbit Reader cannot accept an existing CheckpointMark
at all; the Reader and the CheckpointMark must share the same
connection to the rabbit server ("channel").

This is correct.

Next, I've found how DirectRunner (and presumably others) can
attempt to serialize a CheckpointMark that has not been finalized.
In

https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L150,
the DirectRunner applies a probability and if it hits, it sets the
current reader to 'null' but retains the existing CheckpointMark,
which it then attempts to pass to a new reader via a Coder.

Correct, this simulates a failure scenario:
- Runner was reading the source and, after finalizing a bunch of 
previous CheckpointMarks, obtained a new one and serialized it so 
things can be restored in case of failure
- A failure happened before the current CheckpointMark could be 
finalized, which means Beam was not able to guarantee that elements 
after the last-finalized mark have been durably processed, so we may 
need to re-read them, so runner recreates a reader from the current mark.


This puts the shard, the runner, and the reader with differing
views of the world. In UnboundedReadEvaluatorFactory's
processElement function, a call to getReader(shard) (

https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L132
 

Re: RabbitMQ and CheckpointMark feasibility

2019-11-14 Thread Jan Lukavský
Just as a matter of curiosity, I wonder why it would be necessary to assign 
(local) UUIDs to RabbitMQ streams. There seem to be only two options:


 a) RabbitMQ does not support restore of client connection (this is 
valid, many sources work like that, e.g. plain websocket, or UDP stream)


 b) it does support that (and according to the documentation and 
overall logic it seems it should)


if a) is true, then the source itself is at-most-once and there is 
actually pretty much nothing we can do about that.


If b) is true, then there has to be some sort of identifier of the 
"subscriber", "stream", "session", "consumer", whatever we call it. This 
identifier should be serializable and transferable to a different machine. 
Every other option leads to (in my view) nonsensical conclusions (after 
all, that's why web browsers have cookies and all of us can enjoy the 
pleasant view of popups asking us to accept them, right? :))


If there is a serializable identifier of the "stream", "session", 
"consumer", "subscriber", then the stream could be just recreated in 
call to UnboundedSource#createReader().
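
For contrast, a minimal sketch of what that would look like for a source 
that does carry such a serializable identifier (hypothetical types and 
helpers; as noted, RabbitMQ itself does not appear to offer this):

// Inside a hypothetical UnboundedSource implementation:
@Override
public UnboundedReader<byte[]> createReader(
    PipelineOptions options, SubscriptionCheckpointMark mark) {
  // mark may be null on first start. Otherwise, reattach to the broker-side
  // subscription it names; anything not confirmed on the old subscription is
  // redelivered to the recreated reader.
  String subscriptionId =
      (mark == null) ? createNewSubscription() : mark.getSubscriptionId();
  return new SubscriptionReader(this, subscriptionId);
}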


One more point not yet mentioned - the logic in DirectRunner associated 
with 'readerReuseChance' (assigning the reader a null value with some 
probability) is just an emulation of faults in a real system.


Jan

On 11/14/19 7:21 PM, Reuven Lax wrote:
Just a thought: instead of embedding the RabbitMQ streams inside the 
checkpoint mark, could you keep a global static map of RabbitMQ 
streams keyed by a unique UUID. Then all you have to serialize inside 
the CheckpointMark is the UUID; you can look up the actual stream in 
the constructor of the CheckpointMark and cache it in a volatile 
variable that won't be serialized.


This does mean that if the source shard starts up on a new machine 
(this will happen after a crash or if a runner load balances to 
another machine) then you cannot recover the same RabbitMQ stream. I 
presume (hope!) that RabbitMQ must have some sort ack timeout and will 
redeliver the messages after a while. In this case those messages will 
get "stuck" until RabbitMQ redelivers them, but will eventually show 
up again on the new RabbitMQ stream. (I hope that opening a new stream 
would not redeliver messages that had already been successfully 
acked on the previous stream).


Would this work?

Reuven

On Thu, Nov 14, 2019 at 7:16 AM Daniel Robert > wrote:


We may be talking past each other a bit, though I do appreciate
the responses.

Rabbit behaves a lot like a relational database in terms of state
required. A connection is analogous to a database connection, and
a channel (poor analogy here) is similar to an open transaction.
If the connection is severed, the transaction will not be able to
be committed.

In direct response to the consumer lifecycle linked to, yes, one
can recover and re-establish connections, but any state maintained
within the previous channel are lost. If there were messages that
had not been acknowledged, they would have been re-delivered to
some other consumer as they were never acknowledged.

"Subscription" isn't really the model in rabbit. It has advantages
and disadvantages when compared with kafka -- mostly out of scope
here -- but some quick advantages of the rabbit model: 1) it
parallelizes "infinitely" without any changes to server (no
re-partitioning or the like); 2) messages can be acknowledge in a
separate order than they were consumed; 3) because state is
managed associated with an active connection, at-least-once
delivery semantics are easy to implement as any disconnection will
result in the messages being re-placed in the queue and delivered
to a new consumer. To say it's "incompatible with any fault
tolerant semantics" is unfair, they just aren't incompatible with
Beam's, as Beam is currently implemented.

Regardless, I'm now wondering what the best path forward is.
Rabbit isn't unusable in Beam if the set of requirements and
tradeoffs are well documented. That is, there are use cases that
could be properly supported and some that likely can't.

One option would be to use a pull-based api and immediately
acknowledge each message as they arrive. This would effectively
make the CheckpointMark a no-op, other than maintaining the
watermark. In a pipeline that uses fixed windows (or non-session
windowing) and uses a runner that supports 'Drain'-style semantics
(like Dataflow) this should work just fine I think.

Another would be to do a best-attempt at acknowledging as late as
possible. This would be a hybrid approach where we attempt
acknowledgements in the CheckpointMark, but use a special Coder
that acknowledges all messages at the point the CheckpointMark is
encoded. I think this feels a bit unsafe and overly complex, and
I'm not sure it solves any

Re: RabbitMQ and CheckpointMark feasibility

2019-11-14 Thread Eugene Kirpichov
Hi Daniel,


On Wed, Nov 13, 2019 at 8:26 PM Daniel Robert  wrote:

> I believe I've nailed down a situation that happens in practice that
> causes Beam and Rabbit to be incompatible. It seems that runners can and do
> make assumptions about the serializability (via Coder) of a CheckpointMark.
>
> To start, these are the semantics of RabbitMQ:
>
> - the client establishes a connection to the server
> - client opens a channel on the connection
> - messages are either pulled or pushed to the client from the server along
> this channel
> - when messages are done processing, they are acknowledged *client-side*
> and must be acknowledged on the *same channel* that originally received the
> message.
>
> Since a channel (or any open connection) is non-serializable, it means
> that a CheckpointMark that has been serialized cannot ever be used to
> acknowledge these messages and correctly 'finalize' the checkpoint. It
> also, as previously discussed in this thread, implies a rabbit Reader
> cannot accept an existing CheckpointMark at all; the Reader and the
> CheckpointMark must share the same connection to the rabbit server
> ("channel").
>
This is correct.


> Next, I've found how DirectRunner (and presumably others) can attempt to
> serialize a CheckpointMark that has not been finalized. In
> https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L150,
> the DirectRunner applies a probability and if it hits, it sets the current
> reader to 'null' but retains the existing CheckpointMark, which it then
> attempts to pass to a new reader via a Coder.
>
Correct, this simulates a failure scenario:
- Runner was reading the source and, after finalizing a bunch of previous
CheckpointMarks, obtained a new one and serialized it so things can be
restored in case of failure
- A failure happened before the current CheckpointMark could be finalized,
which means Beam was not able to guarantee that elements after the
last-finalized mark have been durably processed, so we may need to re-read
them, so runner recreates a reader from the current mark.


> This puts the shard, the runner, and the reader with differing views of
> the world. In UnboundedReadEvaluatorFactory's processElement function, a
> call to getReader(shard) (
> https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L132
> ) clones the shard's checkpoint mark and passes that to the new reader. The
> reader ignores it, creating its own, but even if it accepted it, it would
> be accepting a serialized CheckpointMark, which wouldn't work.
>
Correct in the sense that for a RabbitMQ reader, a CheckpointMark doesn't
affect what the reader will read: it depends only on the broker's internal
state (which in turn depends on which messages have been acked by previously
finalized CheckpointMarks).

> Later, the runner calls finishRead (
> https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L246
> ). The shard's CheckpointMark (unserialized; which should still be valid)
> is finalized. The reader's CheckpointMark (which may be a different
> instance) becomes the return value, which is referred to as
> "finishedCheckpoint" in the calling code, which is misleading at best and
> problematic at worst as *this* checkpoint has not been finalized.
>
I'm not following what the problem is here. In that code, "oldMark" is the
last checkpoint mark to be finalized - calling finalizeCheckpoint on it
signals that Beam has durably processed all the messages read from the
reader up to that mark. "mark" (the new one) represents the state of the
reader after the last finalized mark, so it should not be finalized.

I.e. AFAIR in a hypothetical runner (which DirectRunner tries to emulate)
things go like this:

Create a reader
Let mark M1 = reader.getCheckpointMark()
Durably persist M1 as the "restore point" of this reader
...read messages A B C from reader and durably process them...
Finalize M1 (acks A B C)
Let mark M2 = reader.getCheckpointMark()
Durably persist M2 as the "restore point" of this reader
...read messages D E F and durably process them...
Finalize M2 (acks D E F)

Now let's imagine a failure.
Durably persist M2 as the "restore point" of this reader
...read messages D E, and then a failure happens
Recreate reader from M2 (reader ignores M2 but it doesn't matter)
Since M2 was not finalized, messages D E F were not acked, and RabbitMQ
will redeliver them to this reader. D E will be processed twice, but only
the results of this new processing will be durable.
Finalize M2 (acks D E F)
Etc.

Basically you can think of this as a series of micro-bundles, where
micro-bundles are delimited by checkpoint marks, and each micro-bundle is a
runner-side transaction which either commits or discards the results of
processing all messag

Re: Why is Pipeline not Serializable and can it be changed to be Serializable

2019-11-14 Thread Luke Cwik
If you want to write your own pipeline representation and format for
OpenMPI/Twister2, you should create placeholders inside your
Twister2/OpenMPI implementation that represent these functions and then
instantiate actual instances of them on the workers.

Or consider converting the pipeline to its proto representation and
building a portable pipeline runner. This way you could run Go and Python
pipelines as well. The best example of this is the current Flink
integration[1]

1:
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
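
A rough sketch of that second option, assuming the PipelineTranslation
helper that currently lives in runners-core-construction-java (treat the
exact class locations as an assumption):

import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.sdk.Pipeline;

public class PipelineToProto {
  // Convert the (non-serializable) Pipeline into its portable proto form,
  // which can be shipped to workers as plain bytes.
  public static byte[] toBytes(Pipeline pipeline) {
    RunnerApi.Pipeline proto = PipelineTranslation.toProto(pipeline);
    return proto.toByteArray();
  }
  // On the workers, RunnerApi.Pipeline.parseFrom(bytes) rebuilds the proto,
  // which a portable runner then translates into its native job graph.
}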

On Wed, Nov 13, 2019 at 7:44 PM Pulasthi Supun Wickramasinghe <
pulasthi...@gmail.com> wrote:

> Hi Dev's
>
> Currently, the Pipeline class in Beam is not Serializable. This is not a
> problem for the current runners since the pipeline is translated and
> submitted through a centralized Driver like model. However, if the runner
> has a decentralized model similar to OpenMPI (MPI), which is also the case
> with Twister2, which I am developing a runner currently, it would have been
> better if the pipeline itself was Serializable.
>
> Currently, I am trying to transform the Pipeline into a Twister2 graph and
> then send over to the workers, however since there are some functions such
> as "SystemReduceFn" that are not serializable this also is somewhat
> troublesome.
>
> Was the decision to make Pipelines not Serializable made due to some
> specific reason or because all the current use cases did not present any
> valid requirement to make them Serializable?
>
> Best Regards,
> Pulasthi
> --
> Pulasthi S. Wickramasinghe
> PhD Candidate  | Research Assistant
> School of Informatics and Computing | Digital Science Center
> Indiana University, Bloomington
> cell: 224-386-9035 <(224)%20386-9035>
>


Re: Completeness of Beam Java Dependency Check Report

2019-11-14 Thread Kenneth Knowles
On Thu, Nov 14, 2019 at 8:04 AM Alexey Romanenko 
wrote:

> Good example about Guava deps, let me go a bit deeper.
>
> $ find . -name build.gradle | xargs grep library.java.guava
> ./sdks/java/core/build.gradle:  shadowTest library.java.guava_testlib
>
> ./sdks/java/io/kinesis/build.gradle:  testCompile
> library.java.guava_testlib
>
>
> Regarding using non-vendored Guava in KinesisIO (and "java/core" as well),
> it’s all about *“library.java.guava_testlib” *and
> *“com.google.common.testing.EqualsTester”* only in particular, which is
> used for tests.
> Do we need to vendor “*com.google.guava:guava-testlib*” for this in this
> case?
>

I didn't worry about test scopes because they don't ship to users so much.
It could be useful if there is a runner with a conflict and they want to
run the integration tests.


> - KinesisIO does depend on Guava at compile scope but has incorrect
> dependencies (Kinesis libs have Guava on API surface so it is OK here, but
> should be correctly declared)
>
>
> Sorry, but I didn’t understand what do you mean by “*but should be
> correctly declared*”.
> Since Kinesis client libs have own Guava deps and we shade our own guava,
> so it should be fine, no?
>

I mean that any module with an "import X" should have a dependency on X in
its build.gradle. When you leave it off, the dep analysis (such as Maven's)
calls it out as "used undeclared dependency".

Kenn


>
> On 11 Nov 2019, at 22:29, Kenneth Knowles  wrote:
>
> BeamModulePlugin just contains lists of versions to ease coordination
> across Beam modules, but mostly does not create dependencies. Most of
> Beam's modules only depend on a few things there. For example Guava is not
> a core dependency, but here is where it is actually depended upon:
>
> $ find . -name build.gradle | xargs grep library.java.guava
> ./sdks/java/core/build.gradle:  shadowTest library.java.guava_testlib
> ./sdks/java/extensions/sql/jdbc/build.gradle:  compile library.java.guava
> ./sdks/java/io/google-cloud-platform/build.gradle:  compile
> library.java.guava
> ./sdks/java/io/kinesis/build.gradle:  testCompile
> library.java.guava_testlib
>
> These results appear to be misleading. Grepping for 'import
> com.google.common', I see this as the actual state of things:
>
>  - GCP connector does not appear to actually depend on Guava in compile
> scope
>  - The Beam SQL JDBC driver does not appear to actually depend on Guava in
> compile scope
>  - The Dataflow Java worker does depend on Guava at compile scope but has
> incorrect dependencies (and it probably shouldn't)
>  - KinesisIO does depend on Guava at compile scope but has incorrect
> dependencies (Kinesis libs have Guava on API surface so it is OK here, but
> should be correctly declared)
>  - ZetaSQL translator does depend on Guava at compile scope but has
> incorrect dependencies (ZetaSQL has it on API surface so it is OK here, but
> should be correctly declared)
>
> We used to have an analysis that prevented this class of error.
>
> Once the errors are fixed, the guava_version is simply a version that we
> have discovered that seems to work for both Kinesis and ZetaSQL, libraries
> we do not control. Kinesis producer is built against 18.0. Kinesis client
> against 26.0-jre. ZetaSQL against 26.0-android.
>
> (or maybe I messed up in my analysis)
>
> Kenn
>
> On Mon, Nov 11, 2019 at 12:07 PM Tomo Suzuki  wrote:
>
>>
>> Chamikara and Yifan,
>> Thank you for the responses! Looking forward to hearing the investigation
>> result.
>> In the meantime, I'll explore .test-infra/jenkins/dependency_check
>> directory.
>>
>>
>


Re: RabbitMQ and CheckpointMark feasibility

2019-11-14 Thread Reuven Lax
Just a thought: instead of embedding the RabbitMQ streams inside the
checkpoint mark, could you keep a global static map of RabbitMQ streams
keyed by a unique UUID. Then all you have to serialize inside the
CheckpointMark is the UUID; you can look up the actual stream in the
constructor of the CheckpointMark and cache it in a volatile variable that
won't be serialized.
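
Concretely, a minimal sketch of that idea (hypothetical class names; the
bookkeeping of which delivery tags to ack is elided):

import com.rabbitmq.client.Channel;
import java.io.Serializable;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.beam.sdk.io.UnboundedSource;

class ChannelRegistry {
  private static final ConcurrentMap<UUID, Channel> CHANNELS = new ConcurrentHashMap<>();

  static UUID register(Channel channel) {
    UUID id = UUID.randomUUID();
    CHANNELS.put(id, channel);
    return id;
  }

  static Channel lookup(UUID id) {
    return CHANNELS.get(id);  // null if we are on a different JVM after a crash/rebalance
  }
}

class UuidCheckpointMark implements UnboundedSource.CheckpointMark, Serializable {
  private final UUID channelId;                 // the only thing that gets serialized
  private transient volatile Channel channel;   // looked up here, never serialized

  UuidCheckpointMark(UUID channelId) {
    this.channelId = channelId;
    this.channel = ChannelRegistry.lookup(channelId);
  }

  @Override
  public void finalizeCheckpoint() {
    Channel c = (channel != null) ? channel : ChannelRegistry.lookup(channelId);
    if (c == null || !c.isOpen()) {
      return;  // new machine or lost connection: rely on broker redelivery instead
    }
    // ... basicAck the recorded delivery tags on c ...
  }
}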

This does mean that if the source shard starts up on a new machine (this
will happen after a crash or if a runner load balances to another machine)
then you cannot recover the same RabbitMQ stream. I presume (hope!) that
RabbitMQ must have some sort of ack timeout and will redeliver the messages
after a while. In this case those messages will get "stuck" until RabbitMQ
redelivers them, but will eventually show up again on the new RabbitMQ
stream. (I hope that opening a new stream would not redeliver messages that
had already been successfully acked on the previous stream).

Would this work?

Reuven

On Thu, Nov 14, 2019 at 7:16 AM Daniel Robert  wrote:

> We may be talking past each other a bit, though I do appreciate the
> responses.
>
> Rabbit behaves a lot like a relational database in terms of state
> required. A connection is analogous to a database connection, and a channel
> (poor analogy here) is similar to an open transaction. If the connection is
> severed, the transaction will not be able to be committed.
>
> In direct response to the consumer lifecycle linked to, yes, one can
> recover and re-establish connections, but any state maintained within the
> previous channel are lost. If there were messages that had not been
> acknowledged, they would have been re-delivered to some other consumer as
> they were never acknowledged.
>
> "Subscription" isn't really the model in rabbit. It has advantages and
> disadvantages when compared with kafka -- mostly out of scope here -- but
> some quick advantages of the rabbit model: 1) it parallelizes "infinitely"
> without any changes to server (no re-partitioning or the like); 2) messages
> can be acknowledge in a separate order than they were consumed; 3) because
> state is managed associated with an active connection, at-least-once
> delivery semantics are easy to implement as any disconnection will result
> in the messages being re-placed in the queue and delivered to a new
> consumer. To say it's "incompatible with any fault tolerant semantics" is
> unfair, they just aren't incompatible with Beam's, as Beam is currently
> implemented.
>
> Regardless, I'm now wondering what the best path forward is. Rabbit isn't
> unusable in Beam if the set of requirements and tradeoffs are well
> documented. That is, there are use cases that could be properly supported
> and some that likely can't.
>
> One option would be to use a pull-based api and immediately acknowledge
> each message as they arrive. This would effectively make the CheckpointMark
> a no-op, other than maintaining the watermark. In a pipeline that uses
> fixed windows (or non-session windowing) and uses a runner that supports
> 'Drain'-style semantics (like Dataflow) this should work just fine I think.
>
> Another would be to do a best-attempt at acknowledging as late as
> possible. This would be a hybrid approach where we attempt acknowledgements
> in the CheckpointMark, but use a special Coder that acknowledges all
> messages at the point the CheckpointMark is encoded. I think this feels a
> bit unsafe and overly complex, and I'm not sure it solves any real-world
> problems.
>
> I also feel like perhaps we should include Beam IO documentation that
> makes it clear that an unbounded source that requires a persistent
> connection for state tracking is not supportable by beam.
>
> Thanks,
> -Danny
> On 11/14/19 7:49 AM, Jan Lukavský wrote:
>
> Hi, as I said, I didn't dig too deep into that, but what I saw was [1].
> Generally, if RabbitMQ had no way to recover a subscription (which I don't
> think is the case), then it would not just be incompatible with Beam; it
> would be incompatible with any fault-tolerant semantics.
>
> [1] https://www.rabbitmq.com/consumers.html#consumer-lifecycle
>
> On 14. 11. 2019 13:06, Daniel Robert
>  wrote:
>
>
> On 11/14/19 2:32 AM, Jan Lukavský wrote:
>
> Hi Danny,
>
> as Eugene pointed out, there are essentially two "modes of operation" of
> CheckpointMark. It can:
>
>  a) be used to somehow restore state of a reader (in call to
> UnboundedSource#createReader)
>
>  b) confirm processed elements in CheckpointMark#finalizeCheckpoint
>
> If your source doesn't provide a persistent position in data stream that
> can be referred to (and serialized - example of this would be kafka
> offsets), then what you actually need to serialize is not the channel, but
> a way how to restore it - e.g. by opening a new channel with a given
> 'consumer group name'. Then you just use this checkpoint to commit your
> processed data in finalizeCheckpoint.
>
> Note that the finalizeCheckpoint is not guaranteed 

Re: Wiki access

2019-11-14 Thread Thomas Weise
Done, you should be all set.


On Thu, Nov 14, 2019 at 9:57 AM Elliotte Rusty Harold 
wrote:

> Hello,
>
> May I please have access to edit the Wiki? username is elharo
>
> Thanks.
>
> --
> Elliotte Rusty Harold
> elh...@ibiblio.org
>


Wiki access

2019-11-14 Thread Elliotte Rusty Harold
Hello,

May I please have access to edit the Wiki? username is elharo

Thanks.

-- 
Elliotte Rusty Harold
elh...@ibiblio.org


Re: RabbitMQ and CheckpointMark feasibility

2019-11-14 Thread Jan Lukavský

Hi,

answers inline.

On 11/14/19 4:15 PM, Daniel Robert wrote:


We may be talking past each other a bit, though I do appreciate the 
responses.


Rabbit behaves a lot like a relational database in terms of state 
required. A connection is analogous to a database connection, and a 
channel (poor analogy here) is similar to an open transaction. If the 
connection is severed, the transaction will not be able to be committed.


In direct response to the consumer lifecycle linked to, yes, one can 
recover and re-establish connections, but any state maintained within 
the previous channel are lost. If there were messages that had not 
been acknowledged, they would have been re-delivered to some other 
consumer as they were never acknowledged.


Yes, that is exactly what basically all streaming sources that commit 
one message at a time (e.g. google pubsub, mqtt, ...) do. That is no 
problem for Beam, because you have to take into account two things:


 a) if a checkpoint is taken, it is taken in a way that ensures 
exactly-once processing in downstream operators (that is actually runner 
dependent, but all major runners behave like that)


 b) some sources might redeliver messages even in between checkpoints 
(for instance due to a timeout of a message confirm) - such sources have 
in common that they use commit schemes of one message at a time (like 
rabbit, mqtt, or pubsub). This manifests as the need to override the 
default implementation of UnboundedSource#requiresDeduping, whose 
javadoc [1] states: "Returns whether this source requires explicit 
deduping. This is needed if the underlying data source can return the 
same record multiple times, such a queuing system with a pull-ack model. 
Sources where the records read are uniquely identified by the persisted 
state in the CheckpointMark do not need this."


That is probably exactly what is your case.

[1] 
https://beam.apache.org/releases/javadoc/2.16.0/org/apache/beam/sdk/io/UnboundedSource.html#requiresDeduping--
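
For reference, a rough sketch of the dedup-related pieces on the source 
side (fragments only, hypothetical field names):

// In the UnboundedSource subclass:
@Override
public boolean requiresDeduping() {
  return true;  // the broker may redeliver a message the reader already emitted
}

// In the matching UnboundedReader, give each message a stable id so the
// runner can drop redeliveries of records it has already seen:
@Override
public byte[] getCurrentRecordId() {
  return currentMessageId;  // e.g. a broker message id, stable across redelivery
}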


"Subscription" isn't really the model in rabbit. It has advantages and 
disadvantages when compared with kafka -- mostly out of scope here -- 
but some quick advantages of the rabbit model: 1) it parallelizes 
"infinitely" without any changes to server (no re-partitioning or the 
like); 2) messages can be acknowledge in a separate order than they 
were consumed; 3) because state is managed associated with an active 
connection, at-least-once delivery semantics are easy to implement as 
any disconnection will result in the messages being re-placed in the 
queue and delivered to a new consumer. To say it's "incompatible with 
any fault tolerant semantics" is unfair, they just aren't incompatible 
with Beam's, as Beam is currently implemented.


What I mean by that is: if rabbit were not able to recover a 
'subscription' in an at-least-once fashion, then it would be at best 
at-most-once and thus not fault tolerant. I was pretty sure that is not 
the case.


Regardless, I'm now wondering what the best path forward is. Rabbit 
isn't unusable in Beam if the set of requirements and tradeoffs are 
well documented. That is, there are use cases that could be properly 
supported and some that likely can't.


I would really say that rabbit can be fully supported by Beam. Maybe the 
best analogy would be PubSubIO.


One option would be to use a pull-based api and immediately 
acknowledge each message as they arrive. This would effectively make 
the CheckpointMark a no-op, other than maintaining the watermark. In a 
pipeline that uses fixed windows (or non-session windowing) and uses a 
runner that supports 'Drain'-style semantics (like Dataflow) this 
should work just fine I think.


That would make the source not fault tolerant, because messages could 
not be redelivered.


Another would be to do a best-attempt at acknowledging as late as 
possible. This would be a hybrid approach where we attempt 
acknowledgements in the CheckpointMark, but use a special Coder that 
acknowledges all messages at the point the CheckpointMark is encoded. 
I think this feels a bit unsafe and overly complex, and I'm not sure 
it solves any real-world problems.


I also feel like perhaps we should include Beam IO documentation that 
makes it clear that an unbounded source that requires a persistent 
connection for state tracking is not supportable by beam.


Thanks,
-Danny

On 11/14/19 7:49 AM, Jan Lukavský wrote:

Hi, as I said, I didn't dig too deep into that, but what I saw was [1].
Generally, if RabbitMQ had no way to recover a subscription (which I 
don't think is the case), then it would not just be incompatible with 
Beam; it would be incompatible with any fault-tolerant semantics.


[1] https://www.rabbitmq.com/consumers.html#consumer-lifecycle

On 14. 11. 2019 13:06, Daniel Robert 
wrote:



On 11/14/19 2:32 AM, Jan Lukavský wrote:

Hi Danny,

as Eugene pointed out, there are essentially two "modes of
  

Re: Completeness of Beam Java Dependency Check Report

2019-11-14 Thread Alexey Romanenko
Good example about Guava deps, let me go a bit deeper.

> $ find . -name build.gradle | xargs grep library.java.guava
> ./sdks/java/core/build.gradle:  shadowTest library.java.guava_testlib
> ./sdks/java/io/kinesis/build.gradle:  testCompile library.java.guava_testlib


Regarding using non-vendored Guava in KinesisIO (and "java/core" as well), it’s 
really only about “library.java.guava_testlib” (and 
“com.google.common.testing.EqualsTester” in particular), which is used for 
tests.
Do we need to vendor “com.google.guava:guava-testlib” in this case?

> - KinesisIO does depend on Guava at compile scope but has incorrect 
> dependencies (Kinesis libs have Guava on API surface so it is OK here, but 
> should be correctly declared)


Sorry, but I didn’t understand what you mean by “but should be correctly 
declared”.
Since the Kinesis client libs have their own Guava deps and we shade our own 
Guava, it should be fine, no? 

> On 11 Nov 2019, at 22:29, Kenneth Knowles  wrote:
> 
> BeamModulePlugin just contains lists of versions to ease coordination across 
> Beam modules, but mostly does not create dependencies. Most of Beam's modules 
> only depend on a few things there. For example Guava is not a core 
> dependency, but here is where it is actually depended upon:
> 
> $ find . -name build.gradle | xargs grep library.java.guava
> ./sdks/java/core/build.gradle:  shadowTest library.java.guava_testlib
> ./sdks/java/extensions/sql/jdbc/build.gradle:  compile library.java.guava
> ./sdks/java/io/google-cloud-platform/build.gradle:  compile library.java.guava
> ./sdks/java/io/kinesis/build.gradle:  testCompile library.java.guava_testlib
> 
> These results appear to be misleading. Grepping for 'import 
> com.google.common', I see this as the actual state of things:
> 
>  - GCP connector does not appear to actually depend on Guava in compile scope
>  - The Beam SQL JDBC driver does not appear to actually depend on Guava in 
> compile scope
>  - The Dataflow Java worker does depend on Guava at compile scope but has 
> incorrect dependencies (and it probably shouldn't)
>  - KinesisIO does depend on Guava at compile scope but has incorrect 
> dependencies (Kinesis libs have Guava on API surface so it is OK here, but 
> should be correctly declared)
>  - ZetaSQL translator does depend on Guava at compile scope but has incorrect 
> dependencies (ZetaSQL has it on API surface so it is OK here, but should be 
> correctly declared)
> 
> We used to have an analysis that prevented this class of error.
> 
> Once the errors are fixed, the guava_version is simply a version that we have 
> discovered that seems to work for both Kinesis and ZetaSQL, libraries we do 
> not control. Kinesis producer is built against 18.0. Kinesis client against 
> 26.0-jre. ZetaSQL against 26.0-android.
> 
> (or maybe I messed up in my analysis)
> 
> Kenn
> 
> On Mon, Nov 11, 2019 at 12:07 PM Tomo Suzuki  > wrote:
> 
> Chamikara and Yifan,
> Thank you for the responses! Looking forward to hearing the investigation 
> result.
> In the meantime, I'll explore .test-infra/jenkins/dependency_check directory.
> 



Re: [discuss] Using a logger hierarchy in Python

2019-11-14 Thread Thomas Weise
Awesome, thanks Chad!

On Wed, Nov 13, 2019 at 10:26 PM Chad Dombrova  wrote:

> Hi Thomas,
>
>
>> Will this include the ability for users to configure logging via pipeline
>> options?
>>
>
> We're working on a proposal to allow pluggable logging handlers that can
> be configured via pipeline options.  For example, it would allow you to add
> a new logging handler for StackDriver or Elasticsearch.  Will hopefully
> have a document to share soon.
>
> -chad
>
>


Re: RabbitMQ and CheckpointMark feasibility

2019-11-14 Thread Daniel Robert
We may be talking past each other a bit, though I do appreciate the 
responses.


Rabbit behaves a lot like a relational database in terms of the state 
required. A connection is analogous to a database connection, and a 
channel (poor analogy here) is similar to an open transaction. If the 
connection is severed, the transaction cannot be committed.


In direct response to the consumer lifecycle linked to, yes, one can 
recover and re-establish connections, but any state maintained within 
the previous channel is lost. If there were messages that had not been 
acknowledged, they would have been re-delivered to some other consumer, 
as they were never acknowledged.


"Subscription" isn't really the model in rabbit. It has advantages and 
disadvantages when compared with kafka -- mostly out of scope here -- 
but some quick advantages of the rabbit model: 1) it parallelizes 
"infinitely" without any changes to server (no re-partitioning or the 
like); 2) messages can be acknowledge in a separate order than they were 
consumed; 3) because state is managed associated with an active 
connection, at-least-once delivery semantics are easy to implement as 
any disconnection will result in the messages being re-placed in the 
queue and delivered to a new consumer. To say it's "incompatible with 
any fault tolerant semantics" is unfair, they just aren't incompatible 
with Beam's, as Beam is currently implemented.


Regardless, I'm now wondering what the best path forward is. Rabbit 
isn't unusable in Beam if the set of requirements and tradeoffs are well 
documented. That is, there are use cases that could be properly 
supported and some that likely can't.


One option would be to use a pull-based api and immediately acknowledge 
each message as they arrive. This would effectively make the 
CheckpointMark a no-op, other than maintaining the watermark. In a 
pipeline that uses fixed windows (or non-session windowing) and uses a 
runner that supports 'Drain'-style semantics (like Dataflow) this should 
work just fine I think.


Another would be to do a best-attempt at acknowledging as late as 
possible. This would be a hybrid approach where we attempt 
acknowledgements in the CheckpointMark, but use a special Coder that 
acknowledges all messages at the point the CheckpointMark is encoded. I 
think this feels a bit unsafe and overly complex, and I'm not sure it 
solves any real-world problems.


I also feel like perhaps we should include Beam IO documentation that 
makes it clear that an unbounded source that requires a persistent 
connection for state tracking is not supportable by Beam.


Thanks,
-Danny

On 11/14/19 7:49 AM, Jan Lukavský wrote:

Hi, as I said, I didn't dig too deep into that, but what I saw was [1].
Generally, if RabbitMQ had no way to recover a subscription (which I 
don't think is the case), then it would not just be incompatible with 
Beam; it would be incompatible with any fault-tolerant semantics.


[1] https://www.rabbitmq.com/consumers.html#consumer-lifecycle

On 14. 11. 2019 13:06, Daniel Robert 
wrote:



On 11/14/19 2:32 AM, Jan Lukavský wrote:

Hi Danny,

as Eugene pointed out, there are essentially two "modes of
operation" of CheckpointMark. It can:

 a) be used to somehow restore state of a reader (in call to
UnboundedSource#createReader)

 b) confirm processed elements in
CheckpointMark#finalizeCheckpoint

If your source doesn't provide a persistent position in data
stream that can be referred to (and serialized - example of
this would be kafka offsets), then what you actually need to
serialize is not the channel, but a way how to restore it -
e.g. by opening a new channel with a given 'consumer group
name'. Then you just use this checkpoint to commit your
processed data in finalizeCheckpoint.

Note that the finalizeCheckpoint is not guaranteed to be
called - that can happen in cases when an error occurs and the
source has to be rewind back - that is what direct runner
emulates with the probability of 'readerReuseChance'.

I'm reading the documentation of RabbitMQ very quickly, but if
I understand it correctly, then you have to create a
subscription to the broker, serialize identifier of the
subscription into the checkpointmark and then just recover the
subscription in call to UnboundedSource#createReader. That
should do the trick.

I have not seen any such documentation in rabbit. My understanding
is it has to be the same, physical connection and channel. Can you
cite the source you were looking at?

-Danny

Hope this helps, sorry if I'm not using 100% correct RabbitMQ
terminology as I said, I'm not quite familiar with it.

Best,

 Jan

On 11/14/19 5:26 AM, Daniel Robert wrote:

I believe I've n

Re: RabbitMQ and CheckpointMark feasibility

2019-11-14 Thread Jan Lukavský
Hi, as I said, I didn't dig too deep into that, but what I saw was [1].

Generally, if RabbitMQ had no way to recover a subscription (which I don't 
think is the case), then it would not just be incompatible with Beam; it 
would be incompatible with any fault-tolerant semantics.

[1] https://www.rabbitmq.com/consumers.html#consumer-lifecycle

On 14. 11. 2019 13:06, Daniel Robert  wrote:


On 11/14/19 2:32 AM, Jan Lukavský
  wrote:


  
  Hi Danny,
  as Eugene pointed out, there are essentially two "modes of
operation" of CheckpointMark. It can:
   a) be used to somehow restore state of a reader (in call to
UnboundedSource#createReader)
  
   b) confirm processed elements in
CheckpointMark#finalizeCheckpoint
  If your source doesn't provide a persistent position in data
stream that can be referred to (and serialized - example of this
would be kafka offsets), then what you actually need to
serialize is not the channel, but a way how to restore it - e.g.
by opening a new channel with a given 'consumer group name'.
Then you just use this checkpoint to commit your processed data
in finalizeCheckpoint.
  Note that the finalizeCheckpoint is not guaranteed to be called
- that can happen in cases when an error occurs and the source
has to be rewind back - that is what direct runner emulates with
the probability of 'readerReuseChance'.
  I'm reading the documentation of RabbitMQ very quickly, but if
I understand it correctly, then you have to create a
subscription to the broker, serialize identifier of the
subscription into the checkpointmark and then just recover the
subscription in call to UnboundedSource#createReader. That
should do the trick.

I have not seen any such documentation in rabbit. My
  understanding is it has to be the same, physical connection and
  channel. Can you cite the source you were looking at?
-Danny


  Hope this helps, sorry if I'm not using 100% correct RabbitMQ
terminology as I said, I'm not quite familiar with it.
  Best,
   Jan
  
  On 11/14/19 5:26 AM, Daniel Robert
wrote:
  
  

I believe I've nailed down a situation that happens in
  practice that causes Beam and Rabbit to be incompatible. It
  seems that runners can and do make assumptions about the
  serializability (via Coder) of a CheckpointMark.
To start, these are the semantics of RabbitMQ:
- the client establishes a connection to the server
  - client opens a channel on the connection
  - messages are either pulled or pushed to the client from the
  server along this channel
  - when messages are done processing, they are acknowledged
  *client-side* and must be acknowledged on the *same channel*
  that originally received the message.
Since a channel (or any open connection) is non-serializable,
  it means that a CheckpointMark that has been serialized cannot
  ever be used to acknowledge these messages and correctly
  'finalize' the checkpoint. It also, as previously discussed in
  this thread, implies a rabbit Reader cannot accept an existing
  CheckpointMark at all; the Reader and the CheckpointMark must
  share the same connection to the rabbit server ("channel").

Next, I've found how DirectRunner (and presumably others) can
  attempt to serialize a CheckpointMark that has not been
  finalized. In https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L150,
  the DirectRunner applies a probability and if it hits, it sets
  the current reader to 'null' but retains the existing
  CheckpointMark, which it then attempts to pass to a new reader
  via a Coder. 

This puts the shard, the runner, and the reader with
  differing views of the world. In
  UnboundedReadEvaluatorFactory's processElement function, a
  call to getReader(shard) ( https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L132
  ) clones the shard's checkpoint mark and passes that to the
  new reader. The reader ignores it, creating its own, but even
  if it accepted it, it would be accepting a serialized
  CheckpointMark, which wouldn't work. Later, the runner calls
  finishRead ( https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L246

Re: RabbitMQ and CheckpointMark feasibility

2019-11-14 Thread Daniel Robert


On 11/14/19 2:32 AM, Jan Lukavský wrote:


Hi Danny,

as Eugene pointed out, there are essentially two "modes of operation" 
of CheckpointMark. It can:


 a) be used to somehow restore state of a reader (in call to 
UnboundedSource#createReader)


 b) confirm processed elements in CheckpointMark#finalizeCheckpoint

If your source doesn't provide a persistent position in data stream 
that can be referred to (and serialized - example of this would be 
kafka offsets), then what you actually need to serialize is not the 
channel, but a way how to restore it - e.g. by opening a new channel 
with a given 'consumer group name'. Then you just use this checkpoint 
to commit your processed data in finalizeCheckpoint.


Note that the finalizeCheckpoint is not guaranteed to be called - that 
can happen in cases when an error occurs and the source has to be 
rewind back - that is what direct runner emulates with the probability 
of 'readerReuseChance'.


I'm reading the documentation of RabbitMQ very quickly, but if I 
understand it correctly, then you have to create a subscription to the 
broker, serialize identifier of the subscription into the 
checkpointmark and then just recover the subscription in call to 
UnboundedSource#createReader. That should do the trick.


I have not seen any such documentation in rabbit. My understanding is it 
has to be the same, physical connection and channel. Can you cite the 
source you were looking at?


-Danny

Hope this helps, sorry if I'm not using 100% correct RabbitMQ 
terminology as I said, I'm not quite familiar with it.


Best,

 Jan

On 11/14/19 5:26 AM, Daniel Robert wrote:


I believe I've nailed down a situation that happens in practice that 
causes Beam and Rabbit to be incompatible. It seems that runners can 
and do make assumptions about the serializability (via Coder) of a 
CheckpointMark.


To start, these are the semantics of RabbitMQ:

- the client establishes a connection to the server
- client opens a channel on the connection
- messages are either pulled or pushed to the client from the server 
along this channel
- when messages are done processing, they are acknowledged 
*client-side* and must be acknowledged on the *same channel* that 
originally received the message.


Since a channel (or any open connection) is non-serializable, it 
means that a CheckpointMark that has been serialized cannot ever be 
used to acknowledge these messages and correctly 'finalize' the 
checkpoint. It also, as previously discussed in this thread, implies 
a rabbit Reader cannot accept an existing CheckpointMark at all; the 
Reader and the CheckpointMark must share the same connection to the 
rabbit server ("channel").


Next, I've found how DirectRunner (and presumably others) can attempt 
to serialize a CheckpointMark that has not been finalized. In 
https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L150, 
the DirectRunner applies a probability and if it hits, it sets the 
current reader to 'null' but retains the existing CheckpointMark, 
which it then attempts to pass to a new reader via a Coder.


This puts the shard, the runner, and the reader with differing views 
of the world. In UnboundedReadEvaluatorFactory's processElement 
function, a call to getReader(shard) ( 
https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L132 
) clones the shard's checkpoint mark and passes that to the new 
reader. The reader ignores it, creating its own, but even if it 
accepted it, it would be accepting a serialized CheckpointMark, which 
wouldn't work. Later, the runner calls finishRead ( 
https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L246 
). The shard's CheckpointMark (unserialized; which should still be 
valid) is finalized. The reader's CheckpointMark (which may be a 
different instance) becomes the return value, which is referred to as 
"finishedCheckpoint" in the calling code, which is misleading at best 
and problematic at worst as *this* checkpoint has not been finalized.


So, tl;dr: I cannot find any means of maintaining a persistent 
connection to the server for finalizing checkpoints that is safe 
across runners. If there's a guarantee all of the shards are on the 
same JVM instance, I could rely on global, static 
collections/instances as a workaround, but if other runners might 
serialize this across the wire, I'm stumped. The only workable 
situation I can think of right now is to proactively acknowledge 
messages as they are received and effectively no-op in 
finalizeCheckpoint. This is very different, semantically, and can 
lead to dropped messages if a pipeline doesn't finish processing the 
given message.


Any help would be much appreciated.

Thanks,
-Danny

On 11/7/19 10:27 PM, Eugene Kirpicho

Re: Triggers still finish and drop all data

2019-11-14 Thread Kenneth Knowles
On Fri, Nov 8, 2019 at 9:44 AM Steve Niemitz  wrote:

> Yeah that looks like what I had in mind too.  I think the most useful
> notification output would be a KV of (K, summary)?
>

Sounds about right. Some use cases may not care about the summary, but just
the notification. But for most runners passing extra in-memory data to a
subsequent projection which drops it is essentially free.
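
For readers following the thread, a minimal stateful-DoFn sketch of the 
pattern being discussed, with a count-based predicate standing in for the 
general monotonic predicate (hypothetical names; state is scoped per key 
and window, which gives the fire-once-per-window behavior):

import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

class NotifyAtThreshold<V> extends DoFn<KV<String, V>, KV<String, Long>> {
  private final long threshold;

  @StateId("count")
  private final StateSpec<ValueState<Long>> countSpec = StateSpecs.value();

  @StateId("fired")
  private final StateSpec<ValueState<Boolean>> firedSpec = StateSpecs.value();

  NotifyAtThreshold(long threshold) {
    this.threshold = threshold;
  }

  @ProcessElement
  public void process(
      @Element KV<String, V> element,
      @StateId("count") ValueState<Long> count,
      @StateId("fired") ValueState<Boolean> fired,
      OutputReceiver<KV<String, Long>> out) {
    if (Boolean.TRUE.equals(fired.read())) {
      return;  // already notified for this key and window
    }
    Long seenSoFar = count.read();
    long seen = (seenSoFar == null ? 0L : seenSoFar) + 1;
    count.write(seen);
    if (seen >= threshold) {
      fired.write(true);
      out.output(KV.of(element.getKey(), seen));  // the (K, summary) notification
    }
  }
}

A Notify transform like the one sketched above could wrap exactly this, 
with the summarizer combineFn replacing the raw count.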

Kenn


> On Fri, Nov 8, 2019 at 12:38 PM Kenneth Knowles  wrote:
>
>> This sounds like a useful feature, if I understand it: a generic
>> transform (build on a generic stateful DoFn) where the end-user provides a
>> monotonic predicate over the input it has seen. It emits a notification
>> exactly once when the predicate is first satisfied. To be efficient, it
>> will also need some form of summarization over the input seen.
>>
>> Notify
>>   .withSummarizer(combineFn)
>>   .withPredicate(summary -> ...)
>>
>> Something like that? The complexity is not much less than just writing a
>> stateful DoFn directly, but the boilerplate is much less.
>>
>> Kenn
>>
>> On Thu, Nov 7, 2019 at 2:02 PM Steve Niemitz  wrote:
>>
>>> Interestingly enough, we just had a use case come up that I think could
>>> have been solved by finishing triggers.
>>>
>>> Basically, we want to emit a notification when a certain threshold is
>>> reached (in this case, we saw at least N elements for a given key), and
>>> then never notify again within that window.  As mentioned, we can
>>> accomplish this using a stateful DoFn as mentioned above, but I thought it
>>> was interesting that this just came up, and wanted to share.
>>>
>>> Maybe it'd be worth building something to simulate this into the SDK?
>>>
>>> On Mon, Nov 4, 2019 at 8:15 PM Kenneth Knowles  wrote:
>>>
 By the way, adding this guard uncovered two bugs in Beam's Java
 codebase, luckily only benchmarks and tests. There were *no* non-buggy
 instances of a finishing trigger. They both declare allowed lateness that
 is never used.

 Nexmark query 10:

 // Clear fancy triggering from above.
 .apply(
 Window.>into(...)
 .triggering(AfterWatermark.pastEndOfWindow())
 // We expect no late data here, but we'll assume the
 worst so we can detect any.
 .withAllowedLateness(Duration.standardDays(1))
 .discardingFiredPanes())

 This is nonsensical: the trigger will fire once and close, never firing
 again. So the allowed lateness has no effect except to change counters from
 "dropped due to lateness" to "dropped due to trigger closing". The intent
 would appear to be to restore the default triggering, but it failed.

 PipelineTranslationTest:


  Window.into(FixedWindows.of(Duration.standardMinutes(7)))
 .triggering(
 AfterWatermark.pastEndOfWindow()

 .withEarlyFirings(AfterPane.elementCountAtLeast(19)))
 .accumulatingFiredPanes()
 .withAllowedLateness(Duration.standardMinutes(3L)));

 Again, the allowed lateness has no effect. This test is just to test
 portable proto round-trip. But still it is odd to write a nonsensical
 pipeline for this.

 Takeaway: experienced Beam developers never use this pattern, but they
 still get it wrong and create pipelines that would have data loss bugs
 because of it.

 Since there is no other discussion here, I will trust the community is
 OK with this change and follow Jan's review of my implementation of his
 idea.

 Kenn


 On Thu, Oct 31, 2019 at 4:06 PM Kenneth Knowles 
 wrote:

> Opened https://github.com/apache/beam/pull/9960 for this idea. This
> will alert users to broken pipelines and force them to alter them.
>
> Kenn
>
> On Thu, Oct 31, 2019 at 2:12 PM Kenneth Knowles 
> wrote:
>
>> On Thu, Oct 31, 2019 at 2:11 AM Jan Lukavský  wrote:
>>
>>> Hi Kenn,
>>>
>>> does there still remain some use for trigger to finish? If we don't
>>> drop
>>> data, would it still be of any use to users? If not, would it be
>>> better
>>> to just remove the functionality completely, so that users who use
>>> it
>>> (and it will possibly break for them) are aware of it at compile
>>> time?
>>>
>>> Jan
>>>
>>
>> Good point. I believe there is no good use for a top-level trigger
>> finishing. As mentioned, the intended uses aren't really met by triggers,
>> but are met by stateful DoFn.
>>
>> Eugene's bug even has this title :-). We could not change any
>> behavior but just reject pipelines with broken top-level triggers. This 
>> is
>> probably a better solution. Because if a user has a broken trigger, the 
>> new
>> behavior is probably not enough to magically fix their pipeline. They 

Re: [CANCELLED] [VOTE] @RequiresTimeSortedInput stateful DoFn annotation

2019-11-14 Thread Kenneth Knowles
Hi Jan,

I want to acknowledge your careful consideration of the community here.

I myself have simply not had the time to dedicate to considering this
proposal. So, like Max, I have a bit of an "outside" perspective and
would hesitate to cast any sort of vote.

I think you have chosen a good course of action - continue to grow
awareness and understanding of the problem / awareness and understanding of
the solution.
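
For context, a rough sketch of how the proposed annotation would sit on a
stateful DoFn (illustrative only; Event, MachineState and Output are
hypothetical types, and the annotation name and placement follow the
proposal rather than any released API; state/DoFn imports as in the Beam
Java SDK are omitted):

class OrderedEventsFn extends DoFn<KV<String, Event>, Output> {

  @StateId("machine")
  private final StateSpec<ValueState<MachineState>> machineSpec = StateSpecs.value();

  @RequiresTimeSortedInput  // ask the runner to deliver inputs per key in event-time order
  @ProcessElement
  public void process(
      @Element KV<String, Event> element,
      @StateId("machine") ValueState<MachineState> machine,
      OutputReceiver<Output> out) {
    // Advance the per-key state machine; because inputs arrive sorted by
    // timestamp, a batch runner can lean on sort-merge grouping instead of
    // buffering potentially unbounded out-of-order input in state.
  }
}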

Kenn

On Tue, Nov 12, 2019 at 12:45 AM Jan Lukavský  wrote:

> I'm cancelling this due to lack of activity. I will issue a follow-up
> thread to find solution.
>
> On 11/9/19 11:45 AM, Jan Lukavský wrote:
> > Hi,
> >
> > I'll try to summarize the mailing list threads to clarify why I think
> > this addition is needed (and actually necessary):
> >
> >  a) there are situations where the order of input events matter
> > (obviously any finite state machine)
> >
> >  b) in streaming case, this can be handled by the current machinery
> > (e.g. holding elements in state, sorting all elements with timestamp
> > less than input watermark, dropping latecomers)
> >
> >  c) in batch case, this can be handled the same way, but
> >
> >   i) due to the nature of batch processing, that has extreme
> > requirements on the size of state needed to hold the elements
> > (actually, in extreme, that might be the whole input, which might not
> > be feasible)
> >
> >   ii) although it is true, that watermark might (and will) fall behind
> > in streaming processing as well so that similar issues might arise
> > there too, it is hardly imaginable that it will fall behind as much as
> > several years (but it is absolutely natural in batch case) - I'm
> > talking about regular streaming processing, not some kappa like
> > architectures, where this happens as well, but is causes troubles ([1])
> >
> >   iii) given the fact, that some runners already use sort-merge
> > groupings, it is actually virtually for free to also sort elements
> > inside groups by timestamps, the runner just has to know, that it
> > should do so
> >
> > I don't want to go too far into details to keep this focused, but the
> > fact that runner would know that it should sort by timestamp before
> > stateful pardo brings additional features that are currently
> > unavailable - e.g. actually shift event time smoothly, as elements
> > flow through, not from -inf to +inf in one shot. That might have
> > positive effect on timers being fired smoothly and thus for instance
> > being able to free some state that would have to be held until the end
> > of computation otherwise.
> >
> > Therefore, I think it is essential for users to be able to tell runner
> > that a particular stateful pardo depends on order of input events, so
> > that the runner can use optimizations available in batch case. The
> > streaming case is mostly unaffected by that, because all the sorting
> > can be handled the usual way.
> >
> > Hope this helps to clarify why it would be good to introduce (some
> > way) to mark stateful pardos as "time sorted".
> >
> > Cheers,
> >
> >  Jan
> >
> > [1]
> >
> https://www.ververica.com/resources/flink-forward-san-francisco-2019/moving-from-lambda-and-kappa-architectures-to-kappa-at-uber
> >
> > Hope these thoughts help
> >
> > On 11/8/19 11:35 AM, Jan Lukavský wrote:
> >> Hi Max,
> >>
> >> thanks for comment. I probably should have put links to discussion
> >> threads here in the vote thread. Relevant would be
> >>
> >>  - (a pretty lengthy) discussion about whether sorting by timestamp
> >> should be part of the model - [1]
> >>
> >>  - part of the discussion related to the annotation - [2]
> >>
> >> Regarding the open question in the design document - these are not
> >> meant to be open questions in regard to the design of the annotation
> >> and I'll remove that for now, as it is not (directly) related.
> >>
> >> Now - main reason for this vote is that there is actually not a clear
> >> consensus in the ML thread. There are plenty of words like "should",
> >> "could", "would" and "maybe", so I wanted to be sure there is
> >> consensus to include this. I already run this in production for
> >> several months, so it is definitely useful for me. :-) But that might
> >> not be sufficient.
> >>
> >> I'd be very happy to answer any more questions.
> >>
> >> Thanks,
> >>
> >>  Jan
> >>
> >> [1]
> >>
> https://lists.apache.org/thread.html/4609a1bb1662690d67950e76d2f1108b51327b8feaf9580de659552e@%3Cdev.beam.apache.org%3E
> >>
> >> [2]
> >>
> https://lists.apache.org/thread.html/dd9bec903102d9fcb4f390dc01513c0921eac1fedd8bcfdac630aaee@%3Cdev.beam.apache.org%3E
> >>
> >> On 11/8/19 11:08 AM, Maximilian Michels wrote:
> >>> Hi Jan,
> >>>
> >>> Disclaimer: I haven't followed the discussion closely, so I do not
> >>> want to comment on the technical details of the feature here.
> >>>
> >>> From the outside, it looks like there may be open questions. Also,
> >>> we may need more motivation for what we can build with this feature
> >>> or how it will bec

Re: Cleaning up Approximate Algorithms in Beam

2019-11-14 Thread Kenneth Knowles
Wow. Nice summary, yes. Major calls to action:

0. Never allow a combiner that does not make the format of its state
clear in its name/URN. The "update compatibility" problem makes its
internal accumulator state essentially part of its public API. Combiners
named for what they do are an inherent risk, since we might have a new way
to do the same operation with different implementation-detail state. And
they will match search terms better, which is a major problem.

1. Point users to HllCount. This seems to be the best of the three. Does it
have a name that is clear enough about the format of its state? Noting that
its Java package name includes zetasketch, perhaps.
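For reference, a rough usage sketch of HllCount from the zetasketch extension (requires the beam-sdks-java-extensions-zetasketch dependency; method names are taken from its published Javadoc, and the pipeline and input values are made up for the example):

import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.zetasketch.HllCount;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;

public class HllCountExample {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<String> userIds =
        p.apply(Create.of(Arrays.asList("alice", "bob", "alice", "carol")));

    // Init builds an HLL++ sketch (serialized as bytes); Extract reads the
    // distinct-count estimate back out of the sketch.
    PCollection<byte[]> sketch = userIds.apply(HllCount.Init.forStrings().globally());
    PCollection<Long> estimate = sketch.apply(HllCount.Extract.globally());

    p.run().waitUntilFinish();
  }
}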

2. Deprecate the others, at least. And remove them from e.g. Javadoc.

Kenn

On Wed, Nov 13, 2019 at 10:01 AM Reuven Lax  wrote:

>
>
> On Wed, Nov 13, 2019 at 9:58 AM Ahmet Altay  wrote:
>
>> Thank you for writing this summary.
>>
>> On Tue, Nov 12, 2019 at 6:35 PM Reza Rokni  wrote:
>>
>>> Hi everyone;
>>>
>>> TL;DR: Discussion on Beam's various Approximate Distinct Count
>>> algorithms.
>>>
>>> Today there are several options for approximate algorithms in Apache
>>> Beam 2.16, with HllCount being the most recently added. I would like to
>>> canvass opinions here on the possibility of rationalizing these APIs by
>>> removing obsolete / less efficient implementations.
>>> The current situation:
>>>
>>> There are three options available to users: ApproximateUnique.java,
>>> ApproximateDistinct.java, and HllCount.java.
>>> A quick summary of these APIs as I understand them:
>>>
>>> HllCount.java: Marked as @Experimental
>>>
>>> PTransforms to compute HyperLogLogPlusPlus (HLL++) sketches on data
>>> streams, based on the ZetaSketch implementation. For the detailed
>>> design of this class, see https://s.apache.org/hll-in-beam.
>>>
>>> ApproximateUnique.java: Not marked as @Experimental
>>>
>>> This API does not expose the ability to create sketches, so it's not
>>> suitable for the OLAP use case that HLL++ is geared towards
>>> (pre-aggregating into sketches to allow interactive query speeds). It's
>>> also less precise for the same amount of memory used: the error bounds
>>> in the doc comments give:
>>>
>>> /* The error is about {@code 2 / sqrt(sampleSize)} */
>>>
>>> Compared to the default HllCount sketch size, its error is 10X larger
>>> than the HLL++ error.
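For comparison, the equivalent use of ApproximateUnique from the core SDK - a fragment that assumes the same PCollection<String> userIds as in the HllCount sketch earlier in this thread; a sample size of 10,000 gives roughly 2 / sqrt(10000) = 2% error per the doc comment quoted above:

import org.apache.beam.sdk.transforms.ApproximateUnique;
import org.apache.beam.sdk.values.PCollection;

// Estimate the number of distinct elements from a sample of 10,000 entries.
PCollection<Long> estimate =
    userIds.apply(ApproximateUnique.<String>globally(10_000));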
>>>
>>
>> FWIW, there is a Python implementation only for this transform:
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/stats.py#L38
>>
>>
>>
>>> ApproximateDistinct.java: Marked as @Experimental
>>>
>>> This is a re-implementation of the HLL++ algorithm, based on the paper
>>> published in 2013. It exposes sketches via a HyperLogLogPlusCoder. We
>>> have not run any benchmarks comparing this implementation with
>>> HllCount, and we need to be careful to ensure that, if we were to
>>> change any of these APIs, the binary format of the sketches never
>>> changes: there could be users who have stored sketches produced with
>>> ApproximateDistinct, and it will be important to ensure they do not
>>> have a bad experience.
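And a corresponding fragment for ApproximateDistinct from the sketching extension, again assuming the same userIds collection; withPrecision(p) trades memory for accuracy (relative error is roughly 1.04 / sqrt(2^p) for HLL-style sketches), and 15 is just an illustrative choice:

import org.apache.beam.sdk.extensions.sketching.ApproximateDistinct;
import org.apache.beam.sdk.values.PCollection;

// Estimate the distinct count with the sketching-extension HLL++ implementation.
PCollection<Long> estimate =
    userIds.apply(ApproximateDistinct.<String>globally().withPrecision(15));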
>>>
>>>
>>> Proposals:
>>>
>>> There are two classes of users expected for these algorithms:
>>>
>>> 1) Users who simply use the transform to estimate the size of their data
>>> set in Beam
>>>
>>> 2) Users who want to create sketches and store them, either for
>>> interoperability with other systems, or as features to be used in further
>>> data processing.
>>>
>>>
>>>
>>> For use case 1, it is possible to use naming which does not expose the
>>> implementation; however, for use case 2 it is important for the
>>> implementation to be explicit, as sketches produced with one
>>> implementation will not work with other implementations.
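To make use case 2 concrete, a hedged sketch of re-aggregating previously stored HllCount sketches; how the sketch bytes are persisted and read back (files, BigQuery, etc.) is left as a placeholder, and this only works if every stored sketch was produced by the same implementation (here HllCount / ZetaSketch):

import org.apache.beam.sdk.extensions.zetasketch.HllCount;
import org.apache.beam.sdk.values.PCollection;

// storedSketches: byte[] sketches written by an earlier HllCount.Init step and
// read back from wherever they were persisted (placeholder, not shown here).
PCollection<byte[]> storedSketches = ...;

// Merge the partial sketches into one and extract the combined estimate.
PCollection<byte[]> merged = storedSketches.apply(HllCount.MergePartial.globally());
PCollection<Long> combined = merged.apply(HllCount.Extract.globally());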
>>>
>>> ApproximateUnique.java
>>>