Re: RabbitMQ and CheckpointMark feasibility

2019-11-13 Thread Jan Lukavský

Hi Danny,

as Eugene pointed out, there are essentially two "modes of operation" of 
CheckpointMark. It can:


 a) be used to somehow restore state of a reader (in call to 
UnboundedSource#createReader)


 b) confirm processed elements in CheckpointMark#finalizeCheckpoint

If your source doesn't provide a persistent position in the data stream 
that can be referred to (and serialized - Kafka offsets would be an 
example of this), then what you actually need to serialize is not the 
channel, but a way to restore it - e.g. by opening a new channel with a 
given 'consumer group name'. Then you just use this checkpoint to commit 
your processed data in finalizeCheckpoint.


Note that finalizeCheckpoint is not guaranteed to be called - that can 
happen when an error occurs and the source has to be rewound - which is 
what the direct runner emulates with the probability of 
'readerReuseChance'.


I have only skimmed the RabbitMQ documentation, but if I understand it 
correctly, then you have to create a subscription to the broker, 
serialize the identifier of the subscription into the CheckpointMark, 
and then just recover the subscription in the call to 
UnboundedSource#createReader. That should do the trick.
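
For illustration, a checkpoint mark along these lines might look roughly 
like the following sketch (class, field and method names here are 
hypothetical, not the actual RabbitMqIO code):

```
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import org.apache.beam.sdk.io.UnboundedSource;

class RabbitCheckpointMark implements UnboundedSource.CheckpointMark, Serializable {

  // Serializable handle that a new reader can use to re-attach to the stream.
  private final String subscriptionId;

  // Delivery tags consumed since the last finalized checkpoint.
  private final List<Long> pendingDeliveryTags;

  // The live channel is NOT serializable; it is owned by the reader that
  // created this mark and is null after deserialization.
  private transient Channel channel;

  RabbitCheckpointMark(String subscriptionId, List<Long> pendingDeliveryTags, Channel channel) {
    this.subscriptionId = subscriptionId;
    this.pendingDeliveryTags = pendingDeliveryTags;
    this.channel = channel;
  }

  String getSubscriptionId() {
    // Used by UnboundedSource#createReader to recover the subscription.
    return subscriptionId;
  }

  @Override
  public void finalizeCheckpoint() throws IOException {
    // Acks must go to the channel that delivered the messages; if this mark
    // was serialized, the channel is gone and acking is impossible.
    if (channel != null) {
      for (long tag : pendingDeliveryTags) {
        channel.basicAck(tag, false);
      }
    }
  }
}
```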


Hope this helps, and sorry if I'm not using 100% correct RabbitMQ 
terminology; as I said, I'm not quite familiar with it.


Best,

 Jan

On 11/14/19 5:26 AM, Daniel Robert wrote:


I believe I've nailed down a situation that happens in practice that 
causes Beam and Rabbit to be incompatible. It seems that runners can 
and do make assumptions about the serializability (via Coder) of a 
CheckpointMark.


To start, these are the semantics of RabbitMQ:

- the client establishes a connection to the server
- client opens a channel on the connection
- messages are either pulled or pushed to the client from the server 
along this channel
- when messages are done processing, they are acknowledged 
*client-side* and must be acknowledged on the *same channel* that 
originally received the message.


Since a channel (or any open connection) is non-serializable, a 
CheckpointMark that has been serialized can never be used to 
acknowledge these messages and correctly 'finalize' the checkpoint. It 
also, as previously discussed in this thread, implies a rabbit Reader 
cannot accept an existing CheckpointMark at all; the Reader and the 
CheckpointMark must share the same connection to the rabbit server 
("channel").


Next, I've found how DirectRunner (and presumably others) can attempt 
to serialize a CheckpointMark that has not been finalized. In 
https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L150, 
the DirectRunner applies a probability and if it hits, it sets the 
current reader to 'null' but retains the existing CheckpointMark, 
which it then attempts to pass to a new reader via a Coder.


This leaves the shard, the runner, and the reader with differing views 
of the world. In UnboundedReadEvaluatorFactory's processElement 
function, a call to getReader(shard) ( 
https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L132 
) clones the shard's checkpoint mark and passes that to the new 
reader. The reader ignores it, creating its own, but even if it 
accepted it, it would be accepting a serialized CheckpointMark, which 
wouldn't work. Later, the runner calls finishRead ( 
https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L246 
). The shard's CheckpointMark (unserialized; which should still be 
valid) is finalized. The reader's CheckpointMark (which may be a 
different instance) becomes the return value, which is referred to as 
"finishedCheckpoint" in the calling code, which is misleading at best 
and problematic at worst as *this* checkpoint has not been finalized.


So, tl;dr: I cannot find any means of maintaining a persistent 
connection to the server for finalizing checkpoints that is safe 
across runners. If there's a guarantee all of the shards are on the 
same JVM instance, I could rely on global, static 
collections/instances as a workaround, but if other runners might 
serialize this across the wire, I'm stumped. The only workable 
situation I can think of right now is to proactively acknowledge 
messages as they are received and effectively no-op in 
finalizeCheckpoint. This is very different, semantically, and can lead 
to dropped messages if a pipeline doesn't finish processing the given 
message.


Any help would be much appreciated.

Thanks,
-Danny

On 11/7/19 10:27 PM, Eugene Kirpichov wrote:

Hi Daniel,

This is probably insufficiently well documented. The CheckpointMark 
is used for two purposes:
1) To persistently store some notion of how much of the stream has 
been consumed, so that if something fails 

Re: [discuss] Using a logger hierarchy in Python

2019-11-13 Thread Chad Dombrova
Hi Thomas,


> Will this include the ability for users to configure logging via pipeline
> options?
>

We're working on a proposal to allow pluggable logging handlers that can be
configured via pipeline options.  For example, it would allow you to add a
new logging handler for StackDriver or Elasticsearch.  Will hopefully have
a document to share soon.

-chad


Re: RabbitMQ and CheckpointMark feasibility

2019-11-13 Thread Daniel Robert
I believe I've nailed down a situation that happens in practice that 
causes Beam and Rabbit to be incompatible. It seems that runners can and 
do make assumptions about the serializability (via Coder) of a 
CheckpointMark.


To start, these are the semantics of RabbitMQ:

- the client establishes a connection to the server
- client opens a channel on the connection
- messages are either pulled or pushed to the client from the server 
along this channel
- when messages are done processing, they are acknowledged *client-side* 
and must be acknowledged on the *same channel* that originally received 
the message.


Since a channel (or any open connection) is non-serializable, a 
CheckpointMark that has been serialized can never be used to acknowledge 
these messages and correctly 'finalize' the checkpoint. It also, as 
previously discussed in this thread, implies a rabbit Reader cannot 
accept an existing CheckpointMark at all; the Reader and the 
CheckpointMark must share the same connection to the rabbit server 
("channel").


Next, I've found how DirectRunner (and presumably others) can attempt to 
serialize a CheckpointMark that has not been finalized. In 
https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L150, 
the DirectRunner applies a probability and if it hits, it sets the 
current reader to 'null' but retains the existing CheckpointMark, which 
it then attempts to pass to a new reader via a Coder.


This leaves the shard, the runner, and the reader with differing views of 
the world. In UnboundedReadEvaluatorFactory's processElement function, a 
call to getReader(shard) ( 
https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L132 
) clones the shard's checkpoint mark and passes that to the new reader. 
The reader ignores it, creating its own, but even if it accepted it, it 
would be accepting a serialized CheckpointMark, which wouldn't work. 
Later, the runner calls finishRead ( 
https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java#L246 
). The shard's CheckpointMark (unserialized; which should still be 
valid) is finalized. The reader's CheckpointMark (which may be a 
different instance) becomes the return value, which is referred to as 
"finishedCheckpoint" in the calling code, which is misleading at best 
and problematic at worst as *this* checkpoint has not been finalized.


So, tl;dr: I cannot find any means of maintaining a persistent 
connection to the server for finalizing checkpoints that is safe across 
runners. If there's a guarantee all of the shards are on the same JVM 
instance, I could rely on global, static collections/instances as a 
workaround, but if other runners might serialize this across the wire, 
I'm stumped. The only workable situation I can think of right now is to 
proactively acknowledge messages as they are received and effectively 
no-op in finalizeCheckpoint. This is very different, semantically, and 
can lead to dropped messages if a pipeline doesn't finish processing the 
given message.
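
For concreteness, here is a minimal sketch of that workaround (the reader 
internals are hypothetical, and everything except the eager-ack behavior 
is elided):

```
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.GetResponse;
import java.io.IOException;
import org.apache.beam.sdk.io.UnboundedSource;

// Partial reader showing only the ack-eagerly behavior; the remaining
// UnboundedReader methods are left abstract.
abstract class EagerAckReader extends UnboundedSource.UnboundedReader<GetResponse> {
  private final Channel channel;   // channel owned by this reader instance
  private final String queueName;  // e.g. "my-queue"
  private GetResponse current;

  EagerAckReader(Channel channel, String queueName) {
    this.channel = channel;
    this.queueName = queueName;
  }

  @Override
  public boolean advance() throws IOException {
    GetResponse response = channel.basicGet(queueName, /* autoAck= */ false);
    if (response == null) {
      return false;
    }
    current = response;
    // Ack immediately on the channel that delivered the message. The broker
    // discards the message even if the pipeline fails before processing it;
    // this is the "dropped messages" risk described above.
    channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
    return true;
  }

  @Override
  public UnboundedSource.CheckpointMark getCheckpointMark() {
    // Everything was already acked, so finalizeCheckpoint has nothing to do.
    return () -> {};
  }
}
```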


Any help would be much appreciated.

Thanks,
-Danny

On 11/7/19 10:27 PM, Eugene Kirpichov wrote:

Hi Daniel,

This is probably insufficiently well documented. The CheckpointMark is 
used for two purposes:
1) To persistently store some notion of how much of the stream has 
been consumed, so that if something fails we can tell the underlying 
streaming system where to start reading when we re-create the reader. 
This is why CheckpointMark is Serializable. E.g. this makes sense for 
Kafka.
2) To do acks - to let the underlying streaming system know that the 
Beam pipeline will never need data up to this CheckpointMark. Acking 
does not require serializability - runners call ack() on the same 
in-memory instance of CheckpointMark that was produced by the reader. 
E.g. this makes sense for RabbitMq or Pubsub.
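
As a rough illustration of the two purposes (the types and names below are 
simplifications for this thread, not actual Beam or connector code):

```
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.io.UnboundedSource;

// Purpose 1: the mark *is* the restart position; serializing it is the point.
class OffsetCheckpointMark implements UnboundedSource.CheckpointMark, Serializable {
  private final Map<Integer, Long> offsetsByPartition;

  OffsetCheckpointMark(Map<Integer, Long> offsetsByPartition) {
    this.offsetsByPartition = offsetsByPartition;
  }

  @Override
  public void finalizeCheckpoint() {
    // Nothing to ack; on restore, a new reader seeks to offsetsByPartition.
  }
}

// Purpose 2: the mark carries pending acks and only works as the same
// in-memory instance the reader produced.
class AckCheckpointMark implements UnboundedSource.CheckpointMark {
  interface Ackable { void ack(); } // stand-in for a broker receipt

  private final List<Ackable> pending;

  AckCheckpointMark(List<Ackable> pending) {
    this.pending = pending;
  }

  @Override
  public void finalizeCheckpoint() {
    pending.forEach(Ackable::ack);
  }
}
```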


In practice, these two capabilities tend to be mutually exclusive: 
some streaming systems can provide a serializable CheckpointMark, some 
can do acks, some can do neither - but very few (or none) can do both, 
and it's debatable whether it even makes sense for a system to provide 
both capabilities: usually acking is an implicit form of 
streaming-system-side checkpointing, i.e. when you re-create the 
reader you don't actually need to carry over any information from an 
old CheckpointMark - the necessary state (which records should be 
delivered) is maintained on the streaming system side.


These two are lumped together into one API simply because that was the 
best design option we came up with (not for lack of trying, but 
suggestions very much welcome - AFAIK nobody is happy with it).


RabbitMQ is under #2 - it can't do serializable checkpoint marks, but 
it can do acks. So you can simply 

Why is Pipeline not Serializable and can it be changed to be Serializable

2019-11-13 Thread Pulasthi Supun Wickramasinghe
Hi Dev's

Currently, the Pipeline class in Beam is not Serializable. This is not a
problem for the current runners since the pipeline is translated and
submitted through a centralized Driver like model. However, if the runner
has a decentralized model similar to OpenMPI (MPI), which is also the case
with Twister2, which I am developing a runner currently, it would have been
better if the pipeline itself was Serializable.

Currently, I am trying to transform the Pipeline into a Twister2 graph and
then send over to the workers, however since there are some functions such
as "SystemReduceFn" that are not serializable this also is somewhat
troublesome.

Was the decision to make Pipelines not Serializable made due to some
specific reason or because all the current use cases did not present any
valid requirement to make them Serializable?

Best Regards,
Pulasthi
-- 
Pulasthi S. Wickramasinghe
PhD Candidate  | Research Assistant
School of Informatics and Computing | Digital Science Center
Indiana University, Bloomington
cell: 224-386-9035


Re: session window puzzle

2019-11-13 Thread Aaron Dixon
This is a great help. Thank you. I like the custom window solution
pattern as a way to hold the watermark and merge down to keep the watermark
where it is needed. Perhaps there is some interesting generalized session
window here.. I'll have to digest the stateful DoFn approach. Avoiding
unnecessary shuffles is a good note.

As a side note, there is MIN, MAX and END_OF_WINDOW TimestampCombiner. Has
it been discussed to ever allow more customization here? Seems like
customizing the combiner with element-awareness would have solved this
problem, as well.


On Wed, Nov 13, 2019 at 7:56 PM Kenneth Knowles  wrote:

> You've done a very good analysis* and I think your solution is pretty
> clever. The simple fact is this: the watermark has to be held to the
> minimum of any output you intend to produce. So for your use case, the hold
> has to be the timestamp of the Green element. Your solution does hold the
> watermark to the right time. I have a couple thoughts that may be helpful.
>
> 0. If you partition by user does the stream contain a bunch of Orange,
> Green, Blue elements? Is it possible that a session contains multiple
> [Orange, Green, Blue] sequences? Is it possible that an [Orange, Green,
> Blue] sequence is split across multiple sessions?
>
> 1. In your proposed solution, it probably could be expressed as a new
> merging WindowFn. You would assign each Green element to two tagged windows
> that were GreenFromOrange and GreenToBlue type, and have a separate window
> tag for OrangeWindow and BlueWindow. Then GreenFromOrange merges with
> OrangeWindow only, etc.
>
> 2. This might also turn out simply as a stateful DoFn, where you manually
> manage what state the funnel is in. When you set a timer to wait for the
> Orange element, you may need an upcoming feature where you set a timer for
> a future event time but the watermark is held to the Green element's
> timestamp. CC Reuven on that use case.
>
> What I would like to avoid is you having to do two shuffles (on whatever
> runner). This should be doable with one.
>
> *SessionWindow plus EARLIEST holding up the watermark/pipeline was an
> early complaint. That is part of why we switched the default to
> end-of-window (also it is easier to understand and more efficient to
> compute)
>
> Kenn
>
> On Wed, Nov 13, 2019 at 3:25 PM Aaron Dixon  wrote:
>
>> This is a real use case we have, but simplified:
>>
>> My user sessions look like this: a user visits a page, and clicks three
>> buttons: Orange then Green then Blue.
>>
>> I need to compute the average time between Orange & Blue clicks but I
>> need to window on the timestamp of the green button click.
>>
>> In requirements terms: Compute average time between Orange and Blue for
>> all Green clicks that occur on Monday. (So User could click Orange on
>> Sunday, Green on Monday and Blue on Tuesday.)
>>
>> One strategy is to try to use a single SessionWindow to capture the
>> entire user session; then calculate the *span* (time between Orange and
>> Blue clicks) and *then* compute average of all spans.
>>
>> To do this the *span*/counts would have to all "land" in a window
>> representing Monday.
>>
>> If I use a SessionWindow w/ TimestampCombiner/EARLIEST then I can make
>> sure they land in this window using .outputWithTimestamp without worrying
>> that I'll be regressing the event timestamp.
>>
>> Except when I use this Combiner/EARLIEST strategy my watermark is held up
>> substantially (and incidentally seems to drag the pipeline).
>>
>> But if I use Beam's default TimestampCombiner/END_OF_WINDOW then I won't
>> be able to output the *span* result at a timestamp representing the
>> Green click.
>>
>> So a single SessionWindow seems out. (Unless I'm missing something.)
>>
>> The only other strategy I can conceive of at the moment is to capture
>> *two* sessions, representing each "leg" of the overall session. One
>> windows on the [Orange,Green] (using END_OF_WINDOW); the other [Green,Blue]
>> (using EARLIEST). Then I can "join" these two to get both legs together and
>> compute the overall span. This seems like a quite complicated way to solve
>> this (simple?) problem.
>>
>> Thoughts? What am I missing?
>>
>


Re: session window puzzle

2019-11-13 Thread Kenneth Knowles
You've done a very good analysis* and I think your solution is pretty
clever. The simple fact is this: the watermark has to be held to the
minimum of any output you intend to produce. So for your use case, the hold
has to be the timestamp of the Green element. Your solution does hold the
watermark to the right time. I have a couple thoughts that may be helpful.

0. If you partition by user does the stream contain a bunch of Orange,
Green, Blue elements? Is it possible that a session contains multiple
[Orange, Green, Blue] sequences? Is it possible that an [Orange, Green,
Blue] sequence is split across multiple sessions?

1. In your proposed solution, it probably could be expressed as a new
merging WindowFn. You would assign each Green element to two tagged windows
that were GreenFromOrange and GreenToBlue type, and have a separate window
tag for OrangeWindow and BlueWindow. Then GreenFromOrange merges with
OrangeWindow only, etc.

2. This might also turn out simply as a stateful DoFn, where you manually
manage what state the funnel is in. When you set a timer to wait for the
Orange element, you may need an upcoming feature where you set a timer for
a future event time but the watermark is held to the Green element's
timestamp. CC Reuven on that use case.
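
A rough skeleton of the stateful DoFn in thought 2 (Click and its accessors 
are hypothetical stand-ins, out-of-order clicks and late data are ignored, 
and the watermark-hold caveat above still applies):

```
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;

// Hypothetical element type; input is keyed by user.
interface Click {
  Color color();
  Instant timestamp();
  enum Color { ORANGE, GREEN, BLUE }
}

class SpanFn extends DoFn<KV<String, Click>, KV<String, Long>> {

  @StateId("orange")
  private final StateSpec<ValueState<Instant>> orangeSpec = StateSpecs.value(InstantCoder.of());

  @StateId("green")
  private final StateSpec<ValueState<Instant>> greenSpec = StateSpecs.value(InstantCoder.of());

  @ProcessElement
  public void process(
      ProcessContext c,
      @StateId("orange") ValueState<Instant> orange,
      @StateId("green") ValueState<Instant> green) {
    Click click = c.element().getValue();
    switch (click.color()) {
      case ORANGE:
        orange.write(click.timestamp());
        break;
      case GREEN:
        green.write(click.timestamp());
        break;
      case BLUE:
        Instant orangeTs = orange.read();
        Instant greenTs = green.read();
        if (orangeTs != null && greenTs != null) {
          long spanMillis = click.timestamp().getMillis() - orangeTs.getMillis();
          // Emit the span at the Green timestamp so a downstream daily window
          // groups it by the Green click. Outputting earlier than the current
          // element's timestamp is exactly where the watermark hold is needed.
          c.outputWithTimestamp(KV.of(c.element().getKey(), spanMillis), greenTs);
          orange.clear();
          green.clear();
        }
        break;
    }
  }
}
```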

What I would like to avoid is you having to do two shuffles (on whatever
runner). This should be doable with one.

*SessionWindow plus EARLIEST holding up the watermark/pipeline was an early
complaint. That is part of why we switched the default to end-of-window
(also it is easier to understand and more efficient to compute)

Kenn

On Wed, Nov 13, 2019 at 3:25 PM Aaron Dixon  wrote:

> This is a real use case we have, but simplified:
>
> My user sessions look like this: a user visits a page, and clicks three
> buttons: Orange then Green then Blue.
>
> I need to compute the average time between Orange & Blue clicks but I need
> to window on the timestamp of the green button click.
>
> In requirements terms: Compute average time between Orange and Blue for
> all Green clicks that occur on Monday. (So User could click Orange on
> Sunday, Green on Monday and Blue on Tuesday.)
>
> One strategy is to try to use a single SessionWindow to capture the entire
> user session; then calculate the *span* (time between Orange and Blue
> clicks) and *then* compute average of all spans.
>
> To do this the *span*/counts would have to all "land" in a window
> representing Monday.
>
> If I use a SessionWindow w/ TimestampCombiner/EARLIEST then I can make
> sure they land in this window using .outputWithTimestamp without worrying
> that I'll be regressing the event timestamp.
>
> Except when I use this Combiner/EARLIEST strategy my watermark is held up
> substantially (and incidentally seems to drag the pipeline).
>
> But if I use Beam's default TimestampCombiner/END_OF_WINDOW then I won't
> be able to output the *span* result at a timestamp representing the Green
> click.
>
> So a single SessionWindow seems out. (Unless I'm missing something.)
>
> The only other strategy I can conceive of at the moment is to capture
> *two* sessions, representing each "leg" of the overall session. One
> windows on the [Orange,Green] (using END_OF_WINDOW); the other [Green,Blue]
> (using EARLIEST). Then I can "join" these two to get both legs together and
> compute the overall span. This seems like a quite complicated way to solve
> this (simple?) problem.
>
> Thoughts? What am I missing?
>


Re: [discuss] Using a logger hierarchy in Python

2019-11-13 Thread Pablo Estrada
Okay, I've just gone and done this for most modules:

Runners modules: https://github.com/apache/beam/pull/10097
IO modules: https://github.com/apache/beam/pull/10099
Other modules (testing, utils): https://github.com/apache/beam/pull/10100

I imagine the trickier one will be runners, since there's a few files that
do handling of logs coming from the SDK to the runner. Other than that, the
changes should be relatively straightforward.
Best
-P.

On Wed, Nov 13, 2019 at 3:45 PM Thomas Weise  wrote:

> +1 for using a logger hierarchy.
>
> Will this include the ability for users to configure logging via pipeline
> options?
>
>
> On Wed, Nov 13, 2019 at 11:04 AM Chad Dombrova  wrote:
>
>>
>> On Wed, Nov 13, 2019 at 10:52 AM Robert Bradshaw 
>> wrote:
>>
>>> I would be in favor of using module-level loggers as well.
>>
>>
>> +1
>>
>>
>


Re: [Discuss] Beam mascot

2019-11-13 Thread Valentyn Tymofieiev
I like the firefly sketch a lot, it's my favorite so far.

On Wed, Nov 13, 2019 at 12:58 PM Robert Bradshaw 
wrote:

> #37 from the sketches was the cuttlefish, which would make it (with 4
> votes) the most popular so far. I do like the firefly too.
>
> On Wed, Nov 13, 2019 at 12:03 PM Gris Cuevas  wrote:
> >
> > Hi everyone, so exciting to see this convo taking off!
> >
> > I loved Alex's firefly! -- it can have so many cool variations, and as a
> stuffed animal is very original.
> >
> > Other ideas I had are a caterpillar because it looks like a data
> pipeline, lol or the beaver!
> >
> > Feedback on the current sketches.
> > - They resemble a lot either the Octocat or Totoro [1]
> > - I'd like to see something that is completely new and original,
> pancakes from gRPC is an example[2]
> > - Something more caricaturesque is better, since we can dress it up and
> modify it
> >
> > To move forward, it seems that the animals that were winners in this
> thread are:
> >
> > Beaver (3)
> > Firefly (3)
> > Lemur or votes on sketches (3)
> > Cuttlefish (2)
> > Hedgehog (1)
> > Salmon (1)
> >
> > So let's focus the design proposals on the three winners: beaver,
> firefly and lemur.
> > I'd like to see more options on beavers and fireflies, the current
> sketch options I think are based on the cuttlefish and the lemur (?)
> >
> > I think it's a good idea to get sketches from multiple designers, since
> like someone else pointed out, we'll get variations based on their personal
> styles, and someone else mentioned here that we have teams/companies
>  with designers in their teams,
> so let's take advantage of that as well :)
> >
> > I'd suggest to fork the conversation into a call for sketches and we
> vote on that.
> >
> > [1]
> https://www.google.com/search?q=totoro=ACYBGNTFW6vq76cHp05g4vBaR-SVJNI1iw:1573674471669=lnms=isch=X=0ahUKEwiTwICf-uflAhUKHqwKHVtzAykQ_AUIEigB=1440=735
> > [2]
> https://www.google.com/search?q=pancakes+grpc=isch=2ahUKEwixgqjV-uflAhUBOawKHX2pAfsQ2-cCegQIABAA=pancakes+grpc_l=img.3...13774.22674..22826...10.0..0.112.1818.13j6..01..gws-wiz-img.10..35i39j0j0i67j35i362i39j0i10j0i5i30j0i8i30.hNtaBfSYNv8=WV7MXfHxIYHysAX90obYDw=735=1440
>


Re: [Discuss] Beam mascot

2019-11-13 Thread Robert Bradshaw
#37 from the sketches was the cuttlefish, which would make it (with
4 votes) the most popular so far. I do like the firefly too.

On Wed, Nov 13, 2019 at 12:03 PM Gris Cuevas  wrote:
>
> Hi everyone, so exciting to see this convo taking off!
>
> I loved Alex's firefly! -- it can have so many cool variations, and as a 
> stuffed animal is very original.
>
> Other ideas I had are a caterpillar because it looks like a data pipeline, 
> lol or the beaver!
>
> Feedback on the current sketches.
> - They resemble a lot either the Octocat or Totoro [1]
> - I'd like to see something that is completely new and original, pancakes 
> from gRPC is an example[2]
> - Something more caricaturesque is better, since we can dress it up and 
> modify it
>
> To move forward, it seems that the animals that were winners in this thread 
> are:
>
> Beaver (3)
> Firefly (3)
> Lemur or votes on sketches (3)
> Cuttlefish (2)
> Hedgehog (1)
> Salmon (1)
>
> So let's focus the design proposals on the three winners: beaver, firefly and 
> lemur.
> I'd like to see more options on beavers and fireflies, the current sketch 
> options I think are based on the cuttlefish and the lemur (?)
>
> I think it's a good idea to get sketches from multiple designers, since like 
> someone else pointed out, we'll get variations based on their personal 
> styles, and someone else mentioned here that we have teams/companies with 
> designers in their teams, so let's take advantage of that as well :)
>
> I'd suggest to fork the conversation into a call for sketches and we vote on 
> that.
>
> [1] 
> https://www.google.com/search?q=totoro=ACYBGNTFW6vq76cHp05g4vBaR-SVJNI1iw:1573674471669=lnms=isch=X=0ahUKEwiTwICf-uflAhUKHqwKHVtzAykQ_AUIEigB=1440=735
> [2] 
> https://www.google.com/search?q=pancakes+grpc=isch=2ahUKEwixgqjV-uflAhUBOawKHX2pAfsQ2-cCegQIABAA=pancakes+grpc_l=img.3...13774.22674..22826...10.0..0.112.1818.13j6..01..gws-wiz-img.10..35i39j0j0i67j35i362i39j0i10j0i5i30j0i8i30.hNtaBfSYNv8=WV7MXfHxIYHysAX90obYDw=735=1440


[PROPOSAL] Add support for writing flattened schemas to pubsub

2019-11-13 Thread Brian Hulette
I've been looking into adding support for writing (i.e. INSERT INTO
statements) for the pubsub DDL, which currently only supports reading. This
DDL requires the defined schema to have exactly three fields:
event_timestamp, attributes, and payload, corresponding to the fields in
PubsubMessage (event_timestamp can be configured to come from either
publish time or from the value in a particular attribute, and the payload
must be a ROW with a schema corresponding to the JSON written to the pubsub
topic).

When writing, I think it's a bit onerous to require users to use exactly
these three top-level fields. For example imagine we have two topics:
people, and eligible_voters. people contains a stream of {"name": "..",
age: XX} items, and we want eligible_voters to contain a stream with
{"name": ".."} items corresponding to people with age >= 18. With the
current approach this would look like:

```
CREATE TABLE people (
event_timestamp TIMESTAMP,
attributes MAP<VARCHAR, VARCHAR>,
payload ROW<name VARCHAR, age INTEGER>
  )
  TYPE 'pubsub'
  LOCATION 'projects/my-project/topics/my-topic'

CREATE TABLE eligible_voters ...

INSERT INTO eligible_voters (
  SELECT
ROW(payload.name AS name) AS payload
FROM people
WHERE payload.age >= 18
)
```

This query has lots of renaming and boiler-plate, and furthermore, ROW(..)
doesn't seem well supported in Calcite; I had to jump through some hoops
(like calling my fields $col1) to make something like this work.
I think it would be great if we could instead handle flattened,
payload-only schemas. We would still need to have a separate
event_timestamp field, but everything else would map to a field in the
payload. With this change the previous example would look like:

```
CREATE TABLE people (
event_timestamp TIMESTAMP,
name VARCHAR,
age INTEGER
  )
  TYPE 'pubsub'
  LOCATION 'projects/my-project/topics/my-topic'

CREATE TABLE eligible_voters ...

INSERT INTO eligible_voters (
  SELECT
name
FROM people
WHERE age >= 18
)
```

This is much cleaner! But the overall approach has an obvious downside -
with the table definition written like this it's impossible to read from or
write to the message attributes (unless one is being used for
event_timestamp). I think we can mitigate this in two ways:
1. In the future, this flattened schema definition could be represented as
something like a view on the expanded definition. We could allow users to
provide some metadata indicating that a column should correspond to a
particular attribute, rather than a field in the payload. To me this feels
similar to how you indicate a column should be indexed in a database. It's
data that's relevant to the storage system, and not to the actual query, so
it belongs in CREATE TABLE.
2. In the meantime, we can continue to support the current syntax. If a
pubsub table definition has *exactly* three fields with the expected types:
event_timestamp TIMESTAMP, payload ROW<...>, and attributes MAP<VARCHAR,
VARCHAR>, we can continue to use the current codepath. Otherwise we will
use the flattened schema.
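
For mitigation 2, the dispatch could be as simple as the following sketch (a
hypothetical helper; the field-type checks are elided for brevity):

```
import org.apache.beam.sdk.schemas.Schema;

class PubsubTableSchemas {
  // Heuristic from mitigation 2: treat the table as "nested" only when it
  // has exactly the three expected top-level fields; otherwise use the
  // flattened, payload-only path. Field-type verification is omitted here.
  static boolean usesNestedPayloadSchema(Schema schema) {
    return schema.getFieldCount() == 3
        && schema.hasField("event_timestamp")
        && schema.hasField("attributes")
        && schema.hasField("payload");
  }
}
```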

Please let me know if anyone has any objections to this approach, otherwise
I plan on moving forward with it - I should have a PR up shortly.

Brian


Re: [Discuss] Beam mascot

2019-11-13 Thread Gris Cuevas
Hi everyone, so exciting to see this convo taking off! 

I loved Alex's firefly! -- it can have so many cool variations, and as a 
stuffed animal is very original. 

Other ideas I had are a caterpillar because it looks like a data pipeline, lol 
or the beaver! 

Feedback on the current sketches. 
- They resemble a lot either the Octocat or Totoro [1]
- I'd like to see something that is completely new and original, pancakes from 
gRPC is an example[2]
- Something more caricaturesque is better, since we can dress it up and modify 
it

To move forward, it seems that the animals that were winners in this thread 
are: 

Beaver (3)
Firefly (3)
Lemur or votes on sketches (3)
Cuttlefish (2)
Hedgehog (1)
Salmon (1) 

So let's focus the design proposals on the three winners: beaver, firefly and 
lemur. 
I'd like to see more options on beavers and fireflies, the current sketch 
options I think are based on the cuttlefish and the lemur (?)

I think it's a good idea to get sketches from multiple designers, since like 
someone else pointed out, we'll get variations based on their personal styles, 
and someone else mentioned here that we have teams/companies with designers in 
their teams, so let's take advantage of that as well :) 

I'd suggest to fork the conversation into a call for sketches and we vote on 
that. 

[1] 
https://www.google.com/search?q=totoro=ACYBGNTFW6vq76cHp05g4vBaR-SVJNI1iw:1573674471669=lnms=isch=X=0ahUKEwiTwICf-uflAhUKHqwKHVtzAykQ_AUIEigB=1440=735
[2] 
https://www.google.com/search?q=pancakes+grpc=isch=2ahUKEwixgqjV-uflAhUBOawKHX2pAfsQ2-cCegQIABAA=pancakes+grpc_l=img.3...13774.22674..22826...10.0..0.112.1818.13j6..01..gws-wiz-img.10..35i39j0j0i67j35i362i39j0i10j0i5i30j0i8i30.hNtaBfSYNv8=WV7MXfHxIYHysAX90obYDw=735=1440


Re: Pipeline AttributeError on Python3

2019-11-13 Thread Valentyn Tymofieiev
I also opened https://issues.apache.org/jira/browse/BEAM-8651 to track this
issue and any recommendation for the users that will come out of it.

On Thu, Nov 7, 2019 at 6:25 PM Valentyn Tymofieiev 
wrote:

>  I think we have heard of this issue from the same source:
>
> This looks exactly like a race condition that we've encountered on Python
>> 3.7.1: There's a bug in some older 3.7.x releases that breaks the
>> thread-safety of the unpickler, as concurrent unpickle threads can access a
>> module before it has been fully imported. See
>> https://bugs.python.org/issue34572 for more information.
>>
>> The traceback shows a Python 3.6 venv so this could be a different issue
>> (the unpickle bug was introduced in version 3.7). If it's the same bug then
>> upgrading to Python 3.7.3 or higher should fix that issue. One potential
>> workaround is to ensure that all of the modules get imported during the
>> initialization of the sdk_worker, as this bug only affects imports done by
>> the unpickler.
>
>
> The symptoms do sound similar, so I would try to reproduce your issue on
> 3.7.3 and see if it is gone, or try to reproduce
> https://bugs.python.org/issue34572 in the version of interpreter you use.
> If this doesn't help, you can try to reproduce the race using your input.
>
> To get the output of serialized do fn, you could do the following:
> 1. Patch https://github.com/apache/beam/pull/10036.
> 2. Set logging level to DEBUG, see:
> https://github.com/apache/beam/blob/90d587843172143c15ed392513e396b74569a98c/sdks/python/apache_beam/examples/wordcount.py#L137
> .
> 3. Check for log output for payload of your transform, it may look like:
>
> transforms {
>   key: "ref_AppliedPTransform_write/Write/WriteImpl/PreFinalize_42"
>   value {
> spec {
>   urn: "beam:transform:pardo:v1"
>   payload: "\n\347\006\n\275\006\n
> beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT
> 
>
> Then you can extract the output of pickled fn:
>
> from apache_beam.utils import proto_utils
> from apache_beam.portability.api import beam_runner_api_pb2
> from apache_beam.internal import pickler
>
> payload = b'\n\347\006\n\275\006\n
> beam:dofn:pickled_python_info:v1\032\230\006eNptU1tPFTEQPgqIFFT...'
> pardo_payload = proto_utils.parse_Bytes(payload,
> beam_runner_api_pb2.ParDoPayload)
> pickled_fn = pardo_payload.do_fn.spec.payload
>
> pickler.loads(pickled_fn) # Presumably the race happens here when
> unpickling one of your transforms
> (pricingrealtime.aggregation.aggregation_transform).
>
>
> On Wed, Nov 6, 2019 at 10:54 PM Rakesh Kumar  wrote:
>
>> Thanks Valentyn,
>>
>> Aggregation_transform.py doesn't have any transformation method which
>> extends beam.DoFn. We are using plain python method which we passed in
>> beam.Map().  I am not sure how to get the dump of serialized_fn. Can you
>> please let me the process?
>>
>> I also heard that some people ran into this issue on Python 3.7.1 but the
>> same issue is not present on Python 3.7.3. Can you confirm this?
>>
>>
>>
>> On Mon, Oct 28, 2019 at 5:00 PM Valentyn Tymofieiev 
>> wrote:
>>
>>> +user@, bcc: dev@
>>> https://issues.apache.org/jira/browse/BEAM-6158 may be contributing to
>>> this issue, although we saw instances of this bug in exactly opposite
>>> scenarios - when pipeline was defined *in one file*, but not in
>>> multiple files.
>>>
>>> Could you try replacing instances of super() in
>>> aggregation_transform.py  as done in
>>> https://github.com/apache/beam/pull/9513 and see if this issue is still
>>> reproducible?
>>>
>>> If that doesn't work, I would try to get the dump of serialized_fn, and
>>> try to reproduce the issue in isolated environment, such as:
>>>
>>> from apache_beam.internal import pickler
>>> serialized_fn = "..content.."
>>> pickler.loads(serialized_fn)
>>>
>>> then I would try to trim the doFn in the example to a
>>> minimally-reproducible example. It could be another issue with dill
>>> dependency.
>>>
>>>
>>> On Mon, Oct 28, 2019 at 2:48 PM Rakesh Kumar 
>>> wrote:
>>>
 Hi All,

 We have noticed a weird intermittent issue on Python3 but we don't run
 into this issue on python2. Sometimes when we are trying to submit the
 pipeline, we get AttributeError (Check the stack trace below).  we have
 double-checked and we do find the attribute/methods are present in the
 right module and in right place but somehow the pipeline still complains
 about it. In some cases, we refer methods before their definition. We tried
 to reorder the method definition but that didn't help at all.

 We don't see the same issue when the entire pipeline is defined in one
 file. Also, note that this doesn't happen all the time when we submit the
 pipeline, so I feel it is some kind of race condition. When we enable the
 worker recycle logic it happens most of the time when sdk worker is
 recycled.

 Some more information about the environment:
 Python version: 

org.apache.beam.sdk.io.clickhouse.AtomicInsertTest.testIdempotentInsert fails

2019-11-13 Thread Tomo Suzuki
Hi Beam developers,

The org.apache.beam.sdk.io.clickhouse.AtomicInsertTest fails in my
development environment. Created
https://issues.apache.org/jira/browse/BEAM-8650. The error message indicates
that ClickHouse (which I'm not familiar with) is trying to connect to a
(random) strange IP address for Zookeeper.

Does anybody know how to solve this?

-- 
Regards,
Tomo


Re: Type of builtin PTransform/PCollection metrics

2019-11-13 Thread Robert Bradshaw
On Wed, Nov 13, 2019 at 10:56 AM Maximilian Michels  wrote:
>
> > Are you referring specifically to?
> > * beam:metric:element_count:v1
> > * beam:metric:pardo_execution_time:start_bundle_msecs:v1
> > * beam:metric:pardo_execution_time:process_bundle_msecs:v1
> > * beam:metric:pardo_execution_time:finish_bundle_msecs:v1
> > * beam:metric:ptransform_execution_time:total_msecs:v1
>
> Yes.
>
> > Would the gauge be grouped per element or per bundle?
>
> Per bundle. These are reported when the bundle finishes.
>
> > If grouped at the bundle level the metrics are arbitrary to the user since 
> > the bundle size is chosen by the runner.
>
> Not necessarily because the bundle size is typically fixed (at least in
> the Flink Runner). In any case, it provides information about how much
> activity occurred in a bundle which is useful to know.
>
> > There is also a very significant overhead for tracking low level metrics
>
> I can't imagine tracking a per-bundle element count or execution time is
> that expensive. Maybe I'm wrong.

These are element counts and execution time per operation (e.g. per
DoFn). FWIW, process_bundle_msecs is mis-named; it should be
"process_element" or just "process", as it refers to the time spent in
that method. beam:metric:ptransform_execution_time:total_msecs:v1
seems redundant with the sum of the others. (Unless it includes
setup/teardown, which it seems are missing as separate values?)

I think what you want is new metrics associated with the bundle +
executable stage as a whole. Distribution metrics would make the most
sense here. (Gauge metrics would just report the value of whatever
bundle finished last...) I don't know how they'd be named, perhaps
they'd be labeled with the full set of transforms that the stage
contains (which is of course not stable)?
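
For illustration, a per-bundle element-count distribution can already be
reported from user code; a minimal sketch (the metric name is arbitrary):

```
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;

class BundleSizeFn<T> extends DoFn<T, T> {
  private final Distribution bundleSizes =
      Metrics.distribution(BundleSizeFn.class, "elements_per_bundle");
  private transient long elementsInBundle;

  @StartBundle
  public void startBundle() {
    elementsInBundle = 0;
  }

  @ProcessElement
  public void process(@Element T element, OutputReceiver<T> out) {
    elementsInBundle++;
    out.output(element);
  }

  @FinishBundle
  public void finishBundle() {
    // One sample per bundle; the distribution's count/min/max/mean then
    // describe bundle sizes instead of a single last-reported gauge value.
    bundleSizes.update(elementsInBundle);
  }
}
```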

> On 13.11.19 18:58, Luke Cwik wrote:
> > Are you referring specifically to?
> > * beam:metric:element_count:v1
> > * beam:metric:pardo_execution_time:start_bundle_msecs:v1
> > * beam:metric:pardo_execution_time:process_bundle_msecs:v1
> > * beam:metric:pardo_execution_time:finish_bundle_msecs:v1
> > * beam:metric:ptransform_execution_time:total_msecs:v1
> >
> > Would the gauge be grouped per element or per bundle?
> > If grouped at the bundle level the metrics are arbitrary to the user
> > since the bundle size is chosen by the runner.
> > If grouped at the element level then only a few of the metrics make sense:
> > * element_count becomes number of outputs per input element
> > * process_bundle_msecs becomes amount of time to process a single input
> > element (does this still apply to elements that can be split?)
> >
> > There is also a very significant overhead for tracking low level metrics
> > in great detail which is why timing is done through a sampling
> > technique. I'm sure if we could do it cheaply then it would make sense
> > to get those metrics. This is also a place where we want each SDK to
> > implement these metrics so complexity may slow down SDK authors from
> > developing them.
> >
> >
> > On Wed, Nov 13, 2019 at 5:13 AM Maximilian Michels wrote:
> >
> > Hi,
> >
> > We have a series of builtin PTransform/PCollection metrics:
> > 
> > https://github.com/apache/beam/blob/808cb35018cd228a59b152234b655948da2455fa/model/pipeline/src/main/proto/metrics.proto#L74
> >
> > Why are those of counters ("beam:metrics:sum_int_64")? I think the
> > better default type for most users would be gauge
> > ("beam:metrics:latest_int_64").
> >
> > I understand that counters are useful because they retain the sum of
> > all
> > reported values, but for getting an idea about the deviation of a
> > metric, gauges could be more useful.
> >
> > Perhaps we could make this configurable?
> >
> > Thanks,
> > Max
> >


Re: [Discuss] Beam mascot

2019-11-13 Thread Jozef Vilcek
Interesting topic :) I also kind of liked Alex's firefly, for the
impression it made on me. To drive it further, hands on hips make a
strong / serious pose, hovering in the air above all.
I would put the logo on him, to become his torso / body or a dress. A logo
with a big B on it almost looks like a superhero dress / cape ( which is a
popular tune these "days" :) ).

To summarize, I would love mascot not only looks nice, but be deadly
serious about doing cool stuff and pulling superhero features out of it's
sleeve :D

On Wed, Nov 13, 2019 at 3:17 PM Maximilian Michels  wrote:

> > Same. What about 37 with the eyes from 52?
>
> +1 That would combine two ideas: (1) "Beam" eyes and (2) sea animal.
>
> We could set this as the working idea and build a logo based off that.
>
> On 12.11.19 22:41, Robert Bradshaw wrote:
> > On Tue, Nov 12, 2019 at 1:29 PM Aizhamal Nurmamat kyzy
> >  wrote:
> >> 52 and 37 for me. I don't know what 53 is, but I like it too.
> >
> > Same. What about 37 with the eyes from 52?
> >
> >> On Tue, Nov 12, 2019 at 9:19 AM Maximilian Michels 
> wrote:
> >>>
> >>> More logos :D
> >>>
> >>> (35) - (37), (51), (48), (53) go into the direction of cuttlefish.
> >>>
> >>>   From the new ones I like (52) because of the eyes. (53) If we want to
> >>> move into the direction of a water animal, the small ones are quite
> >>> recognizable. Also, (23) and (36) are kinda cute.
> >>>
> >>> Cheers,
> >>> Max
> >>>
> >>> On 12.11.19 02:09, Robert Bradshaw wrote:
>  Cuttlefish are cool, but I don't know how recognizable they are, and
>  they don't scream "fast" or "stream-y" or "parallel processing" to me
>  (not that that's a requirement...) I like that firefly, nice working
>  the logo into the trailing beam of light.
> 
>  On Mon, Nov 11, 2019 at 5:03 PM Udi Meiri  wrote:
> >
> > Dumbo octopus anyone? https://youtu.be/DmqikqvLLLw?t=263
> >
> >
> > On Mon, Nov 11, 2019 at 2:06 PM Luke Cwik  wrote:
> >>
> >> The real answer, what cool schwag can we get based upon the mascot.
> >>
> >> On Mon, Nov 11, 2019 at 2:04 PM Kenneth Knowles 
> wrote:
> >>>
> >>> I'm with Luke on cuttlefish. We can have color changing schwag...
> >>>
> >>> On Mon, Nov 11, 2019 at 9:57 AM David Cavazos 
> wrote:
> 
>  I like 9 as well. Not related to anything, but chinchillas are
> also cute.
> 
>  On Mon, Nov 11, 2019 at 8:25 AM Luke Cwik 
> wrote:
> >
> > 9 and 7 for me (in that order)
> >
> > On Mon, Nov 11, 2019 at 7:18 AM Maximilian Michels <
> m...@apache.org> wrote:
> >>
> >> Here are some sketches from the designer. I've put them all in
> one image
> >> and added labels to make it easier to refer to them. My
> favorites are
> >> (2) and (9).
> >>
> >> Cheers,
> >> Max
> >>
> >> On 09.11.19 19:43, Maximilian Michels wrote:
> >>> I like that sketch! The designer has also sent me some rough
> sketches,
> >>> I'll share these here when I get consent from the designer.
> >>>
> >>> -Max
> >>>
> >>> On 09.11.19 19:22, Alex Van Boxel wrote:
>  +1 for a FireFly. Ok, I can't draw, but it's to make a point
> ;-)
> 
>  Fire2.jpg
> 
> 
> 
>  _/
>  _/ Alex Van Boxel
> 
> 
>  On Sat, Nov 9, 2019 at 12:26 AM Kyle Weaver <
> kcwea...@google.com
>  > wrote:
> 
>    Re fish: The authors of the Streaming Systems went with
> trout, but
>    the book mentioned a missed opportunity to make their
> cover a "robot
>    dinosaur with a Scottish accent." Perhaps that idea is
> worth
>  revisiting?
> 
>    On Fri, Nov 8, 2019 at 3:20 PM Luke Cwik <
> lc...@google.com
>    > wrote:
> 
>    My top suggestion is a cuttlefish.
> 
>    On Thu, Nov 7, 2019 at 10:28 PM Reza Rokni <
> r...@google.com
>    > wrote:
> 
>    Salmon... they love streams? :-)
> 
>    On Fri, 8 Nov 2019 at 12:00, Kenneth Knowles
>    mailto:k...@apache.org>>
> wrote:
> 
>    Agree with Aizhamal that it doesn't matter
> if they are
>    taken if they are not too close in space to
> Beam: Apache
>    projects, big data, log processing, stream
> processing.
>    Not a legal opinion, but an aesthetic
> opinion. So I
>    would keep Lemur as a possibility.
> Definitely nginx is
> 

Re: Make environment_id a top level attribute of PTransform

2019-11-13 Thread Chamikara Jayalath
On Wed, Nov 13, 2019 at 10:42 AM Luke Cwik  wrote:

> The original ideology was that only those payloads that required the
> attribute would contain it, but once something becomes common enough it
> makes sense to have it as an optional top-level parameter, so +1.
>
> Are there areas where the environment id will still exist outside of a
> PTransform?
>

Only scenario I can think of is, support for first order functions (UDFs)
in cross-language transforms where a function might have to be executed in
a different environment than the PTransform. But I don't think we should
make the very common case of having both PTransforms and associated
functions in the same environment hard/error-prone due to this. We could
later introduce specifying environment along with associated functions (and
any other properties we need) when we design support for first order
functions in cross-language transforms.

Thanks,
Cham


>
>
> On Tue, Nov 12, 2019 at 9:25 PM Chamikara Jayalath 
> wrote:
>
>> This was discussed in a JIRA [1] but don't think this was mentioned in
>> the dev list.
>>
>> Not having environment_id as a top level attribute of PTransform [2]
>> makes it difficult to track the Environment [3] a given PTransform should
>> be executed in. For example, in Dataflow, we have to fork code in several
>> places to filter out the Environment from a given PTransform proto.
>>
>> Making environment_id a top level attribute of PTransform and removing it
>> from various payload types will make tracking environments easier. Also
>> code will become less error prone since we don't have to fork for all
>> possible payload types.
>>
>> Any objections to doing this change ?
>>
>> Thanks,
>> Cham
>>
>> [1] https://issues.apache.org/jira/browse/BEAM-7850
>> [2]
>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L99
>> [3]
>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L1021
>>
>


Re: [discuss] Using a logger hierarchy in Python

2019-11-13 Thread Chad Dombrova
On Wed, Nov 13, 2019 at 10:52 AM Robert Bradshaw 
wrote:

> I would be in favor of using module-level loggers as well.


+1


Re: Type of builtin PTransform/PCollection metrics

2019-11-13 Thread Maximilian Michels

Are you referring specifically to?
* beam:metric:element_count:v1
* beam:metric:pardo_execution_time:start_bundle_msecs:v1
* beam:metric:pardo_execution_time:process_bundle_msecs:v1
* beam:metric:pardo_execution_time:finish_bundle_msecs:v1
* beam:metric:ptransform_execution_time:total_msecs:v1


Yes.


Would the gauge be grouped per element or per bundle?


Per bundle. These are reported when the bundle finishes.


If grouped at the bundle level the metrics are arbitrary to the user since the 
bundle size is chosen by the runner.


Not necessarily because the bundle size is typically fixed (at least in 
the Flink Runner). In any case, it provides information about how much 
activity occurred in a bundle which is useful to know.


There is also a very significant overhead for tracking low level metrics 


I can't imagine tracking a per-bundle element count or execution time is 
that expensive. Maybe I'm wrong.


-Max

On 13.11.19 18:58, Luke Cwik wrote:

Are you referring specifically to?
* beam:metric:element_count:v1
* beam:metric:pardo_execution_time:start_bundle_msecs:v1
* beam:metric:pardo_execution_time:process_bundle_msecs:v1
* beam:metric:pardo_execution_time:finish_bundle_msecs:v1
* beam:metric:ptransform_execution_time:total_msecs:v1

Would the gauge be grouped per element or per bundle?
If grouped at the bundle level the metrics are arbitrary to the user 
since the bundle size is chosen by the runner.

If grouped at the element level then only a few of the metrics make sense:
* element_count becomes number of outputs per input element
* process_bundle_msecs becomes amount of time to process a single input 
element (does this still apply to elements that can be split?)


There is also a very significant overhead for tracking low level metrics 
in great detail which is why timing is done through a sampling 
technique. I'm sure if we could do it cheaply then it would make sense 
to get those metrics. This is also a place where we want each SDK to 
implement these metrics so complexity may slow down SDK authors from 
developing them.



On Wed, Nov 13, 2019 at 5:13 AM Maximilian Michels wrote:


Hi,

We have a series of builtin PTransform/PCollection metrics:

https://github.com/apache/beam/blob/808cb35018cd228a59b152234b655948da2455fa/model/pipeline/src/main/proto/metrics.proto#L74

Why are those of counters ("beam:metrics:sum_int_64")? I think the
better default type for most users would be gauge
("beam:metrics:latest_int_64").

I understand that counters are useful because they retain the sum of
all
reported values, but for getting an idea about the deviation of a
metric, gauges could be more useful.

Perhaps we could make this configurable?

Thanks,
Max



Re: [discuss] Using a logger hierarchy in Python

2019-11-13 Thread Robert Bradshaw
I would be in favor of using module-level loggers as well. I think
per-class would be overkill and unlike Java not everything is in a
class, as well as being more conventional in Python (where modules are
generally seen as the unit of compilation, vs. Java where classes are
the unit of compilation and there's really no analogous of modules
(short of say a static-member only class, packages aren't the same
thing)).

On Wed, Nov 13, 2019 at 9:22 AM Luke Cwik  wrote:
>
> That doesn't seem like a very invasive change so if we adopt it we should 
> adopt it everywhere in the same CL so people see the common pattern and use 
> it.
>
> I'm for using a named logger and would rather that it is per class instead of 
> per module since many of the modules have lots of classes but +1 from me 
> overall.
>
> On Tue, Nov 12, 2019 at 4:37 PM Pablo Estrada  wrote:
>>
>> Hi all,
>> as of today, the Python SDK uses the root logger wherever we log. This means 
>> that it's impossible to have different logging levels depending on the 
>> section of the code that we want to debug most.
>>
>> I have been doing some work on the FnApiRunner, and adding logging for it. I 
>> would like to start using a logger hierarchy, and slowly transition the rest 
>> of the project to use per-module loggers.
>>
>> On each module, we could have a line like so:
>>
>> _LOGGER = logging.getLogger(__name__)
>>
>> and simply log everything on that _LOGGER. Is that an acceptable thing to do 
>> for everyone?
>>
>> If I see no objections, I will change the FnApiRunner to use a logger like 
>> this, and change other sections of the code as I interact with them.
>> Best
>> -P.


Re: Make environment_id a top level attribute of PTransform

2019-11-13 Thread Luke Cwik
The original ideology was that only those payloads that required the
attribute would contain it, but once something becomes common enough it
makes sense to have it as an optional top-level parameter, so +1.

Are there areas where the environment id will still exist outside of a
PTransform?


On Tue, Nov 12, 2019 at 9:25 PM Chamikara Jayalath 
wrote:

> This was discussed in a JIRA [1] but don't think this was mentioned in the
> dev list.
>
> Not having environment_id as a top level attribute of PTransform [2] makes
> it difficult to track the Environment [3] a given PTransform should be
> executed in. For example, in Dataflow, we have to fork code in several
> places to filter out the Environment from a given PTransform proto.
>
> Making environment_id a top level attribute of PTransform and removing it
> from various payload types will make tracking environments easier. Also
> code will become less error prone since we don't have to fork for all
> possible payload types.
>
> Any objections to doing this change ?
>
> Thanks,
> Cham
>
> [1] https://issues.apache.org/jira/browse/BEAM-7850
> [2]
> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L99
> [3]
> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L1021
>


Re: Date/Time Ranges & Protobuf

2019-11-13 Thread Luke Cwik
I do agree that Apache Beam can represent dates and times with arbitrary
precision and can do it many different ways.

My argument has always been around whether we should restrict this range to
a common standard to increase interoperability across other systems. For
example, SQL database servers have varying degrees as to what ranges they
support:
* Oracle 10[1]: 0001-01-01 to 9999-12-31
* Oracle 11g[2]: Julian era, ranging from January 1, 4712 BCE through
December 31, 9999 CE (Common Era, or 'AD'), unless BCE ('BC' in the format
mask)
* MySQL[3]: '1000-01-01 00:00:00' to '9999-12-31 23:59:59'
* Microsoft SQL: January 1, 1753, through December 31, 9999 for
datetime[4] and January 1, 1 CE through December 31, 9999 CE for datetime2[5]

The common case of the global window containing timestamps that are before
and after all of these supported ranges above means that our users can't
represent a global window within a database using its common data types.

1: https://docs.oracle.com/javadb/10.8.3.0/ref/rrefdttlimits.html
2:
https://docs.oracle.com/cd/B28359_01/server.111/b28318/datatype.htm#CNCPT413
3: https://dev.mysql.com/doc/refman/8.0/en/datetime.html
4:
https://docs.microsoft.com/en-us/sql/t-sql/data-types/datetime-transact-sql?view=sql-server-ver15
5:
https://docs.microsoft.com/en-us/sql/t-sql/data-types/datetime2-transact-sql?view=sql-server-ver15
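
To make the pinch concrete: a sink writing Beam timestamps into one of
these databases has to clamp or reject out-of-range values, e.g. (a sketch;
the exact bounds vary per database as listed above):

```
import org.joda.time.Instant;

class SqlTimestampBounds {
  // Illustrative bounds of roughly 0001-01-01 through 9999-12-31; Beam's
  // global window bounds fall far outside any of the ranges above.
  private static final Instant SQL_MIN = Instant.parse("0001-01-01T00:00:00Z");
  private static final Instant SQL_MAX = Instant.parse("9999-12-31T23:59:59Z");

  // Clamp a Beam timestamp into the SQL-representable range, losing the
  // distinction between "start of the global window" and 0001-01-01.
  static Instant clampForSql(Instant beamTimestamp) {
    if (beamTimestamp.isBefore(SQL_MIN)) {
      return SQL_MIN;
    }
    if (beamTimestamp.isAfter(SQL_MAX)) {
      return SQL_MAX;
    }
    return beamTimestamp;
  }
}
```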

On Wed, Nov 13, 2019 at 3:28 AM Jan Lukavský  wrote:

> Hi,
>
> just an idea on these related topics that appear these days - it might
> help to realize that we actually don't need full arithmetic on
> timestamps (the Beam model IMHO doesn't need to know the exact
> difference of two events). What we actually need is a slightly simplified
> algebra. Given two timestamps T1 and T2 and a "duration" (a different type
> from timestamp), we need these operations (not 100% sure this is
> exhaustive, but it seems to be):
>
>  - is_preceding(T1, T2): bool
>
>- important !is_preceding(T1, T2) does NOT imply that is_preceding(T2,
> T1) - !is_preceding(T1, T2) && !is_preceding(T2, T1) would mean events are
> _concurrent_
>
>- this relation has to be also antisymmetric
>
>- given this function we can construct a comparator, where multiple
> distinct timestamps can be "equal" (or with no particular ordering, which
> is natural property of time)
>
>  - min_timestamp_following(T1, duration): T2
>
>- that would return a timestamp for which is_preceding(T1 + duration,
> T2) would return true and no other timestamp X would exist for which
> is_preceding(T1 + duration, X) && is_preceding(X, T2) would be true
>
>- actually, this function would serve as the definition for the
> duration object
>
> If we can supply this algebra, it seems that we can use any representation
> of timestamps and intervals. It might be (probably) even possible to let
> user specify his own type used as timestamps and durations, which could
> solve the issues of not currently being able to correctly represent
> timestamps lower than Long.MIN_VALUE (although we can get data for that low
> timestamps - cosmic microwave background being one example :)). Specifying
> this algebra actually probably boils down to proposal (3) in Robert's
> thread [1].
>
> Just my 2 cents.
>
> Jan
>
> [1]
> https://lists.apache.org/thread.html/1672898393cb0d54a77a879be0fb5725902289a3e5063d0f9ec36fe1@%3Cdev.beam.apache.org%3E
> On 11/13/19 10:11 AM, jincheng sun wrote:
>
> Thanks for bringing up this discussion @Luke.
>
> As @Kenn mentioned, in Beam we have defined the constants value for the
> min/max/end of global window. I noticed that
> google.protobuf.Timestamp/Duration is only used in window definitions,
> such as FixedWindowsPayload, SlidingWindowsPayload, SessionsPayload, etc.
>
> I think that both RFC 3339 and Beam's current implementation are big
> enough to express common window definitions. But users can really
> define a window size that is outside the scope of RFC 3339. Conceptually,
> we should not limit the time range for windows (although I think the range
> of RFC 3339 is big enough in most cases).
>
> To ensure that people know the background of the discussion, I hope you
> don't mind that I put the original conversation thread[1] here.
>
> Best,
> Jincheng
>
> [1] https://github.com/apache/beam/pull/10041#discussion_r344380809
>
> Robert Bradshaw  于2019年11月12日周二 下午4:09写道:
>
>> I agree about it being a tagged union in the model (together with
>> actual_time(...) - epsilon). It's not just a performance hack though,
>> it's also (as discussed elsewhere) a question of being able to find an
>> embedding into existing datetime libraries. The real question here is
>> whether we should limit ourselves to just these 10,000 years AD, or
>> find value in being able to process events for the lifetime of the
>> universe (or, at least, recorded human history). Artificially limiting
>> in this way would seem surprising to me at least.
>>
>> On Mon, Nov 11, 2019 at 11:58 PM Kenneth 

Re: Cleaning up Approximate Algorithms in Beam

2019-11-13 Thread Reuven Lax
On Wed, Nov 13, 2019 at 9:58 AM Ahmet Altay  wrote:

> Thank you for writing this summary.
>
> On Tue, Nov 12, 2019 at 6:35 PM Reza Rokni  wrote:
>
>> Hi everyone;
>>
>> TL/DR : Discussion on Beam's various Approximate Distinct Count
>> algorithms.
>>
>> Today there are several options for Approximate Algorithms in Apache Beam
>> 2.16, with HllCount being the most recently added. I would like to canvass
>> opinions here on the possibility of rationalizing these APIs by removing
>> obsolete / less efficient implementations.
>> The current situation:
>>
>> There are three options available to users: ApproximateUnique.java,
>> ApproximateDistinct.java, and HllCount.java.
>> A quick summary of these APIs as I understand them:
>>
>> HllCount.java:
>> Marked as @Experimental
>>
>> PTransforms to compute HyperLogLogPlusPlus (HLL++) sketches on data
>> streams, based on the ZetaSketch implementation. For the detailed design
>> of this class, see https://s.apache.org/hll-in-beam.
>>
>> ApproximateUnique.java:
>> Not marked as @Experimental
>>
>> This API does not expose the ability to create sketches, so it's not
>> suitable for the OLAP use case that HLL++ is geared towards (doing
>> pre-aggregation into sketches to allow interactive query speeds). It's also
>> less precise for the same amount of memory used; the error bounds in the
>> doc comments give:
>>
>> /* The error is about {@code 2 / sqrt(sampleSize)} */
>>
>> i.e. roughly 2% for a sample size of 10,000.
>>
>> Compared to the default HLLCount sketch size, its error is 10X larger
>> than the HLL++ error.
>>
>
> FWIW, there is a Python implementation only for this version:
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/stats.py#L38
>
>
>
>> ApproximateDistinct.java:
>> Marked as @Experimental
>>
>> This is a re-implementation of the HLL++ algorithm, based on the paper
>> published in 2013. It exposes sketches via a HyperLogLogPlusCoder. We
>> have not run any benchmarks comparing this implementation with
>> HllCount. We also need to be careful to ensure that if we were to change any
>> of these APIs, the binary format of the sketches never changes:
>> there could be users who have stored previous sketches using
>> ApproximateDistinct, and it will be important to ensure they do not
>> have a bad experience.
>>
>>
>> Proposals:
>>
>> There are two classes of users expected for these algorithms:
>>
>> 1) Users who simply use the transform to estimate the size of their data
>> set in Beam
>>
>> 2) Users who want to create sketches and store them, either for
>> interoperability with other systems, or as features to be used in further
>> data processing.
>>
>>
>>
>> For use case 1, it is possible to make use of naming which does not
>> expose the implementation; however, for use case 2 it is important for the
>> implementation to be explicit, as sketches produced with one implementation
>> will not work with other implementations.
>>
>> ApproximateUnique.java:
>>
>> This one does not provide sketches and, based on the notes above, is not
>> as efficient as HllCount. However, it does have a very searchable name and
>> is likely to be the one users will gravitate to when searching for
>> approximate unique algorithms without needing the capabilities of sketches.
>>
>> Ideally we should think about switching the implementation of this
>> transform to wrap HllCount. However, this could mean changing things in a
>> way which is not transparent to the end developer, although as a result of
>> the change they would get a better implementation for free on an upgrade :-)
>>
>> Another option would be to mark this transform as @Deprecated and create
>> a new transform, ApproximateCountDistinct, which would wrap HllCount. That
>> name is also much clearer.
>>
>
> Marking it deprecated instead of changing its implementation will probably
> 

Re: Cleaning up Approximate Algorithms in Beam

2019-11-13 Thread Ahmet Altay
Thank you for writing this summary.

On Tue, Nov 12, 2019 at 6:35 PM Reza Rokni  wrote:

> Hi everyone;
>
> TL/DR : Discussion on Beam's various Approximate Distinct Count algorithms.
>
> Today there are several options for Approximate Algorithms in Apache Beam
> 2.16, with HllCount being the most recently added. I would like to canvass
> opinions here on the possibility of rationalizing these APIs by removing
> obsolete / less efficient implementations.
> The current situation:
>
> There are three options available to users: ApproximateUnique.java,
> ApproximateDistinct.java, and HllCount.java.
> A quick summary of these APIs as I understand them:
>
> HllCount.java:
> Marked as @Experimental
>
> PTransforms to compute HyperLogLogPlusPlus (HLL++) sketches on data
> streams, based on the ZetaSketch implementation. For the detailed design
> of this class, see https://s.apache.org/hll-in-beam.
>
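> For pipelines, typical usage looks roughly like this (a sketch based on my
> reading of the HllCount javadoc; imports omitted and the input data
> illustrative):
>
> // Build one HLL++ sketch (serialized as byte[]) per window.
> PCollection<String> ids = pipeline.apply(Create.of("a", "b", "a"));
> PCollection<byte[]> sketches =
>     ids.apply(HllCount.Init.forStrings().globally());
> // Sketches can be stored or merged; Extract turns them into estimates.
> PCollection<Long> approxDistinct =
>     sketches.apply(HllCount.Extract.globally());
>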
> ApproximateUnique.java:
> Not marked as @Experimental
>
> This API does not expose the ability to create sketches, so it's not
> suitable for the OLAP use case that HLL++ is geared towards (doing
> pre-aggregation into sketches to allow interactive query speeds). It's also
> less precise for the same amount of memory used; the error bounds in the
> doc comments give:
>
> /* The error is about {@code 2 / sqrt(sampleSize)} */
>
> i.e. roughly 2% for a sample size of 10,000.
>
> Compared to the default HLLCount sketch size, its error is 10X larger than
> the HLL++ error.
>

FWIW, there is a Python implementation only for this version:
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/stats.py#L38



> ApproximateDistinct.java:
> Marked as @Experimental
>
> This is a re-implementation of the HLL++ algorithm, based on the paper
> published in 2013. It exposes sketches via a HyperLogLogPlusCoder. We
> have not run any benchmarks comparing this implementation with
> HllCount. We also need to be careful to ensure that if we were to change any
> of these APIs, the binary format of the sketches never changes:
> there could be users who have stored previous sketches using
> ApproximateDistinct, and it will be important to ensure they do not
> have a bad experience.
>
>
> Proposals:
>
> There are two classes of users expected for these algorithms:
>
> 1) Users who simply use the transform to estimate the size of their data
> set in Beam
>
> 2) Users who want to create sketches and store them, either for
> interoperability with other systems, or as features to be used in further
> data processing.
>
>
>
> For use case 1, it is possible to make use of naming which does not expose
> the implementation; however, for use case 2 it is important for the
> implementation to be explicit, as sketches produced with one implementation
> will not work with other implementations.
>
> ApproximateUnique.java:
>
> This one does not provide sketches and, based on the notes above, is not as
> efficient as HllCount. However, it does have a very searchable name and is
> likely to be the one users will gravitate to when searching for approximate
> unique algorithms without needing the capabilities of sketches.
>
> Ideally we should think about switching the implementation of this
> transform to wrap HllCount. However, this could mean changing things in a
> way which is not transparent to the end developer, although as a result of
> the change they would get a better implementation for free on an upgrade :-)
>
> Another option would be to mark this transform as @Deprecated and create a
> new transform, ApproximateCountDistinct, which would wrap HllCount. That
> name is also much clearer.
>
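> To illustrate, a hypothetical sketch of such a wrapper (the name comes from
> the proposal above; the signature is illustrative, not an agreed API):
>
> public class ApproximateCountDistinct {
>   public static PTransform<PCollection<String>, PCollection<Long>> globally() {
>     return new PTransform<PCollection<String>, PCollection<Long>>() {
>       @Override
>       public PCollection<Long> expand(PCollection<String> input) {
>         // Delegate to HllCount so users get the better implementation.
>         return input
>             .apply(HllCount.Init.forStrings().globally())
>             .apply(HllCount.Extract.globally());
>       }
>     };
>   }
> }
>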

Marking it deprecated instead of changing its implementation will probably
create a less surprising experience for the users.


>
> ApproximateDistinct.java
> 

Re: Type of builtin PTransform/PCollection metrics

2019-11-13 Thread Luke Cwik
Are you referring specifically to:
* beam:metric:element_count:v1
* beam:metric:pardo_execution_time:start_bundle_msecs:v1
* beam:metric:pardo_execution_time:process_bundle_msecs:v1
* beam:metric:pardo_execution_time:finish_bundle_msecs:v1
* beam:metric:ptransform_execution_time:total_msecs:v1

Would the gauge be grouped per element or per bundle?
If grouped at the bundle level, the metrics are arbitrary to the user since
the bundle size is chosen by the runner.
If grouped at the element level, then only a few of the metrics make sense:
* element_count becomes the number of outputs per input element
* process_bundle_msecs becomes the amount of time to process a single input
element (does this still apply to elements that can be split?)

There is also a very significant overhead for tracking low-level metrics in
great detail, which is why timing is done through a sampling technique. I'm
sure that if we could do it cheaply it would make sense to get those
metrics. This is also a place where we want each SDK to implement these
metrics, so complexity may slow down SDK authors from developing them.


On Wed, Nov 13, 2019 at 5:13 AM Maximilian Michels  wrote:

> Hi,
>
> We have a series of builtin PTransform/PCollection metrics:
>
> https://github.com/apache/beam/blob/808cb35018cd228a59b152234b655948da2455fa/model/pipeline/src/main/proto/metrics.proto#L74
>
> Why are those counters ("beam:metrics:sum_int_64")? I think the
> better default type for most users would be gauge
> ("beam:metrics:latest_int_64").
>
> I understand that counters are useful because they retain the sum of all
> reported values, but for getting an idea about the deviation of a
> metric, gauges could be more useful.
>
> Perhaps we could make this configurable?
>
> Thanks,
> Max
>


Re: [discuss] Using a logger hierarchy in Python

2019-11-13 Thread Luke Cwik
That doesn't seem like a very invasive change, so if we adopt it we should
adopt it everywhere in the same CL so that people see the common pattern and
use it.

I'm for using a named logger and would rather it be per class instead
of per module, since many of the modules have lots of classes, but +1 from me
overall.

On Tue, Nov 12, 2019 at 4:37 PM Pablo Estrada  wrote:

> Hi all,
> as of today, the Python SDK uses the root logger wherever we log. This
> means that it's impossible to have different logging levels depending on
> the section of the code that we want to debug most.
>
> I have been doing some work on the FnApiRunner, and adding logging for it.
> I would like to start using a logger hierarchy, and slowly transition the
> rest of the project to use per-module loggers.
>
> On each module, we could have a line like so:
>
> _LOGGER = logging.getLogger(__name__)
>
> and simply log everything on that _LOGGER. Is that an acceptable thing to
> do for everyone?
>
> If I see no objections, I will change the FnApiRunner to use a logger like
> this, and change other sections of the code as I interact with them.
> Best
> -P.
>


Re: [Discuss] Beam mascot

2019-11-13 Thread Maximilian Michels

Same. What about 37 with the eyes from 52?


+1 That would combine two ideas: (1) "Beam" eyes and (2) sea animal.

We could set this as the working idea and build a logo based off that.

On 12.11.19 22:41, Robert Bradshaw wrote:

On Tue, Nov 12, 2019 at 1:29 PM Aizhamal Nurmamat kyzy
 wrote:

52 and 37 for me. I don't know what 53 is, but I like it too.


Same. What about 37 with the eyes from 52?


On Tue, Nov 12, 2019 at 9:19 AM Maximilian Michels  wrote:


More logos :D

(35) - (37), (51), (48), (53) go in the direction of cuttlefish.

  From the new ones I like (52) because of the eyes, and (53) if we want to
move in the direction of a water animal - the small ones are quite
recognizable. Also, (23) and (36) are kinda cute.

Cheers,
Max

On 12.11.19 02:09, Robert Bradshaw wrote:

Cuttlefish are cool, but I don't know how recognizable they are, and
they don't scream "fast" or "stream-y" or "parallel processing" to me
(not that that's a requirement...) I like that firefly, nice working
the logo into the trailing beam of light.

On Mon, Nov 11, 2019 at 5:03 PM Udi Meiri  wrote:


Dumbo octopus anyone? https://youtu.be/DmqikqvLLLw?t=263


On Mon, Nov 11, 2019 at 2:06 PM Luke Cwik  wrote:


The real question: what cool schwag can we get based upon the mascot?

On Mon, Nov 11, 2019 at 2:04 PM Kenneth Knowles  wrote:


I'm with Luke on cuttlefish. We can have color changing schwag...

On Mon, Nov 11, 2019 at 9:57 AM David Cavazos  wrote:


I like 9 as well. Not related to anything, but chinchillas are also cute.

On Mon, Nov 11, 2019 at 8:25 AM Luke Cwik  wrote:


9 and 7 for me (in that order)

On Mon, Nov 11, 2019 at 7:18 AM Maximilian Michels  wrote:


Here are some sketches from the designer. I've put them all in one image
and added labels to make it easier to refer to them. My favorites are
(2) and (9).

Cheers,
Max

On 09.11.19 19:43, Maximilian Michels wrote:

I like that sketch! The designer has also sent me some rough sketches,
I'll share these here when I get consent from the designer.

-Max

On 09.11.19 19:22, Alex Van Boxel wrote:

+1 for a FireFly. Ok, I can't draw, but it's to make a point ;-)

[image attachment: Fire2.jpg]



_/
_/ Alex Van Boxel


  On Sat, Nov 9, 2019 at 12:26 AM Kyle Weaver <kcwea...@google.com> wrote:

  Re fish: The authors of the Streaming Systems book went with trout, but
  the book mentioned a missed opportunity to make their cover a "robot
  dinosaur with a Scottish accent." Perhaps that idea is worth
revisiting?

  On Fri, Nov 8, 2019 at 3:20 PM Luke Cwik <lc...@google.com> wrote:

  My top suggestion is a cuttlefish.

  On Thu, Nov 7, 2019 at 10:28 PM Reza Rokni <r...@google.com> wrote:

  Salmon... they love streams? :-)

  On Fri, 8 Nov 2019 at 12:00, Kenneth Knowles
  <k...@apache.org> wrote:

  Agree with Aizhamal that it doesn't matter if they are
  taken if they are not too close in space to Beam: Apache
  projects, big data, log processing, stream processing.
  Not a legal opinion, but an aesthetic opinion. So I
  would keep Lemur as a possibility. Definitely nginx is
  far away from Beam so it seems OK as long as the art is
  different.

  Also FWIW there are many kinds of Lemurs, and also
  related Tarsier, of the only uncontroversial and
  non-extinct infraorder within suborder Strepsirrhini. I
  think there's enough room for another mascot with big
  glowing eyes :-). The difference in the designer's art
  will be more significant than the taxonomy.

  Kenn

  On Tue, Nov 5, 2019 at 4:37 PM Aizhamal Nurmamat kyzy
  <aizha...@apache.org> wrote:

  Aww.. that Hoover beaver is cute. But then lemur is
  also "taken" [1] and the owl too [2].

  Personally, I don't think it matters much which
  mascots are taken, as long as the project is not too
  close in the same space as Beam. Also, it's good to
  just get all ideas out. We should still consider
  hedgehogs. I looked up fireflies; they don't look
  nice, but I am not dismissing the idea :/

  And thanks for reaching out to designers, Max. To
  your point:
   >how do we arrive at a concrete design
   >once we have consensus on the type of mascot?
  My thinking is that the designer will come up with
  a few sketches, then we vote on one here on the dev@
list.

  [1]

https://www.nginx.com/blog/introducing-the-lemur-stack-and-an-official-nginx-mascot/

  [2]


Re: [spark structured streaming runner] merge to master?

2019-11-13 Thread Etienne Chauchot

Ok for 1 jar with the 2 runners then.
I'll add the banner to the logs and the @Experimental annotation in the
code and in the javadocs.

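Concretely, something along these lines (a sketch only; the class name and
the exact wording are illustrative, not final):

import org.apache.beam.sdk.annotations.Experimental;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental
public class SparkStructuredStreamingRunnerBanner {
  private static final Logger LOG =
      LoggerFactory.getLogger(SparkStructuredStreamingRunnerBanner.class);

  // Logged once at pipeline construction so users cannot miss it.
  public static void warnExperimental() {
    LOG.warn(
        "The Spark structured streaming runner is EXPERIMENTAL and not yet"
            + " production-ready; see the stable SparkRunner and the list of"
            + " known issues in the documentation.");
  }
}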

Thanks for your opinions guys !

Etienne

On 08/11/2019 18:50, Kenneth Knowles wrote:
On Thu, Nov 7, 2019 at 5:32 PM Etienne Chauchot wrote:

>
> Hi guys
>
> @Kenn,
>
> I just wanted to mention that I did answer your question on 
dependencies here: 
https://lists.apache.org/thread.html/5a85caac41e796c2aa351d835b3483808ebbbd4512b480940d494439@%3Cdev.beam.apache.org%3E


Ah, sorry! In that case there is no problem at all.


> I'm not in favor of having the 2 runners in one jar, the point about 
having 2 jars was to:

>
> - avoid making promises to users on a work-in-progress runner (make 
it explicit with a different jar)

> - avoid confusion for them (why are there 2 pipeline options? etc)
>
> If the community believes that there is no confusion or wrong 
promises with the one jar solution, we could leave the 2 runners in 
one jar.

>
> Maybe we could start a vote on that?

It seems unanimous among others to have one jar. There were some 
suggestions of how to avoid promises and confusion, like Ryan's most 
recent email. Did any of the ideas sound good to you?


Kenn


I have no objection to putting the experimental runner alongside the
stable, mature runner.  We have some precedent with the portable
spark runner, and that's worked out pretty well -- at least, I haven't
heard any complaints from confused users!

That being said:

1.  It really should be marked @Experimental in the code *and* clearly
warned in API (javadoc) and documentation.

2.  Ideally, I'd like to see a warning banner in the logs when it's
used, pointing to the stable SparkRunner and/or documentation on the
current known issues.

All my best, Ryan






> regarding jars:
>
> I don't like 3 jars either.
>
>
> Etienne
>
> On 31/10/2019 02:06, Kenneth Knowles wrote:
>
> Very good points. We definitely ship a lot of code/features in
very early stages, and there seems to be no problem.
>
> I intend mostly to leave this judgment to people like you who
know better about Spark users.
>
> But I do think 1 or 2 jars is better than 3. I really don't like
"3 jars" and I did give two reasons:
>
> 1. diamond deps where things overlap
> 2. figuring out which thing to depend on
>
> Both are annoying for users. I am not certain if it could lead
to a real unsolvable situation. This is just a Java ecosystem
problem so I feel qualified to comment.
>
> I did also ask if there were major dependency differences
between the two that could cause problems for users. This question
was dropped and no one cared to comment, so I assume it is not an
issue. So then I favor having just 1 jar with both runners.
>
> Kenn
>
> On Wed, Oct 30, 2019 at 2:46 PM Ismaël Mejía <ieme...@gmail.com> wrote:
>>
>> I am still a bit lost about why we are discussing options
without giving any
>> arguments or reasons for them. Why are 2 modules better
than 3 or 3 better
>> than 2, or, even better, what forces us to have something
different from a single
>> module?
>>
>> What are the reasons for wanting to have separate jars? If the
issue is that the
>> code is unfinished or not passing the tests, the impact for end
users is minimal
>> because they cannot accidentally end up running the new runner,
and if they
>> decide to do so we can warn them it is at their own risk and
not ready for
>> production in the documentation + runner.
>>
>> If the fear is that new code may end up being intertwined with
the classic and
>> portable runners and have some side effects, we have the
ValidatesRunner +
>> Nexmark in the CI to cover this, so again I do not see what the
problem is that
>> requires modules to be separate.
>>
>> If the issue is being uncomfortable about having in-progress
code in released
>> artifacts we have been doing this in Beam forever, for example
most of the work
>> on portability and Schema/SQL, and all of those were still part
of artifacts
>> long time before they were ready for prime use, so I still
don't see why this
>> case is different to require different artifacts.
>>
>> I have the impression we are trying to solve a non-issue by
adding a lot of
>> artificial complexity (in particular to the users), or am I
missing something
>> else?
>>
>> On Wed, Oct 30, 2019 at 7:40 PM Kenneth Knowles
<k...@apache.org> wrote:
>> >
>> > Oh, I mean that we ship just 2 jars.
>> >
>> > And since Spark users always build an uber jar, they can
still depend on both of ours and be able to switch runners with a
flag.
>> >
>> > I 

Type of builtin PTransform/PCollection metrics

2019-11-13 Thread Maximilian Michels

Hi,

We have a series of builtin PTransform/PCollection metrics: 
https://github.com/apache/beam/blob/808cb35018cd228a59b152234b655948da2455fa/model/pipeline/src/main/proto/metrics.proto#L74


Why are those counters ("beam:metrics:sum_int_64")? I think the 
better default type for most users would be gauge 
("beam:metrics:latest_int_64").


I understand that counters are useful because they retain the sum of all 
reported values, but for getting an idea about the deviation of a 
metric, gauges could be more useful.

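For reference, a sketch of how the two kinds look from user code in the
Java SDK (note the builtin metrics above are reported by the runner/SDK
harness rather than through this API):

import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Gauge;
import org.apache.beam.sdk.metrics.Metrics;

public class MetricKinds {
  // Counter: accumulated as a sum across all updates
  // ("beam:metrics:sum_int_64").
  private static final Counter ELEMENTS =
      Metrics.counter(MetricKinds.class, "element_count");

  // Gauge: only the latest reported value (plus a timestamp) is kept
  // ("beam:metrics:latest_int_64").
  private static final Gauge QUEUE_DEPTH =
      Metrics.gauge(MetricKinds.class, "queue_depth");

  void update(long depth) {
    ELEMENTS.inc();
    QUEUE_DEPTH.set(depth);
  }
}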

Perhaps we could make this configurable?

Thanks,
Max


Re: Date/Time Ranges & Protobuf

2019-11-13 Thread Jan Lukavský

Hi,

just an idea on these related topics that have appeared these days - it
might help to realize that we actually don't need full arithmetic on
timestamps (the Beam model IMHO doesn't need to know the exact
difference between two events). What we actually need is a slightly
simplified algebra. Given two timestamps T1 and T2 and a "duration" (a
different type from timestamp), we need these operations (not 100% sure
that the list is exhaustive, but it seems to be):


 - is_preceding(T1, T2): bool

   - important: !is_preceding(T1, T2) does NOT imply
is_preceding(T2, T1); !is_preceding(T1, T2) && !is_preceding(T2, T1)
would mean the events are _concurrent_


   - this relation also has to be antisymmetric

   - given this function we can construct a comparator, where multiple 
distinct timestamps can be "equal" (or with no particular ordering, 
which is a natural property of time)


 - min_timestamp_following(T1, duration): T2

   - that would return a timestamp for which is_preceding(T1 + 
duration, T2) would return true and no other timestamp X would exist for 
which is_preceding(T1 + duration, X) && is_preceding(X, T2) would be true


   - actually, this function would serve as the definition for the 
duration object


If we can supply this algebra, it seems that we can use any
representation of timestamps and intervals. It might even be
possible to let users specify their own types for timestamps and
durations, which could solve the issue of not currently being able to
correctly represent timestamps lower than Long.MIN_VALUE (although we
can get data with timestamps that low - the cosmic microwave background
being one example :)). Specifying this algebra probably boils down to
proposal (3) in Robert's thread [1].


Just my 2 cents.

Jan

[1] 
https://lists.apache.org/thread.html/1672898393cb0d54a77a879be0fb5725902289a3e5063d0f9ec36fe1@%3Cdev.beam.apache.org%3E

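To make this concrete, a minimal Java sketch of such an algebra (all names
are hypothetical; nothing here is an existing Beam API):

/* The two operations, parameterized by a timestamp type T and a
 * duration type D. */
interface TimestampAlgebra<T, D> {
  /* Partial order: !isPreceding(t1, t2) && !isPreceding(t2, t1) means
   * t1 and t2 are concurrent. */
  boolean isPreceding(T t1, T t2);

  /* The smallest t2 strictly following t1 shifted by duration; this
   * effectively defines what a duration means. */
  T minTimestampFollowing(T t1, D duration);
}

/* The trivial instance over long millis recovers today's behavior. */
class LongMillisAlgebra implements TimestampAlgebra<Long, Long> {
  @Override
  public boolean isPreceding(Long t1, Long t2) {
    return t1 < t2;
  }

  @Override
  public Long minTimestampFollowing(Long t1, Long duration) {
    return t1 + duration + 1; // first timestamp strictly after t1 + duration
  }
}
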

On 11/13/19 10:11 AM, jincheng sun wrote:

Thanks for bringing up this discussion @Luke.

As @Kenn mentioned, in Beam we have defined constant values for
the min/max/end of the global window. I noticed that 
google.protobuf.Timestamp/Duration is only used in window definitions, 
such as FixedWindowsPayload, SlidingWindowsPayload, SessionsPayload, etc.


I think that both RFC 3339 and Beam's current implementation are big
enough to express common window definitions. But users could really
define a window size that falls outside the scope of RFC 3339.
Conceptually, we should not limit the time range for windows (although I
think the range of RFC 3339 is big enough in most cases).


To make sure everyone knows the background of this discussion, I hope
you don't mind that I put the original conversation thread [1] here.


Best,
Jincheng

[1] https://github.com/apache/beam/pull/10041#discussion_r344380809

Robert Bradshaw <rober...@google.com> wrote on Tue, Nov 12, 2019 at
4:09 PM:


I agree about it being a tagged union in the model (together with
actual_time(...) - epsilon). It's not just a performance hack though,
it's also (as discussed elsewhere) a question of being able to find an
embedding into existing datetime libraries. The real question here is
whether we should limit ourselves to just these ~10,000 years AD, or
find value in being able to process events for the lifetime of the
universe (or, at least, recorded human history). Artificially limiting
in this way would seem surprising to me at least.

On Mon, Nov 11, 2019 at 11:58 PM Kenneth Knowles <k...@apache.org> wrote:
>
> The max timestamp, min timestamp, and end of the global window
are all performance hacks in my view. Timestamps in beam are
really a tagged union:
>
>     timestamp ::= min | max | end_of_global | actual_time(...
some quantitative timestamp ...)
>
> with the ordering
>
>     min < actual_time(...) < end_of_global < max
>
> We chose arbitrary numbers so that we could do simple numeric
comparisons and arithmetic.
>
> Kenn
>
> On Mon, Nov 11, 2019 at 2:03 PM Luke Cwik <lc...@google.com> wrote:
>>
>> While crites@ was investigating using protobuf to represent
Apache Beam timestamps within the TestStreamEvents, he found out
that the well-known type google.protobuf.Timestamp doesn't support
certain timestamps we were using in our tests (specifically the
max timestamp that Apache Beam supports).
>>
>> This led me to investigate, and the well-known type
google.protobuf.Timestamp supports dates/times from
0001-01-01T00:00:00Z to 9999-12-31T23:59:59.999999999Z, which is
much smaller than the timestamp range that Apache Beam currently
supports: -9223372036854775ms to 9223372036854775ms, which is about
292277BC to 294247AD (it was difficult to find a time range that
represented this).
>>
>> Similarly, the google.protobuf.Duration represents any time
range over those ~10,000 years. 

Re: Date/Time Ranges & Protobuf

2019-11-13 Thread jincheng sun
Thanks for bringing up this discussion @Luke.

As @Kenn mentioned, in Beam we have defined constant values for the
min/max/end of the global window. I noticed that
google.protobuf.Timestamp/Duration is only used in window definitions, such
as FixedWindowsPayload, SlidingWindowsPayload, SessionsPayload, etc.

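(For reference, the constants in question as they surface in the Java SDK -
a small sketch:)

import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.joda.time.Instant;

public class TimestampBounds {
  public static void main(String[] args) {
    Instant min = BoundedWindow.TIMESTAMP_MIN_VALUE; // -9223372036854775 ms
    Instant max = BoundedWindow.TIMESTAMP_MAX_VALUE; //  9223372036854775 ms
    // The "end of the global window" sits just before the max timestamp.
    Instant endOfGlobal = GlobalWindow.INSTANCE.maxTimestamp();
    System.out.println(min + " .. " + endOfGlobal + " .. " + max);
  }
}
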
I think that both RFC 3339 and Beam's current implementation are big enough
to express common window definitions. But users could really define a
window size that falls outside the scope of RFC 3339. Conceptually, we should
not limit the time range for windows (although I think the range of RFC 3339
is big enough in most cases).

To make sure everyone knows the background of this discussion, I hope you
don't mind that I put the original conversation thread [1] here.

Best,
Jincheng

[1] https://github.com/apache/beam/pull/10041#discussion_r344380809

Robert Bradshaw wrote on Tue, Nov 12, 2019 at 4:09 PM:

> I agree about it being a tagged union in the model (together with
> actual_time(...) - epsilon). It's not just a performance hack though,
> it's also (as discussed elsewhere) a question of being able to find an
> embedding into existing datetime libraries. The real question here is
> whether we should limit ourselves to just these ~10,000 years AD, or
> find value in being able to process events for the lifetime of the
> universe (or, at least, recorded human history). Artificially limiting
> in this way would seem surprising to me at least.
>
> On Mon, Nov 11, 2019 at 11:58 PM Kenneth Knowles  wrote:
> >
> > The max timestamp, min timestamp, and end of the global window are all
> performance hacks in my view. Timestamps in beam are really a tagged union:
> >
> > timestamp ::= min | max | end_of_global | actual_time(... some
> quantitative timestamp ...)
> >
> > with the ordering
> >
> > min < actual_time(...) < end_of_global < max
> >
> > We chose arbitrary numbers so that we could do simple numeric
> comparisons and arithmetic.
> >
> > Kenn
> >
> > On Mon, Nov 11, 2019 at 2:03 PM Luke Cwik  wrote:
> >>
> >> While crites@ was investigating using protobuf to represent Apache
> Beam timestamps within the TestStreamEvents, he found out that the well
> known type google.protobuf.Timestamp doesn't support certain timestamps we
> were using in our tests (specifically the max timestamp that Apache Beam
> supports).
> >>
> >> This led me to investigate, and the well-known type
> google.protobuf.Timestamp supports dates/times from 0001-01-01T00:00:00Z to
> 9999-12-31T23:59:59.999999999Z, which is much smaller than the timestamp
> range that Apache Beam currently supports: -9223372036854775ms to
> 9223372036854775ms, which is about 292277BC to 294247AD (it was difficult to
> find a time range that represented this).
> >>
> >> Similarly, the google.protobuf.Duration represents any time range over
> those ~10,000 years. Google decided to limit their range to be compatible
> with the RFC 3339[1] standard, which does simplify many things since it
> guarantees that all RFC 3339 time parsing/manipulation libraries are
> supported.
> >>
> >> Should we:
> >> A) define our own timestamp/duration types to be able to represent the
> full time range that Apache Beam can express?
> >> B) limit the valid timestamps in Apache Beam to some standard such as
> RFC 3339?
> >>
> >> This discussion is somewhat related to the efforts to support nano
> timestamps[2].
> >>
> >> 1: https://tools.ietf.org/html/rfc3339
> >> 2:
> https://lists.apache.org/thread.html/86a4dcabdaa1dd93c9a55d16ee51edcff6266eda05221acbf9cf666d@%3Cdev.beam.apache.org%3E
>