Re: Streaming data from Pubsub to Spanner with Beam dataflow pipeline

2019-10-30 Thread Kenneth Knowles
Moving to u...@beam.apache.org, the best mailing list for questions like
this.

Yes, this kind of workload is a core use case for Beam. If you have a
problem, please write to this user list with details.

Kenn

On Wed, Oct 30, 2019 at 4:07 AM Taher Koitawala  wrote:

> Hi All,
>   My current use-case is to write data from Pubsub to Spanner
> using a streaming pipeline. I do see that Beam does have a SpannerIO to
> write.
>
>   However, with Pubsub being streaming and Spanner being RDBMS-like, it
> would be helpful if you could tell me whether this will be performant enough
> or not. If someone has already tried this out and can give me a few
> caveats, then that would be really awesome.
>
>
> Regards,
> Taher Koitawala
>


Re: [spark structured streaming runner] merge to master?

2019-10-30 Thread Kenneth Knowles
Very good points. We definitely ship a lot of code/features in very early
stages, and there seems to be no problem.

I intend mostly to leave this judgment to people like you who know better
about Spark users.

But I do think 1 or 2 jars is better than 3. I really don't like "3 jars"
and I did give two reasons:

1. diamond deps where things overlap
2. figuring out which thing to depend on

Both are annoying for users. I am not certain if it could lead to a real
unsolvable situation. This is just a Java ecosystem problem so I feel
qualified to comment.

I did also ask if there were major dependency differences between the two
that could cause problems for users. This question was dropped and no one
cares to comment so I assume it is not an issue. So then I favor having
just 1 jar with both runners.

Kenn

On Wed, Oct 30, 2019 at 2:46 PM Ismaël Mejía  wrote:

> I am still a bit lost about why we are discussing options without giving
> any
> arguments or reasons for the options? Why is 2 modules better than 3 or 3
> better
> than 2, or even better, what forces us to have something different than a
> single
> module?
>
> What are the reasons for wanting to have separate jars? If the issue is
> that the
> code is unfinished or not passing the tests, the impact for end users is
> minimal
> because they cannot accidentally end up running the new runner, and if they
> decide to do so we can warn them it is at their own risk and not ready for
> production in the documentation + runner.
>
> If the fear is that new code may end up being intertwined with the classic
> and
> portable runners and have some side effects. We have the ValidatesRunner +
> Nexmark in the CI to cover this so again I do not see what is the problem
> that
> requires modules to be separate.
>
> If the issue is being uncomfortable about having in-progress code in
> released
> artifacts we have been doing this in Beam forever, for example most of the
> work
> on portability and Schema/SQL, and all of those were still part of
> artifacts
> long time before they were ready for prime use, so I still don't see why
> this
> case is different to require different artifacts.
>
> I have the impression we are trying to solve a non-issue by adding a lot of
> artificial complexity (in particular to the users), or am I missing
> something
> else?
>
> On Wed, Oct 30, 2019 at 7:40 PM Kenneth Knowles  wrote:
> >
> > Oh, I mean that we ship just 2 jars.
> >
> > And since Spark users always build an uber jar, they can still depend on
> both of ours and be able to switch runners with a flag.
> >
> > I really dislike projects shipping overlapping jars. It is confusing and
> causes major diamond dependency problems.
> >
> > Kenn
> >
> > On Wed, Oct 30, 2019 at 11:12 AM Alexey Romanenko <
> aromanenko@gmail.com> wrote:
> >>
> >> Yes, agree, two jars included in uber jar will work in the similar way.
> Though having 3 jars looks still quite confusing for me.
> >>
> >> On 29 Oct 2019, at 23:54, Kenneth Knowles  wrote:
> >>
> >> Is it just as easy to have two jars and build an uber jar with both
> included? Then the runner can still be toggled with a flag.
> >>
> >> Kenn
> >>
> >> On Tue, Oct 29, 2019 at 9:38 AM Alexey Romanenko <
> aromanenko@gmail.com> wrote:
> >>>
> >>> Hmm, I don’t think that jar size should play a big role compared to
> the whole size of the shaded jar of a user's job. Even more, I think it will be
> quite confusing for users to choose which jar to use if we will have 3
> different ones for similar purposes. Though, let’s see what others think.
> >>>
> >>> On 29 Oct 2019, at 15:32, Etienne Chauchot 
> wrote:
> >>>
> >>> Hi Alexey,
> >>>
> >>> Thanks for your opinion !
> >>>
> >>> Comments inline
> >>>
> >>> Etienne
> >>>
> >>> On 28/10/2019 17:34, Alexey Romanenko wrote:
> >>>
> >>> Let me share some of my thoughts on this.
> >>>
> >>> - shall we filter out the package name from the release?
> >>>
> >>> Until the new runner is ready to be used in production (or, at least,
> be used for beta testing but users should be clearly warned about that in
> this case), I believe we need to filter out its classes from published jar
> to avoid a confusion.
> >>>
> >>> Yes that is what I think also
> >>>
> >>> - should we release 2 jars: one for the old and one for the new ?
> >>>
> >>> - should we release 3 jars: one for the old, one for the new and
> one for both ?
> >>>
> >>> Once new runner will be released, then I think we need to provide only
> one single jar and allow user to switch between different Spark runners
> with CLI option.
> >>>
> >>> I would vote for 3 jars: one for new, one for old, and one for both.
> Indeed, in some cases, users are looking very closely at the size of jars.
> This solution meets all use cases
> >>>
> >>> - should we create a special entry to the capability matrix ?
> >>>
> >>> Sure, since it has its own unique characteristics and implementation,
> but again, only once new runner will be "officially 

Re: Rethinking the Flink Runner modes

2019-10-30 Thread Robert Bradshaw
On Wed, Oct 30, 2019 at 3:34 PM Maximilian Michels  wrote:
>
> > One thing I don't understand is what it means for "CLI or REST API
> > context [to be] present." Where does this context come from? A config
> > file in a standard location on the user's machine? Or is this
> > something that is only present when a user uploads a jar and then
> > Flink runs it in a specific context? Or both?
>
> When you upload a Jar to Flink, it can be run by the Flink master.
> Running a Jar on the job manager will just invoke the main method. The
> same happens when you use the Flink CLI tool, the only difference being
> that the Jar runs on the user machine vs on the Flink master. In both
> cases, the Flink config file will be present in a standard location,
> i.e. "/conf/flink-conf.yaml".
>
> What Flink users typically do, is to call this method to construct a
> Flink dataflow (a.k.a. job):
>env = ExecutionEnvironment.getExecutionEnvironment()
>env.addSource().flatMap()
>env.execute()
> This registers an environment which holds the Flink dataflow definition.
>
> The Flink Runner also does this when flink_master is set to "[auto]".
> When it is set to "[local]", it will attempt to execute the whole
> pipeline locally (LocalEnvironment). When an address has been specified
> it will submit against a remote Flink cluster (RemoteEnvironment). The
> last two use cases do not make any sense for the user when they use the
> Python FlinkRunner which uses the jar upload feature of Flink. That's
> why "[auto]" is our choice for the FlinkUberJarJobServer feature of the
> Python FlinkRunner.
>
> In the case of the FlinkJarJobServer to use "[auto]" can also make sense
> because you can even specify a Flink config directory for the Flink job
> server to use. Without a config, auto will always fall back to local
> execution.

Thanks for clarifying. So when I run "./flink my_pipeline.jar"  or
upload the jar via the REST API (and its main method invoked on the
master) then [auto] reads the config and does the right thing, but if
I do java my_pipeline.jar it'll run locally.
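
A minimal Python sketch of the resolution rules as summarized above. This is an illustration only, not the Flink Runner's actual code: the function name `resolve_flink_master` and the `config_present` argument (standing in for "a Flink config was found via the CLI or REST context") are hypothetical.

```python
def resolve_flink_master(flink_master: str, config_present: bool) -> str:
    """Return which kind of execution a flink_master value selects.

    Hypothetical helper: "context" means the surrounding Flink CLI/REST
    context decides where the job runs.
    """
    if flink_master == "[local]":
        # Always execute the whole pipeline locally.
        return "local"
    if flink_master == "[auto]":
        # With a Flink config present the context decides; without one,
        # [auto] falls back to local execution.
        return "context" if config_present else "local"
    # Anything else is treated as the address of a remote cluster.
    return "remote"
```

Under these assumptions, running the jar through the Flink CLI or the REST API corresponds to `config_present=True`, while a plain `java` invocation corresponds to `config_present=False`.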

> > One more question: https://issues.apache.org/jira/browse/BEAM-8396
> > still seems valuable, but with [auto] as the default, how should we
> > detect whether LOOPBACK is safe to enable from Python?
>
> Yes, it is valuable. I suspect we only want to enable it for local
> execution?

Yes.

> We could let the actual Runner handle this by falling back to
> the default environment in case it detects that the execution will not
> be local. It can simply tell Python then to shutdown the loopback
> server, or it shuts itself down after a timeout.

Python needs to know even whether to start up the loopback server, and
provide the address when submitting the pipeline. If I understood
correctly above, the only time that the job server interprets [auto]
as something other than [local] is when creating the jar for later
submission. (In this case the flink master isn't even used, other than
being baked into the jar, right? And baking anything in but [auto]
seems wrong...) So it seems we could guard using LOOPBACK on this
flag + [local] or [auto].
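
One possible reading of that guard, sketched in Python. The function and its `building_jar_for_later` argument are hypothetical names for "this flag"; nothing here reflects an existing Beam API.

```python
def loopback_is_safe(flink_master: str, building_jar_for_later: bool) -> bool:
    """Decide whether Python may start a LOOPBACK worker (hypothetical)."""
    if building_jar_for_later:
        # The jar will run elsewhere later, so a local loopback server
        # would not be reachable.
        return False
    # Otherwise [auto] is interpreted as [local], so both are safe.
    return flink_master in ("[local]", "[auto]")
```

With this rule Python knows before submission whether to start the loopback server, which is the point raised above.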

> Another option would
> be, to only support it when the mode is set to "[local]".

Well, I'd really like to support it by default...

> On 30.10.19 21:05, Robert Bradshaw wrote:
> > One more question: https://issues.apache.org/jira/browse/BEAM-8396
> > still seems valuable, but with [auto] as the default, how should we
> > detect whether LOOPBACK is safe to enable from Python?
> >
> > On Wed, Oct 30, 2019 at 11:53 AM Robert Bradshaw  
> > wrote:
> >>
> >> Sounds good to me.
> >>
> >> One thing I don't understand is what it means for "CLI or REST API
> >> context [to be] present." Where does this context come from? A config
> >> file in a standard location on the user's machine? Or is this
> >> something that is only present when a user uploads a jar and then
> >> Flink runs it in a specific context? Or both?
> >>
> >> On Tue, Oct 29, 2019 at 3:27 AM Maximilian Michels  wrote:
> >>>
> >>> tl;dr:
> >>>
> >>> - I see consensus for inferring "http://" in Python to align it with the
> >>> Java behavior which currently requires leaving out the protocol scheme.
> >>> Optionally, Java could also accept a scheme which gets removed as
> >>> required by the Flink Java Rest client.
> >>>
> >>> - We won't support "https://" in Python for now, because it requires
> >>> additional SSL setup, i.e. parsing the Flink config file and setting up
> >>> the truststore
> >>>
> >>> - We want to keep "[auto]"/"[local]" but fix the current broken behavior
> >>> via https://issues.apache.org/jira/browse/BEAM-8507
> >>>
> >>>
> >>> Additional comments below:
> >>>
> >>>> One concern with this is that just supplying host:port is the existing
> >>>> behavior, so we can't start requiring the http://
> >>>
> >>> I wouldn't require it but optionally support it, otherwise add it
> >>> automatically.
> >>>
> >>>> One question I have is if there are 

Re: aggregating over triggered results

2019-10-30 Thread Robert Bradshaw
On Tue, Oct 29, 2019 at 7:01 PM Aaron Dixon  wrote:
>
> Thank you, Luke and Robert. Sorry for hitting dev@, I criss-crossed and meant 
> to hit user@, but as we're here could you clarify your two points, however--

No problem. This is veering into dev@ territory anyway :).

> 1) I am under the impression that the 4,000 sliding windows approach (30 days 
> every 10m) will re-evaluate my combine aggregation every 10m whereas with the 
> two-window approach my Combine aggregation would evolve iteratively, only 
> merging new results into the aggregation.
>
> If there's a cross-window optimization occurring that would allow iterative 
> combining _across windows_, given the substantial order of magnitude 
> difference in scale at play, is it safe to consider such 'internal 
> optimization detail' part of the platform contract (Dataflow's, say)? 
> Otherwise it would be hard to lean on this from a production system that will 
> live into the future.

OK, let's first define exactly what (I think) you're trying to
compute. Let's label windows by their (upper) endpoint. So, every 10
minutes you have a window W_t and an aggregate Aggr(e for e in
all_events if t - 30days <= timestamp(e) < t).

The way this is computed in Beam is by storing a map W_t ->
RunningAggregate and whenever we see an element with timestamp T we
assign it to the set of windows S = {W_t : T in W_t} (in this case
there would be 30*24*6 = 4320 of them) and subsequently update all the
running aggregates. When we are sure we've seen all elements up to t
(the watermark) we release window W_t with its computed aggregate
downstream.
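
To make the per-element cost concrete, here is a small Python sketch of this bookkeeping for 30-day windows sliding every 10 minutes. It is a toy model, not Beam's implementation: timestamps are plain integer seconds, the aggregate is a sum, and the names `window_ends`, `running` and `add_element` are made up for illustration.

```python
from collections import defaultdict
from typing import Dict, List

PERIOD = 10 * 60              # windows start every 10 minutes (seconds)
SIZE = 30 * 24 * 60 * 60      # each window spans 30 days (seconds)

def window_ends(timestamp: int) -> List[int]:
    """Upper endpoints t of all windows [t - SIZE, t) containing timestamp."""
    # Smallest period-aligned endpoint strictly greater than the timestamp.
    first_end = (timestamp // PERIOD + 1) * PERIOD
    return [first_end + k * PERIOD for k in range(SIZE // PERIOD)]

# Map from window endpoint W_t to its running aggregate.
running: Dict[int, int] = defaultdict(int)

def add_element(timestamp: int, value: int) -> None:
    # Every element updates the running aggregate of every window it falls in.
    for end in window_ends(timestamp):
        running[end] += value
```

A single call to `add_element` touches `SIZE // PERIOD` entries, which is the 4320 updates per element mentioned above.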

An alternative that's often proposed, and works only for aligned
sliding windows, is to instead store a map of 10-minute buckets to
running aggregates, and whenever an element comes in we add its value
to the aggregate of that bucket. This side is cheaper, but every time
the watermark tells us we're able to release a window we then have to
compute an Aggregate over all 4000 of these buckets.
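
The bucket alternative can be sketched the same way. Again this is a toy model with integer-second timestamps and a sum aggregate; `buckets`, `add_element` and `release_window` are illustrative names only.

```python
from collections import defaultdict
from typing import Dict

PERIOD = 10 * 60                 # bucket width in seconds
BUCKETS_PER_WINDOW = 30 * 24 * 6 # 4320 ten-minute buckets per 30-day window

# Map from bucket index to the running aggregate of that bucket.
buckets: Dict[int, int] = defaultdict(int)

def add_element(timestamp: int, value: int) -> None:
    # Only one bucket is updated per element.
    buckets[timestamp // PERIOD] += value

def release_window(end: int) -> int:
    """Aggregate all buckets of the window [end - 30 days, end)."""
    last = end // PERIOD  # exclusive bucket index
    first = last - BUCKETS_PER_WINDOW
    # The expensive part: every release combines all buckets in the window.
    return sum(buckets.get(b, 0) for b in range(first, last))
```

The write path is a single update, but each released window reads up to 4320 buckets.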

A further extension, if the aggregation function is reversible, is to
keep a running total, and every time we release a window, we add the
next bucket's contribution, and remove the previous bucket's
contribution, to this running total. If the computation is not
reversible, we can compute a "binary tree" of aggregates (e.g. 10-min
buckets, 20-min buckets, 40-min buckets, ...) and perform log(N)
aggregations each time an element comes in and log(N) every time a
window is released.
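
For the reversible case, the running-total idea looks roughly like this in Python. The sketch assumes a sum (which is reversible via subtraction) over a list of already-computed bucket aggregates; `sliding_sums` is a hypothetical helper, and the "binary tree" variant for non-reversible functions is not shown.

```python
from typing import List

def sliding_sums(bucket_values: List[int], window_buckets: int) -> List[int]:
    """Return the sum of each full window of consecutive buckets."""
    total = 0
    results = []
    for i, value in enumerate(bucket_values):
        total += value  # add the newest bucket's contribution
        if i >= window_buckets:
            # Remove the contribution of the bucket that just left the window.
            total -= bucket_values[i - window_buckets]
        if i >= window_buckets - 1:
            results.append(total)  # a full window can be released
    return results
```

Each release costs one addition and one subtraction instead of a pass over every bucket, but all bucket values still have to be kept as state.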

Each of these is more specialized to the exact shape of sliding
windows, and though they may allow us to save some compute, still
require the storage of 4320 (or even 4320 log(N)) bits of state. Often
the compute is virtually free compared to the cost of reading/writing
state, which means there are fewer (or even no) advantages for the
specialized methods (depending on how sparse the data is), especially
if one does blind writes and only has to absorb latency on reads.

As mentioned, Beam does the first, and though we've looked at the
others there has not been a compelling case made that they would
actually help much if at all in practice.


As for what would be computed with 60/30-day sliding windows with 10
minute triggers, the output would be (approximately, due to mixing
processing and event time, though at this granularity they are likely
to line up mostly), W_t_i where t jumps in 30 day increments and i
jumps in 10 minute increments and the aggregate assigned to W_t_i is
Aggr(e for e in all_events where t-60days < timestamp(e) < i) which is
not quite the same thing. E.g. let's say we have a single event
(aggregation count) every 10 minute interval. The sliding windows
output would give 4320 events for every 30-day window, published every
10 minutes. The 60/30-day would re-publish each of the windows [0,
60day), [30day, 90day), [60day, 120day), ... with the values 1, 2, 3,
4, ..., 8640. In other words, at time t=45day, the window [0, 60day)
would trigger with a value of the 6480 values that had fallen in this
window so far, and the [30day, 90day) window would trigger with the
2160 seen so far, neither of which are accurate representations of
"from now until 30 days ago."
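
The numbers in this example can be checked with a few lines of Python, assuming exactly one event per 10-minute interval and ignoring the processing-time versus event-time skew mentioned above. `count_so_far` is a made-up helper for the arithmetic only.

```python
EVENTS_PER_DAY = 24 * 6  # one event per 10-minute interval

def count_so_far(window_start_day: int, window_end_day: int, now_day: int) -> int:
    """Events seen so far in [window_start_day, window_end_day) at now_day."""
    seen_until = min(now_day, window_end_day)
    return max(0, seen_until - window_start_day) * EVENTS_PER_DAY

# What the two 60/30-day windows report when triggered at day 45.
first_window = count_so_far(0, 60, 45)
second_window = count_so_far(30, 90, 45)

# What a true "last 30 days" sliding window would report at day 45.
true_last_30_days = count_so_far(15, 45, 45)
```

Neither triggered value matches the count over the true trailing 30 days.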

I suppose one could avoid doing any pre-aggregation, and emit all of
the events (with reified timestamp) in 60/30-day windows, then have a
DoFn that filters on the events and computes each of the 10-minute
aggregates over the "true" sliding window (4320 outputs). This could
be cheaper if your events are very sparse, will be more expensive if
they're very dense, and it's unclear what the tradeoff will be.

I would try the straightforward approach and see if that works,
because it might well be good enough that the additional
complexity isn't worth it (if even a measurable improvement). You can
play with various windowing/triggering strategies at
https://window-explorer.appspot.com/ (though unfortunately processing
time triggers are not represented).

> 2) When you say "regardless 

Re: Rethinking the Flink Runner modes

2019-10-30 Thread Maximilian Michels

One thing I don't understand is what it means for "CLI or REST API
context [to be] present." Where does this context come from? A config
file in a standard location on the user's machine? Or is this
something that is only present when a user uploads a jar and then
Flink runs it in a specific context? Or both?


When you upload a Jar to Flink, it can be run by the Flink master. 
Running a Jar on the job manager will just invoke the main method. The 
same happens when you use the Flink CLI tool, the only difference being 
that the Jar runs on the user machine vs on the Flink master. In both 
cases, the Flink config file will be present in a standard location, 
i.e. "/conf/flink-conf.yaml".


What Flink users typically do, is to call this method to construct a 
Flink dataflow (a.k.a. job):

  env = ExecutionEnvironment.getExecutionEnvironment()
  env.addSource().flatMap()
  env.execute()
This registers an environment which holds the Flink dataflow definition.

The Flink Runner also does this when flink_master is set to "[auto]". 
When it is set to "[local]", it will attempt to execute the whole 
pipeline locally (LocalEnvironment). When an address has been specified 
it will submit against a remote Flink cluster (RemoteEnvironment). The 
last two use cases do not make any sense for the user when they use the 
Python FlinkRunner which uses the jar upload feature of Flink. That's 
why "[auto]" is our choice for the FlinkUberJarJobServer feature of the 
Python FlinkRunner.


In the case of the FlinkJarJobServer to use "[auto]" can also make sense 
because you can even specify a Flink config directory for the Flink job 
server to use. Without a config, auto will always fall back to local 
execution.



One more question: https://issues.apache.org/jira/browse/BEAM-8396
still seems valuable, but with [auto] as the default, how should we
detect whether LOOPBACK is safe to enable from Python?


Yes, it is valuable. I suspect we only want to enable it for local 
execution? We could let the actual Runner handle this by falling back to 
the default environment in case it detects that the execution will not 
be local. It can simply tell Python then to shutdown the loopback 
server, or it shuts itself down after a timeout. Another option would 
be, to only support it when the mode is set to "[local]".


-Max

On 30.10.19 21:05, Robert Bradshaw wrote:

One more question: https://issues.apache.org/jira/browse/BEAM-8396
still seems valuable, but with [auto] as the default, how should we
detect whether LOOPBACK is safe to enable from Python?

On Wed, Oct 30, 2019 at 11:53 AM Robert Bradshaw  wrote:


Sounds good to me.

One thing I don't understand is what it means for "CLI or REST API
context [to be] present." Where does this context come from? A config
file in a standard location on the user's machine? Or is this
something that is only present when a user uploads a jar and then
Flink runs it in a specific context? Or both?

On Tue, Oct 29, 2019 at 3:27 AM Maximilian Michels  wrote:


tl;dr:

- I see consensus for inferring "http://" in Python to align it with the
Java behavior which currently requires leaving out the protocol scheme.
Optionally, Java could also accept a scheme which gets removed as
required by the Flink Java Rest client.

- We won't support "https://" in Python for now, because it requires
additional SSL setup, i.e. parsing the Flink config file and setting up
the truststore

- We want to keep "[auto]"/"[local]" but fix the current broken behavior
via https://issues.apache.org/jira/browse/BEAM-8507


Additional comments below:


One concern with this is that just supplying host:port is the existing
behavior, so we can't start requiring the http://


I wouldn't require it but optionally support it, otherwise add it
automatically.
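
A rough Python sketch of that behavior, covering the points in the tl;dr: accept an optional "http://", add it when it is missing, reject "https://" for now, and pass the special values through. `normalize_flink_master` is a hypothetical name, not an existing function in the Python SDK.

```python
def normalize_flink_master(address: str) -> str:
    """Return the address with an explicit http:// scheme (hypothetical)."""
    if address in ("[auto]", "[local]"):
        # Special modes are not URLs and are passed through unchanged.
        return address
    if address.startswith("https://"):
        # SSL would need the truststore settings from the Flink config file.
        raise ValueError("https:// is not supported from Python for now")
    if address.startswith("http://"):
        return address
    # Plain host:port is the existing behavior, so infer the scheme.
    return "http://" + address
```

Existing users who pass only host:port keep working, and users who spell out the scheme get the same result.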


One question I have is if there are other
authentication parameters that may be required for speaking to a flink
endpoint that we should be aware of (that would normally be buried in
the config file).


Yes, essentially all the SSL configuration is in the config file,
including the location of the truststore, the password, certificates, etc.

For now, I would say we cannot properly support SSL in Python, unless we
find a way to load the truststore from Python.


I do like being explicit with something like [local] rather than
treating the empty string in a magical way.


Fine, we can keep "[local]" and throw an error in case the address is
empty. Let's also throw an error in case the Flink CLI tool is used with
local execution because that is clearly not what the user wants.


I'd like to see this issue resolved before 2.17 as changing the public API once 
it's released will be harder.


+1. In particular, I misunderstood that [auto] is not supported by 
`FlinkUberJarJobServer`. Since [auto] is now the default, it's broken for 
Python 3.6+.


+1 Let's fix this in time for the release.


The user shouldn't have to specify a protocol for Python, I think it's 
preferable and 

Re: [spark structured streaming runner] merge to master?

2019-10-30 Thread Ismaël Mejía
I am still a bit lost about why we are discussing options without giving any
arguments or reasons for the options? Why is 2 modules better than 3 or 3 better
than 2, or even better, what forces us to have something different than a single
module?

What are the reasons for wanting to have separate jars? If the issue is that the
code is unfinished or not passing the tests, the impact for end users is minimal
because they cannot accidentally end up running the new runner, and if they
decide to do so we can warn them it is at their own risk and not ready for
production in the documentation + runner.

If the fear is that new code may end up being intertwined with the classic and
portable runners and have some side effects. We have the ValidatesRunner +
Nexmark in the CI to cover this so again I do not see what is the problem that
requires modules to be separate.

If the issue is being uncomfortable about having in-progress code in released
artifacts we have been doing this in Beam forever, for example most of the work
on portability and Schema/SQL, and all of those were still part of artifacts
long time before they were ready for prime use, so I still don't see why this
case is different to require different artifacts.

I have the impression we are trying to solve a non-issue by adding a lot of
artificial complexity (in particular to the users), or am I missing something
else?

On Wed, Oct 30, 2019 at 7:40 PM Kenneth Knowles  wrote:
>
> Oh, I mean that we ship just 2 jars.
>
> And since Spark users always build an uber jar, they can still depend on both 
> of ours and be able to switch runners with a flag.
>
> I really dislike projects shipping overlapping jars. It is confusing and 
> causes major diamond dependency problems.
>
> Kenn
>
> On Wed, Oct 30, 2019 at 11:12 AM Alexey Romanenko  
> wrote:
>>
>> Yes, agree, two jars included in uber jar will work in the similar way. 
>> Though having 3 jars looks still quite confusing for me.
>>
>> On 29 Oct 2019, at 23:54, Kenneth Knowles  wrote:
>>
>> Is it just as easy to have two jars and build an uber jar with both 
>> included? Then the runner can still be toggled with a flag.
>>
>> Kenn
>>
>> On Tue, Oct 29, 2019 at 9:38 AM Alexey Romanenko  
>> wrote:
>>>
>>> Hmm, I don’t think that jar size should play a big role compared to the
>>> whole size of the shaded jar of a user's job. Even more, I think it will be quite
>>> confusing for users to choose which jar to use if we will have 3 different 
>>> ones for similar purposes. Though, let’s see what others think.
>>>
>>> On 29 Oct 2019, at 15:32, Etienne Chauchot  wrote:
>>>
>>> Hi Alexey,
>>>
>>> Thanks for your opinion !
>>>
>>> Comments inline
>>>
>>> Etienne
>>>
>>> On 28/10/2019 17:34, Alexey Romanenko wrote:
>>>
>>> Let me share some of my thoughts on this.
>>>
>>> - shall we filter out the package name from the release?
>>>
>>> Until the new runner is ready to be used in production (or, at least, be
>>> used for beta testing but users should be clearly warned about that in this 
>>> case), I believe we need to filter out its classes from published jar to 
>>> avoid a confusion.
>>>
>>> Yes that is what I think also
>>>
>>> - should we release 2 jars: one for the old and one for the new ?
>>>
>>> - should we release 3 jars: one for the old, one for the new and one
>>> for both ?
>>>
>>> Once new runner will be released, then I think we need to provide only one 
>>> single jar and allow user to switch between different Spark runners with 
>>> CLI option.
>>>
>>> I would vote for 3 jars: one for new, one for old, and one for both. 
>>> Indeed, in some cases, users are looking very closely at the size of jars. 
>>> This solution meets all use cases
>>>
>>> - should we create a special entry to the capability matrix ?
>>>
>>> Sure, since it has its own unique characteristics and implementation, but
>>> again, only once new runner will be "officially released".
>>>
>>> +1
>>>
>>>
>>>
>>> On 28 Oct 2019, at 10:27, Etienne Chauchot  wrote:
>>>
>>> Hi guys,
>>>
>>> Any opinions on the point2 communication to users ?
>>>
>>> Etienne
>>>
>>> On 24/10/2019 15:44, Etienne Chauchot wrote:
>>>
>>> Hi guys,
>>>
>>> I'm glad to announce that the PR for the merge to master of the new runner 
>>> based on Spark Structured Streaming framework is submitted:
>>>
>>> https://github.com/apache/beam/pull/9866
>>>
>>>
>>> 1. Regarding the status of the runner:
>>>
>>> -the runner passes 93% of the validates runner tests in batch mode.
>>>
>>> -Streaming mode is barely started (waiting for the multi-aggregations 
>>> support in spark Structured Streaming framework from the Spark community)
>>>
>>> -Runner can execute Nexmark
>>>
>>> -Some things are not wired up yet
>>>
>>>   -Beam Schemas not wired with Spark Schemas
>>>
>>>   -Optional features of the model not implemented: state api, timer api, 
>>> splittable doFn api, …
>>>
>>>
>>> 2. Regarding the communication to users:
>>>
>>> - for reasons explained by Ismael: 

Re: RFC: python static typing PR

2019-10-30 Thread Robert Bradshaw
On Wed, Oct 30, 2019 at 1:26 PM Chad Dombrova  wrote:
>
>> Do you believe that a future mypy plugin could replace pipeline type checks 
>> in Beam, or are there limits to what it can do?
>
> mypy will get us quite far on its own once we completely annotate the beam 
> code.  That said, my PR does not include my efforts to turn PTransforms into 
> Generics, which will be required to properly analyze pipelines, so there's 
> still a lot more work to do.  I've experimented with a mypy plugin to smooth 
> over some of the rough spots in that workflow and I will just say that the 
> mypy API has a very steep learning curve.
>
> Another thing to note: mypy is very explicit about function annotations.  It 
> does not do the "implicit" inference that Beam does, such as automatically 
> detecting function return types.  I think it should be possible to do a lot 
> of that as a mypy plugin, and in fact, since it has little to do with Beam it 
> could grow into its own project with outside contributors.

Yeah, I don't think, as is, it can replace what we do, but with
plugins I think it could possibly come closer. Certainly there is
information that is only available at runtime (e.g. reading from a
database or avro/parquet file could provide the schema which can be
used for downstream checking) which may limit the ability to do
everything statically (even Beam Java is moving this direction). Mypy
clearly has an implementation of the "is compatible with" operator
that I would love to borrow, but unfortunately it's not (easily?)
exposed.
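
As a small standard-library illustration of the gap being discussed: explicit annotations can be read back at runtime, but nothing is known about a function that has none. This sketch does not reproduce Beam's inference or mypy's compatibility check; `declared_return_type` and the two sample functions are hypothetical.

```python
from typing import Any, Callable, List, get_type_hints

def declared_return_type(fn: Callable[..., Any]) -> Any:
    """Return the annotated return type of fn, or Any if none was written."""
    return get_type_hints(fn).get("return", Any)

def split_words(line: str) -> List[str]:
    # Explicitly annotated: the return type is available to any tool.
    return line.split()

def unannotated(line):
    # No annotations: only inference (or running the code) can tell.
    return line.split()
```

Here `declared_return_type(split_words)` yields `List[str]`, while `unannotated` falls back to `Any`, which is where inference or a plugin would have to step in.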

That being said, we should leverage what we can for pipeline
authoring, and it'll be a great development tool regardless.


Re: RFC: python static typing PR

2019-10-30 Thread Ismaël Mejía
The herculean term is perfect to describe this impressive achievement Chad.

Congratulations and thanks for the effort to make this happen. This will not
only give Beam users improved functionality but, as Robert mentioned, also help
others understand the internals of the Python SDK more quickly. Maintenance is a
small price to pay for the big wins of typing.

Huge +1



On Wed, Oct 30, 2019 at 9:26 PM Chad Dombrova  wrote:
>
>
>>
>> Do you believe that a future mypy plugin could replace pipeline type checks 
>> in Beam, or are there limits to what it can do?
>
>
> mypy will get us quite far on its own once we completely annotate the beam 
> code.  That said, my PR does not include my efforts to turn PTransforms into 
> Generics, which will be required to properly analyze pipelines, so there's 
> still a lot more work to do.  I've experimented with a mypy plugin to smooth 
> over some of the rough spots in that workflow and I will just say that the 
> mypy API has a very steep learning curve.
>
> Another thing to note: mypy is very explicit about function annotations.  It 
> does not do the "implicit" inference that Beam does, such as automatically 
> detecting function return types.  I think it should be possible to do a lot 
> of that as a mypy plugin, and in fact, since it has little to do with Beam it 
> could grow into its own project with outside contributors.
>
> -chad
>


Re: Python SDK timestamp precision

2019-10-30 Thread Robert Bradshaw
On Wed, Oct 30, 2019 at 2:00 AM Jan Lukavský  wrote:
>
> TL;DR - can we solve this by representing aggregations as not point-wise
> events in time, but time ranges? Explanation below.
>
> Hi,
>
> this is pretty interesting from a theoretical point of view. The
> question generally seems to be - having two events, can I reliably order
> them? One event might be end of one window and the other event might be
> start of another window. There is strictly required causality in this
> (although these events in fact have the same limiting timestamp). On the
> other hand, when I have two (random) point-wise events (one with
> timestamp T1 and the other with timestamp T2), then causality of these
> two depend on distance in space. If these two events differ by single
> nanosecond, then if they do not originate very "close" to each other,
> then there is high probability that different observers will see them in
> different order (and hence there is no causality possible and the order
> doesn't matter).
>
> That is to say - if I'm dealing with single external event (someone
> tells me something has happened at time T), then - around the boundaries
> of windows - it doesn't matter which window is this event placed into.
> There is no causal connection between start of window and the event.

I resolve this theoretical issue by working in a space with a single
time dimension and zero spatial dimensions. That is, the location of
an event is completely determined by a single coordinate in time, and
distance and causality are hence well defined for all possible
observers :). Realistically, there is both error and ambiguity in
assigning absolute timestamps to real-world events, but it's important
that this choice of ordering should be preserved (e.g. not compressed
away) by the system.

> Different situation is when we actually perform an aggregation on
> multiple elements (GBK) - then although we produce output at certain
> event-time timestamp, this event is not "point-wise external", but is a
> running aggregation on multiple (and possibly long term) data. That is
> of course causally preceding opening a new (tumbling) window.
>
> The question now is - could we extend WindowedValue so that it doesn't
> contain only single point-wise event in time, but a time range in case
> of aggregations? Then the example of Window.into -> GBK -> Window.into
> could actually behave correctly (respecting causality) and another
> positive thing could be, that the resulting timestamp of the aggregation
> would no longer have to be window.maxTimestamp - 1 (which is kind of
> strange).
>
> This might be actually equivalent to the "minus epsilon" approach, but
> maybe better understandable. Moreover, this should be probably available
> to users, because one can easily get to the same problems when using
> stateful ParDo to perform a GBK-like aggregation.
>
> Thoughts?

This is quite an interesting idea. In some sense, timestamps become
like interval windows, and window assignment is similar to the window
mapping fns that we use for side inputs. I still think the idea of a
timestamp for an element and a window for an element is needed (e.g.
one can have elements in a single window, especially the global window
that have different timestamps), but this could be interesting to
explore. It could definitely get rid of the "minus epsilon" weirdness,
though I don't think it completely solves the granularity issues.
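
To make the comparison with interval windows concrete, here is a minimal
sketch, assuming a hypothetical TimeRange that an aggregation output would
carry instead of a single timestamp. None of these names exist in Beam; the
ranges are half-open [start, end) in integer microseconds, and rejecting a
range that straddles a boundary is just one possible policy.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TimeRange:
    """Hypothetical half-open range [start, end) in integer microseconds."""
    start: int
    end: int

    def precedes(self, other: "TimeRange") -> bool:
        # A range precedes another if it ends no later than the other starts.
        return self.end <= other.start


def fixed_window(timestamp: int, size: int) -> TimeRange:
    """Assign a point-wise timestamp to its aligned fixed window."""
    start = timestamp - timestamp % size
    return TimeRange(start, start + size)


def window_for_range(rng: TimeRange, size: int) -> TimeRange:
    """Assign a ranged (aggregated) value to the aligned window containing it."""
    window = fixed_window(rng.start, size)
    if rng.end > window.end:
        # Assumed policy: a range that straddles a boundary is an error.
        raise ValueError("range straddles a window boundary")
    return window
```

With this shape, the output of a [0, 10) window can be re-windowed into
[0, 20) without a "maxTimestamp - 1" step, and it still compares as
preceding the window that opens at 10.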

> On 10/30/19 1:05 AM, Robert Bradshaw wrote:
> > On Tue, Oct 29, 2019 at 4:20 PM Kenneth Knowles  wrote:
> >> Point (1) is compelling. Solutions to the "minus epsilon" seem a bit 
> >> complex. On the other hand, an opaque and abstract Timestamp type (in each 
> >> SDK) going forward seems like a Pretty Good Idea (tm). Would you really 
> >> have to go floating point? Could you just have a distinguished 
> >> representation for non-inclusive upper/lower bounds? These could be at the 
> >> same reduced resolution as timestamps in element metadata, since that is 
> >> all they are compared against.
> > If I were coming up with an abstract, opaque representation of
> > Timestamp (and Duration) for Beam, I would explicitly include the
> > "minus epsilon" concept. One could still do arithmetic with these.
> > This would make any conversion to standard datetime libraries lossy
> > though.
> >
> >> Point (2) is also good, though it seems like something that could be 
> >> cleverly engineered and/or we just provide one implementation and it is 
> >> easy to make your own for finer granularity, since a WindowFn separately 
> >> receives the Timestamp (here I'm pretending it is abstract and opaque and 
> >> likely approximate) and the original element with whatever precision the 
> >> original data included.
> > Yes, but I don't see how a generic WindowFn would reach into the
> > (arbitrary) element and pull out this original data. One of the
> > benefits of the Beam model is that the WindowFn does not have to
> > depend on the element type.
> 

Re: RFC: python static typing PR

2019-10-30 Thread Chad Dombrova
> Do you believe that a future mypy plugin could replace pipeline type
> checks in Beam, or are there limits to what it can do?
>

mypy will get us quite far on its own once we completely annotate the beam
code.  That said, my PR does not include my efforts to turn PTransforms
into Generics, which will be required to properly analyze pipelines, so
there's still a lot more work to do.  I've experimented with a mypy plugin
to smooth over some of the rough spots in that workflow and I will just say
that the mypy API has a very steep learning curve.
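
As a rough illustration of what "PTransforms as Generics" could buy us, here
is a minimal sketch. The Transform class below is a hypothetical stand-in, not
the Beam PTransform API; it only shows how parameterizing on input and output
element types lets a checker such as mypy compare adjacent steps.

```python
from typing import Callable, Generic, TypeVar

InT = TypeVar("InT")
OutT = TypeVar("OutT")
NextT = TypeVar("NextT")


class Transform(Generic[InT, OutT]):
    """Hypothetical stand-in for a transform typed by its element types."""

    def __init__(self, fn: Callable[[InT], OutT]) -> None:
        self._fn = fn

    def apply(self, element: InT) -> OutT:
        return self._fn(element)

    def then(self, other: "Transform[OutT, NextT]") -> "Transform[InT, NextT]":
        # The signature ties this transform's output type to the next
        # transform's input type, which is what a static checker verifies.
        return Transform(lambda element: other.apply(self.apply(element)))


def parse(text: str) -> int:
    return int(text)


def double(value: int) -> int:
    return value * 2


parse_t: Transform[str, int] = Transform(parse)
double_t: Transform[int, int] = Transform(double)

# Types line up: str -> int -> int.
pipeline = parse_t.then(double_t)
```

Swapping the order (double_t.then(parse_t)) still constructs an object at
runtime, but mypy should flag it, since parse_t expects str input rather
than int.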

Another thing to note: mypy is very explicit about function annotations.
It does not do the "implicit" inference that Beam does, such as
automatically detecting function return types.  I *think* it should be
possible to do a lot of that as a mypy plugin, and in fact, since it has
little to do with Beam it could grow into its own project with outside
contributors.

-chad


Re: Rethinking the Flink Runner modes

2019-10-30 Thread Robert Bradshaw
One more question: https://issues.apache.org/jira/browse/BEAM-8396
still seems valuable, but with [auto] as the default, how should we
detect whether LOOPBACK is safe to enable from Python?

On Wed, Oct 30, 2019 at 11:53 AM Robert Bradshaw  wrote:
>
> Sounds good to me.
>
> One thing I don't understand is what it means for "CLI or REST API
> context [to be] present." Where does this context come from? A config
> file in a standard location on the user's machine? Or is this
> something that is only present when a user uploads a jar and then
> Flink runs it in a specific context? Or both?
>
> On Tue, Oct 29, 2019 at 3:27 AM Maximilian Michels  wrote:
> >
> > tl;dr:
> >
> > - I see consensus for inferring "http://" in Python to align it with the
> > Java behavior which currently requires leaving out the protocol scheme.
> > Optionally, Java could also accept a scheme which gets removed as
> > required by the Flink Java Rest client.
> >
> > - We won't support "https://" in Python for now, because it requires
> > additional SSL setup, i.e. parsing the Flink config file and setting up
> > the truststore
> >
> > - We want to keep "[auto]"/"[local]" but fix the current broken behavior
> > via https://issues.apache.org/jira/browse/BEAM-8507
> >
> >
> > Additional comments below:
> >
> > > One concern with this is that just supplying host:port is the existing
> > > behavior, so we can't start requiring the http://
> >
> > I wouldn't require it but optionally support it, otherwise add it
> > automatically.
> >
> > > One question I have is if there are other
> > > authentication parameters that may be required for speaking to a flink
> > > endpoint that we should be aware of (that would normally be buried in
> > > the config file).
> >
> > Yes, essentially all the SSL configuration is in the config file,
> > including the location of the truststore, the password, certificates, etc.
> >
> > For now, I would say we cannot properly support SSL in Python, unless we
> > find a way to load the truststore from Python.
> >
> > > I do like being explicit with something like [local] rather than
> > > treating the empty string in a magical way.
> >
> > Fine, we can keep "[local]" and throw an error in case the address is
> > empty. Let's also throw an error in case the Flink CLI tool is used with
> > local execution because that is clearly not what the user wants.
> >
> > >> I'd like to see this issue resolved before 2.17 as changing the public 
> > >> API once it's released will be harder.
> > >
> > > +1. In particular, I misunderstood that [auto] is not supported by 
> > > `FlinkUberJarJobServer`. Since [auto] is now the default, it's broken for 
> > > Python 3.6+.
> >
> > +1 Let's fix this in time for the release.
> >
> > > The user shouldn't have to specify a protocol for Python, I think it's 
> > > preferable and reasonable to handle that for them in order to maintain 
> > > existing behavior and align with Java SDK.
> >
> > +1
> >
> > > Looks like we also have a [collection] configuration value [1].
> >
> > Yeah, I think it is acceptable to remove this entirely. This has never
> > been used by anyone and is an unmaintained Flink feature.
> >
> > > I would find it acceptable to interpret absence of the option as 
> > > "[auto]", which really means use CLI or REST API context when present, or 
> > > local. I would prefer to not have an empty string value default (but 
> > > rather None/null) and no additional magic values.
> >
> > Let's keep "[auto]" then to keep it explicit. An empty string should
> > throw an error.
> >
> >
> > > One more reason that was not part of this discussion yet.
> >
> > @Jan: Supporting context classloaders in local mode is a new feature and
> > for keeping it simple, I'd start a new thread for it.
> >
> >
> > On 29.10.19 10:55, Jan Lukavský wrote:
> > > Hi,
> > >
> > > +1 for empty string being interpreted as [auto] and anything else having
> > > explicit notation.
> > >
> > > One more reason that was not part of this discussion yet. In [1] there
> > > was a discussion about LocalEnvironment (that is the one that is
> > > responsible for spawning in process Flink cluster) not using context
> > > classloader and thus can fail loading some user code (if this code was
> > > added to context classloader *after* application has been run).
> > > LocalEnvironment on the other hand supposes that all classes can be
> > > loaded by application's classloader and doesn't accept any "client
> > > jars". Therefore - when application generates classes dynamically during
> > > runtime it is currently impossible to run those using local flink
> > > runner. There is a nasty hack for JDK <= 8 (injecting URL into
> > > application's URLClassLoader), but that fails hard on JDK >= 9 (obviously).
> > >
> > > The conclusion from that thread is that it could be solved by manually
> > > running MiniCluster (which will run on localhost:8081 by default) and
> > > then use this REST address for RemoteEnvironment so that the 

Re: Rethinking the Flink Runner modes

2019-10-30 Thread Robert Bradshaw
Sounds good to me.

One thing I don't understand is what it means for "CLI or REST API
context [to be] present." Where does this context come from? A config
file in a standard location on the user's machine? Or is this
something that is only present when a user uploads a jar and then
Flink runs it in a specific context? Or both?

On Tue, Oct 29, 2019 at 3:27 AM Maximilian Michels  wrote:
>
> tl;dr:
>
> - I see consensus for inferring "http://" in Python to align it with the
> Java behavior which currently requires leaving out the protocol scheme.
> Optionally, Java could also accept a scheme which gets removed as
> required by the Flink Java Rest client.
>
> - We won't support "https://" in Python for now, because it requires
> additional SSL setup, i.e. parsing the Flink config file and setting up
> the truststore
>
> - We want to keep "[auto]"/"[local]" but fix the current broken behavior
> via https://issues.apache.org/jira/browse/BEAM-8507
>
>
> Additional comments below:
>
> > One concern with this is that just supplying host:port is the existing
> > behavior, so we can't start requiring the http://
>
> I wouldn't require it but optionally support it, otherwise add it
> automatically.
>
> > One question I have is if there are other
> > authentication parameters that may be required for speaking to a flink
> > endpoint that we should be aware of (that would normally be buried in
> > the config file).
>
> Yes, essentially all the SSL configuration is in the config file,
> including the location of the truststore, the password, certificates, etc.
>
> For now, I would say we cannot properly support SSL in Python, unless we
> find a way to load the truststore from Python.
>
> > I do like being explicit with something like [local] rather than
> > treating the empty string in a magical way.
>
> Fine, we can keep "[local]" and throw an error in case the address is
> empty. Let's also throw an error in case the Flink CLI tool is used with
> local execution because that is clearly not what the user wants.
>
> >> I'd like to see this issue resolved before 2.17 as changing the public API 
> >> once it's released will be harder.
> >
> > +1. In particular, I misunderstood that [auto] is not supported by 
> > `FlinkUberJarJobServer`. Since [auto] is now the default, it's broken for 
> > Python 3.6+.
>
> +1 Let's fix this in time for the release.
>
> > The user shouldn't have to specify a protocol for Python, I think it's 
> > preferable and reasonable to handle that for them in order to maintain 
> > existing behavior and align with Java SDK.
>
> +1
>
> > Looks like we also have a [collection] configuration value [1].
>
> Yeah, I think it is acceptable to remove this entirely. This has never
> been used by anyone and is an unmaintained Flink feature.
>
> > I would find it acceptable to interpret absence of the option as "[auto]", 
> > which really means use CLI or REST API context when present, or local. I 
> > would prefer to not have an empty string value default (but rather 
> > None/null) and no additional magic values.
>
> Let's keep "[auto]" then to keep it explicit. An empty string should
> throw an error.
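
The address rules agreed on above can be sketched as follows. This is a
hypothetical helper written only to pin down the cases (the function name and
error messages are assumptions, and it is not the Beam implementation): an
unset option means "[auto]", an empty string is an error, a bare host:port
gets "http://" prepended, and "https://" is rejected for now.

```python
from typing import Optional

MAGIC_VALUES = ("[auto]", "[local]")


def normalize_flink_master(address: Optional[str]) -> str:
    """Hypothetical helper; not the Beam implementation."""
    if address is None:
        # Option not set at all: use the explicit default.
        return "[auto]"
    if address == "":
        raise ValueError("flink master must not be empty; use [auto] or [local]")
    if address in MAGIC_VALUES:
        return address
    if address.startswith("https://"):
        raise ValueError("https:// is not supported from Python (needs SSL setup)")
    if address.startswith("http://"):
        return address
    # Bare host:port is the existing behavior, so add the scheme for the user.
    return "http://" + address
```

For example, normalize_flink_master("localhost:8081") gives
"http://localhost:8081", while "" and "https://localhost:8081" both raise
ValueError.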
>
>
> > One more reason that was not part of this discussion yet.
>
> @Jan: Supporting context classloaders in local mode is a new feature and
> for keeping it simple, I'd start a new thread for it.
>
>
> On 29.10.19 10:55, Jan Lukavský wrote:
> > Hi,
> >
> > +1 for empty string being interpreted as [auto] and anything else having
> > explicit notation.
> >
> > One more reason that was not part of this discussion yet. In [1] there
> > was a discussion about LocalEnvironment (that is the one that is
> > responsible for spawning in process Flink cluster) not using context
> > classloader and thus can fail loading some user code (if this code was
> > added to context classloader *after* application has been run).
> > LocalEnvironment on the other hand supposes that all classes can be
> > loaded by application's classloader and doesn't accept any "client
> > jars". Therefore - when application generates classes dynamically during
> > runtime it is currently impossible to run those using local flink
> > runner. There is a nasty hack for JDK <= 8 (injecting URL into
> > application's URLClassLoader), but that fails hard on JDK >= 9 (obviously).
> >
> > The conclusion from that thread is that it could be solved by manually
> > running MiniCluster (which will run on localhost:8081 by default) and
> > then use this REST address for RemoteEnvironment so that the application
> > would be actually submitted as if it would be run on remote cluster and
> > therefore a dynamically generated JAR can be attached to it.
> >
> > That would mean, that we can actually have two "local" modes - one using
> > LocalEnvironment and one with manual MiniCluster + RemoteEnvironment (if
> > for whatever reason we would like to keep both mode of local operation).
> > That could mean two masters - e.g. [local] and [local-over-remote] or
> 

Re: [spark structured streaming runner] merge to master?

2019-10-30 Thread Alexey Romanenko
Yes, agree, two jars included in uber jar will work in the similar way. Though 
having 3 jars looks still quite confusing for me.

> On 29 Oct 2019, at 23:54, Kenneth Knowles  wrote:
> 
> Is it just as easy to have two jars and build an uber jar with both included? 
> Then the runner can still be toggled with a flag.
> 
> Kenn
> 
> On Tue, Oct 29, 2019 at 9:38 AM Alexey Romanenko wrote:
> Hmm, I don’t think that jar size should play a big role comparing to the 
> whole size of shaded jar of users job. Even more, I think it will be quite 
> confusing for users to choose which jar to use if we will have 3 different 
> ones for similar purposes. Though, let’s see what others think.
> 
>> On 29 Oct 2019, at 15:32, Etienne Chauchot wrote:
>> 
>> Hi Alexey, 
>> Thanks for your opinion !
>> 
>> Comments inline
>> 
>> Etienne
>> On 28/10/2019 17:34, Alexey Romanenko wrote:
>>> Let me share some of my thoughts on this.
> - shall we filter out the package name from the release? 
>>> Until new runner is not ready to be used in production (or, at least, be 
>>> used for beta testing but users should be clearly warned about that in this 
>>> case), I believe we need to filter out its classes from published jar to 
>>> avoid a confusion.
>> Yes that is what I think also
> - should we release 2 jars: one for the old and one for the new ? 
> - should we release 3 jars: one for the old, one for the new and one 
> for both ?
> 
>>> Once new runner will be released, then I think we need to provide only one 
>>> single jar and allow user to switch between different Spark runners with 
>>> CLI option.
>> I would vote for 3 jars: one for new, one for old, and one for both. Indeed, 
>> in some cases, users are looking very closely at the size of jars. This 
>> solution meets all use cases
> - should we create a special entry to the capability matrix ?
> 
>>> 
>>> Sure, since it has its own unique characteristics and implementation, but 
>>> again, only once new runner will be "officially released".
>> +1
>>> 
>>> 
 On 28 Oct 2019, at 10:27, Etienne Chauchot >>> > wrote:
 
 Hi guys,
 
 Any opinions on the point2 communication to users ?
 
 Etienne
 On 24/10/2019 15:44, Etienne Chauchot wrote:
> Hi guys,
> 
> I'm glad to announce that the PR for the merge to master of the new 
> runner based on Spark Structured Streaming framework is submitted:
> 
> https://github.com/apache/beam/pull/9866 
> 
> 
> 1. Regarding the status of the runner: 
> -the runner passes 93% of the validates runner tests in batch mode.
> 
> -Streaming mode is barely started (waiting for the multi-aggregations 
> support in spark Structured Streaming framework from the Spark community)
> 
> -Runner can execute Nexmark
> 
> -Some things are not wired up yet
> 
>   -Beam Schemas not wired with Spark Schemas
> 
>   -Optional features of the model not implemented: state api, timer api, 
> splittable doFn api, …
> 
> 2. Regarding the communication to users:
> 
> - for reasons explained by Ismael: the runner is in the same module as 
> the "older" one. But it is in a different sub-package and both runners 
> share the same build.  
> - How should we communicate to users: 
> - shall we filter out the package name from the release? 
> - should we release 2 jars: one for the old and one for the new ? 
> - should we release 3 jars: one for the old, one for the new and one 
> for both ?
> 
> - should we create a special entry to the capability matrix ?
> 
> WDYT ?
> Best
> 
> Etienne
> 
> On 23/10/2019 19:11, Mikhail Gryzykhin wrote:
>> +1 to merge.
>> 
>> It is worth keeping things in master with explicitly marked status. It 
>> will make effort more visible to users and easier to get feedback upon.
>> 
>> --Mikhail
>> 
>> On Wed, Oct 23, 2019 at 8:36 AM Etienne Chauchot > > wrote:
>> Hi guys,
>> 
>> The new spark runner now supports beam coders and passes 93% of the 
>> batch validates runner tests (+4%). I think it is time to merge it to 
>> master. I will submit a PR in the coming days.
>> 
>> next steps: support schemas and thus better leverage catalyst optimizer 
>> (among other things optims based on data), port perfs optims that were 
>> done in the current runner.
>> Best
>> Etienne
>> On 11/10/2019 22:48, Pablo Estrada wrote:
>>> +1 for merging : )
>>> 
>>> On Fri, Oct 11, 2019 at 12:43 PM Robert Bradshaw >> > wrote:
>>> Sounds like a good plan to me. 
>>> 
>>> On Fri, Oct 11, 2019 at 6:20 AM Etienne 

Re: why are so many transformation needed for a simple TextIO.write() operation

2019-10-30 Thread Luke Cwik
A lot of the logic is around handling various error scenarios.

You should notice that the majority of that graph is about passing around
metadata describing which files were written and which errors occurred. That
metadata is tiny in comparison and should only be a blip when compared to
writing the files themselves.
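
As a rough illustration of why the metadata is cheap, here is a small Python
sketch. It is not Beam's write implementation; the function names and the
temp-file-then-rename layout are assumptions made for the example. The bulk of
the bytes goes straight to a file, and only one small record per file is
handed to the later steps.

```python
import os
import tempfile
from typing import Iterable, List, NamedTuple


class FileResult(NamedTuple):
    """Small metadata record passed downstream instead of the data itself."""
    temp_path: str
    final_path: str
    num_lines: int


def write_shard(lines: Iterable[str], out_dir: str, shard: int) -> FileResult:
    # The heavy work: write every line of one shard to a temporary file.
    fd, temp_path = tempfile.mkstemp(dir=out_dir, prefix=".temp-")
    count = 0
    with os.fdopen(fd, "w") as f:
        for line in lines:
            f.write(line + "\n")
            count += 1
    final_path = os.path.join(out_dir, "part-%05d" % shard)
    return FileResult(temp_path, final_path, count)


def finalize(results: List[FileResult]) -> List[str]:
    # The cheap work: only metadata arrives here; files are renamed into place.
    for result in results:
        os.replace(result.temp_path, result.final_path)
    return [result.final_path for result in results]
```

Error handling (retries, cleanup of orphaned temp files, empty outputs) is
left out here, and that is the kind of logic the extra steps in the graph
account for.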

On Sun, Oct 20, 2019 at 10:17 PM Pulasthi Supun Wickramasinghe <
pulasthi...@gmail.com> wrote:

> Hi Dev's
>
> I was trying to understand the transformations created for the
> following pipeline, which seems to be pretty simple from the looks of it.
> But the graph created seems to be pretty complex. I have attached a rough
> sketch of the graph that I understood from debugging the code below [1].
> Was a little bit puzzled as to why so many transformations are introduced
> for the write() operation, is this the normal behavior for I/O operations
> or am I missing something? doesn't this introduce a lot of unwanted
> overhead to a simple operation?
>
> PCollection<String> result =
>     p.apply(GenerateSequence.from(0).to(10))
>         .apply(
>             ParDo.of(
>                 // GenerateSequence emits Long values; convert each to a String.
>                 new DoFn<Long, String>() {
>                   @ProcessElement
>                   public void processElement(ProcessContext c) throws Exception {
>                     c.output(c.element().toString());
>                   }
>                 }));
>
> result.apply(TextIO.write().to(new URI(resultPath).getPath() +
> "/part"));
>
>
> [1]
>  beam graph
> 
>
> Best Regards,
> Pulasthi
> --
> Pulasthi S. Wickramasinghe
> PhD Candidate  | Research Assistant
> School of Informatics and Computing | Digital Science Center
> Indiana University, Bloomington
> cell: 224-386-9035 <(224)%20386-9035>
>


Re: RFC: python static typing PR

2019-10-30 Thread Chad Dombrova
>
> As Beam devs will be gaining more first-hand experience with the tooling,
> we may need to add a style guide/best practices/FAQ to our contributor
> guide to clarify known issues.
>

I'm happy to help out with that, just let me know.

-chad


Re: RFC: python static typing PR

2019-10-30 Thread Luke Cwik
+1 for type annotations.

On Mon, Oct 28, 2019 at 7:41 PM Robert Burke  wrote:

> As someone who cribs from the Python SDK to make changes in the Go SDK,
> this will make things much easier to follow! Thank you.
>
> On Mon, Oct 28, 2019, 6:52 PM Chad Dombrova  wrote:
>
>>
>> Wow, that is an incredible amount of work!
>>>
>>
>> Some people meditate.  I annotate ;)
>>
>> I'm definitely of the opinion that there's no viable counterargument to
>>> the value of types, especially for large or complex codebases.
>>>
>>
>> Agreed.  That's part of why I waited until I got the whole thing passing
>> before really promoting the review of this PR.
>>
>> Robert and I have worked out a rough plan for merging:
>>
>>- I'll make a new PR with the foundations of the existing PR -- these
>>are almost entirely type comments so no appreciable runtime changes --
>>along with the mypy lint, which will be part of python precommit but not
>>affect its failure status.   We'll get that merged first.
>>- From there it should be easy for others to review and merge the
>>remaining commits in parallel.
>>- Once everything is in, we'll make a final commit to stop ignoring
>>failures for the mypy job.
>>
>>
>> For those concerned about yet another lint (a very reasonable concern!)
>> the py37-mypy tox job completes in 37s on my macbook pro (excluding
>> virtualenv setup time).
>>
>> -chad
>>
>>
>>
>>
>


Streaming data from Pubsub to Spanner with Beam dataflow pipeline

2019-10-30 Thread Taher Koitawala
Hi All,
  My current use-case is to write data from Pubsub to Spanner using
a streaming pipeline. I do see that Beam does have a SpannerIO to write.

  However, pubsub being streaming and Spanner being RDBMS like, it
would be helpful if you guys can tell me if this will be performant enough
or not. If someone has already tried this out and can give me a few
caveats, then that would be really awesome.


Regards,
Taher Koitawala


Re: Python SDK timestamp precision

2019-10-30 Thread Jan Lukavský
TL;DR - can we solve this by representing aggregations as not point-wise 
events in time, but time ranges? Explanation below.


Hi,

this is pretty interesting from a theoretical point of view. The 
question generally seems to be - having two events, can I reliably order 
them? One event might be end of one window and the other event might be 
start of another window. There is strictly required causality in this 
(although these events in fact have the same limiting timestamp). On the 
other hand, when I have two (random) point-wise events (one with 
timestamp T1 and the other with timestamp T2), then causality of these 
two depends on distance in space. If these two events differ by a single 
nanosecond, then if they do not originate very "close" to each other, 
then there is high probability that different observers will see them in 
different order (and hence there is no causality possible and the order 
doesn't matter).


That is to say - if I'm dealing with single external event (someone 
tells me something has happened at time T), then - around the boundaries 
of windows - it doesn't matter which window is this event placed into. 
There is no causal connection between start of window and the event.


Different situation is when we actually perform an aggregation on 
multiple elements (GBK) - then although we produce output at certain 
event-time timestamp, this event is not "point-wise external", but is a 
running aggregation on multiple (and possibly long term) data. That is 
of course causally preceding opening a new (tumbling) window.


The question now is - could we extend WindowedValue so that it doesn't 
contain only single point-wise event in time, but a time range in case 
of aggregations? Then the example of Window.into -> GBK -> Window.into 
could actually behave correctly (respecting causality) and another 
positive thing could be, that the resulting timestamp of the aggregation 
would no longer have to be window.maxTimestamp - 1 (which is kind of 
strange).


This might be actually equivalent to the "minus epsilon" approach, but 
maybe better understandable. Moreover, this should be probably available 
to users, because one can easily get to the same problems when using 
stateful ParDo to perform a GBK-like aggregation.


Thoughts?

Jan


I see two distinct "objects" for both of which we currently use the same

On 10/30/19 1:05 AM, Robert Bradshaw wrote:

On Tue, Oct 29, 2019 at 4:20 PM Kenneth Knowles  wrote:

Point (1) is compelling. Solutions to the "minus epsilon" seem a bit complex. 
On the other hand, an opaque and abstract Timestamp type (in each SDK) going forward 
seems like a Pretty Good Idea (tm). Would you really have to go floating point? Could you 
just have a distinguished representation for non-inclusive upper/lower bounds? These 
could be at the same reduced resolution as timestamps in element metadata, since that is 
all they are compared against.

If I were coming up with an abstract, opaque representation of
Timestamp (and Duration) for Beam, I would explicitly include the
"minus epsilon" concept. One could still do arithmetic with these.
This would make any conversion to standard datetime libraries lossy
though.
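
A minimal sketch of such a representation, assuming integer microseconds plus
a "just before this instant" marker. The class and method names are
hypothetical, and rounding down by one unit in the lossy conversion is only
one possible choice.

```python
from dataclasses import dataclass
from functools import total_ordering


@total_ordering
@dataclass(frozen=True)
class Timestamp:
    """Hypothetical opaque timestamp with an optional 'minus epsilon' marker."""
    micros: int
    minus_epsilon: bool = False

    def _key(self):
        # T - epsilon sorts after every timestamp below T and before T itself.
        return (self.micros, 0 if self.minus_epsilon else 1)

    def __lt__(self, other):
        if not isinstance(other, Timestamp):
            return NotImplemented
        return self._key() < other._key()

    def plus(self, duration_micros: int) -> "Timestamp":
        # Arithmetic keeps the marker, so (T - eps) + d == (T + d) - eps.
        return Timestamp(self.micros + duration_micros, self.minus_epsilon)

    def to_micros_lossy(self) -> int:
        # Plain integers cannot express epsilon; round down one unit (assumed).
        return self.micros - 1 if self.minus_epsilon else self.micros
```

Here Timestamp(10, minus_epsilon=True) sorts strictly between Timestamp(9)
and Timestamp(10), yet it converts to the same plain integer as Timestamp(9),
which is where the lossiness shows up.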


Point (2) is also good, though it seems like something that could be cleverly 
engineered and/or we just provide one implementation and it is easy to make 
your own for finer granularity, since a WindowFn separately receives the 
Timestamp (here I'm pretending it is abstract and opaque and likely 
approximate) and the original element with whatever precision the original data 
included.

Yes, but I don't see how a generic WindowFn would reach into the
(arbitrary) element and pull out this original data. One of the
benefits of the Beam model is that the WindowFn does not have to
depend on the element type.


Point (3) the model/runner owns the timestamp metadata so I feel fine about it 
being approximated as long as any original user data is still present. I don't 
recall seeing a compelling case where the timestamp metadata that the runner 
tracks and understands is required to be exactly the same as a user value 
(assuming users understand this distinction, which is another issue that I 
would separate from whether it is technically feasible).

As we provide the ability to designate user data as the runner
timestamp against which to window, and promote the runner timestamp
back to user data (people are going to want to get DateTime or Instant
objects out of it), it seems tricky to explain to users that one or
both of these operations may be lossy (and, in addition, I don't think
there's a consistently safe direction to round).


The more I think about the very real problems you point out, the more I think 
that our backwards-incompatible move should be to our own abstract Timestamp 
type, putting the design decision behind a minimal interface. If we see a 
concrete design for that data type, we might be inspired how to support more 
possibilities.

As for the