Re: Removing DATETIME in Java Schemas

2020-05-15 Thread Reuven Lax
On Fri, May 15, 2020 at 8:10 PM Kenneth Knowles  wrote:

>
>
> On Fri, May 15, 2020 at 5:25 PM Brian Hulette  wrote:
>
>> After thinking about this more I've softened on it some, but I'm still a
>> little wary. I like Kenn's suggestion:
>>
>> > - it is OK to convert known logical types like MillisInstant or
>> NanosInstant to a ZetaSQL "TIMESTAMP" or Calcite SQL "TIMESTAMP WITH LOCAL
>> TIME ZONE"
>> > - it is OK to convert unknown logical types into their representation
>> type
>> > - coming out the other side of a SQL statement, a user should expect
>> the type to be a row where every field has a type from the SQL dialect; the
>> input types are gone
>>
>> I think what you're proposing is basically a preprocessing step on
>> SqlTransform that demotes any unrecognized logical types to their
>> representation. Is that a fair characterization?
>>
>
> Exactly
>
>
>> I wonder if we could push the demotion further into the transform to
>> enable some additional uses. For example "SELECT * FROM PCOLL WHERE ..." -
>> if some logical type SQL knows nothing about is pulled into the SELECT *,
>> it seems reasonable (from a user-perspective) to just pass the type through
>> untouched, and automatically demoting it might be surprising.
>>
>
> To me, it seems fancy to pass the type through. And I expect users will
> soon try to do non-pass-through things involving equality or ordering,
> where only specialized representations work right. So for the very limited
> cases where it doesn't matter, I'm not opposed, but I wouldn't work hard to
> make it happen. If Calcite implicit conversions are a good way to implement
> this, that is fine by me.
>
> Put another way: if the downstream transforms expect to get the logical
> type back, won't automatic schema conversion already "just work"?
>

No. Let's say you are reading protos and writing protos with a SQL filter
statement in between. If we lose the logical type, then the output schema
may not be able to match against the proto schema, e.g. if the proto has a
field of type FIXED32, we reflect this with a Fixed32 logical type with a
base type of integer. SQL can operate on the base type just fine because
it's just an int32, but if you strip the logical type from the output then
it will have trouble matching against the proto schema as Beam will infer
an int32 type.
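
Roughly, such a logical type looks like this (a sketch against the
Schema.LogicalType interface as it stands in the Java SDK; the identifier and
class name are illustrative, not the actual proto extension code):

    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.sdk.schemas.Schema.FieldType;

    // FIXED32-style logical type: SQL can operate on the INT32 base value,
    // while the identifier keeps enough information for the output row to
    // match back against the proto field.
    public class Fixed32 implements Schema.LogicalType<Integer, Integer> {
      @Override public String getIdentifier() { return "example:fixed32"; } // illustrative URN
      @Override public FieldType getArgumentType() { return null; }         // no type argument
      @Override public <T> T getArgument() { return null; }
      @Override public FieldType getBaseType() { return FieldType.INT32; }
      @Override public Integer toBaseType(Integer input) { return input; }  // same Java value,
      @Override public Integer toInputType(Integer base) { return base; }   // different meaning
    }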


> (I don't have any response to the avro/proto points except I think I agree
> with both of you)
>
> Kenn
>
> What if we had the ability to implicitly convert logical types to their
>> representation? Then the above example would work, because no implicit
>> conversion is necessary, and also relations that introspect the logical
>> type (e.g. projecting f_location.lat) could work by including an implicit
>> conversion. The implicit conversion would still be a one-way operation, but
>> it would only happen when necessary.
>>
>> I really have no idea how complicated this would be to implement. Would
>> it be feasible? Is it ill-advised?
>>
>> Brian
>>
>> On Fri, May 15, 2020 at 2:43 PM Reuven Lax  wrote:
>>
>>> I would not describe the base type as the "wire type." If that were
>>> true, then the only base type we should support should be byte array.
>>>
>>> My simple point is that this is no different than normal schema fields.
>>> You will find many normal schemas containing data encoded into other field
>>> types. You will also find this in SQL databases. If naming these fields
>>> with a logical type causes us to throw up our hands and declare that they
>>> can't be introspectable, then our users will likely just avoid logical
>>> types and encode this data into their own LONG and STRING fields. It will
>>> also mean that SQL will not work well over Avro or Proto input.
>>>
>> Is the Avro/Proto connection really a problem? I would think that we
>> could map most common Avro or Proto types to either primitive types or
>> "standard" logical types which are the same as the ones used by SQL
>> wherever possible, e.g. NanosInstant and NanosDuration added for proto.
>>
>>>
>>> Reuven
>>>
>>> On Fri, May 15, 2020 at 2:19 PM Andrew Pilloud 
>>> wrote:
>>>
 My understanding is that the base type is effectively the wire format
 at the type, where the logical type is the in-memory representation for
 Java. For org.joda.time.Instant, this is just a wrapper around the
 underlying Long. However for the Date logical type, the LocalDate type has
 struct as the in-memory representation. There is a performance penalty to
 this conversion and I think there are cases where users (SQL for example)
 would want a fast path that skips the serialization/deserialization steps
 if possible.

 As for more complex logical types, I would urge caution in making
 assumptions around the base types. I expect there will be base types of
 strings containing JSON or byte arrays containing protobuf or avro. ZetaSQL
 has a civil time module that packs a struct into a Long. All of these are
 

Re: Removing DATETIME in Java Schemas

2020-05-15 Thread Reuven Lax
On Fri, May 15, 2020 at 5:25 PM Brian Hulette  wrote:

> After thinking about this more I've softened on it some, but I'm still a
> little wary. I like Kenn's suggestion:
>
> > - it is OK to convert known logical types like MillisInstant or
> NanosInstant to a ZetaSQL "TIMESTAMP" or Calcite SQL "TIMESTAMP WITH LOCAL
> TIME ZONE"
> > - it is OK to convert unknown logical types into their representation
> type
> > - coming out the other side of a SQL statement, a user should expect the
> type to be a row where every field has a type from the SQL dialect; the
> input types are gone
>
> I think what you're proposing is basically a preprocessing step on
> SqlTransform that demotes any unrecognized logical types to their
> representation. Is that a fair characterization?
>

so SELECT * FROM stream would modify the stream? I guess we could do that,
but it might be a bit surprising if SELECT * is not faithful.


>
> I wonder if we could push the demotion further into the transform to
> enable some additional uses. For example "SELECT * FROM PCOLL WHERE ..." -
> if some logical type SQL knows nothing about is pulled into the SELECT *,
> it seems reasonable (from a user-perspective) to just pass the type through
> untouched, and automatically demoting it might be surprising.
>

> What if we had the ability to implicitly convert logical types to their
> representation? Then the above example would work, because no implicit
> conversion is necessary, and also relations that introspect the logical
> type (e.g. projecting f_location.lat) could work by including an implicit
> conversion. The implicit conversion would still be a one-way operation, but
> it would only happen when necessary.
>

> I really have no idea how complicated this would be to implement. Would it
> be feasible? Is it ill-advised?
>

I kinda like it. I don't know if it would be complicated to implement, but
maybe tricky?



>
> Brian
>
> On Fri, May 15, 2020 at 2:43 PM Reuven Lax  wrote:
>
>> I would not describe the base type as the "wire type." If that were true,
>> then the only base type we should support should be byte array.
>>
>> My simple point is that this is no different than normal schema fields.
>> You will find many normal schemas containing data encoded into other field
>> types. You will also find this in SQL databases. If naming these fields
>> with a logical type causes us to throw up our hands and declare that they
>> can't be introspectable, then our users will likely just avoid logical
>> types and encode this data into their own LONG and STRING fields. It will
>> also mean that SQL will not work well over Avro or Proto input.
>>
> Is the Avro/Proto connection really a problem? I would think that we could
> map most common Avro or Proto types to either primitive types or "standard"
> logical types which are the same as the ones used by SQL wherever possible,
> e.g. NanosInstant and NanosDuration added for proto.
>
>>
>> Reuven
>>
>> On Fri, May 15, 2020 at 2:19 PM Andrew Pilloud 
>> wrote:
>>
>>> My understanding is that the base type is effectively the wire format at
>>> the type, where the logical type is the in-memory representation for Java.
>>> For org.joda.time.Instant, this is just a wrapper around the underlying
>>> Long. However for the Date logical type, the LocalDate type has struct as
>>> the in-memory representation. There is a performance penalty to this
>>> conversion and I think there are cases where users (SQL for example) would
>>> want a fast path that skips the serialization/deserialization steps if
>>> possible.
>>>
>>> As for more complex logical types, I would urge caution in making
>>> assumptions around the base types. I expect there will be base types of
>>> strings containing JSON or byte arrays containing protobuf or avro. ZetaSQL
>>> has a civil time module that packs a struct into a Long. All of these are
>>> things you can write SQL around, but I think the users might be somewhat
>>> surprised when they come out less than usable by default.
>>>
>>> On Fri, May 15, 2020 at 2:12 PM Reuven Lax  wrote:
>>>


 On Fri, Apr 24, 2020 at 11:56 AM Brian Hulette 
 wrote:

> When we created the portable representation of schemas last summer we
> intentionally did not include DATETIME as a primitive type [1], even 
> though
> it already existed as a primitive type in Java [2]. There seemed to be
> consensus around a couple of points: 1) At the very least DATETIME is a
> poor name and it should be renamed to INSTANT or TIMESTAMP, and 2) a
> logical type is better suited for this purpose.
>
> Since then, it's been my intention to replace Java's DATETIME with a
> MillisInstant logical type backed by Long milliseconds since the epoch 
> (see
> BEAM-7554 [3]). I've finally managed to get a PR together that does so 
> (and
> keeps tests passing): https://github.com/apache/beam/pull/11456
>
> There could be a couple of concerns with this PR

Re: Removing DATETIME in Java Schemas

2020-05-15 Thread Kenneth Knowles
On Fri, May 15, 2020 at 5:25 PM Brian Hulette  wrote:

> After thinking about this more I've softened on it some, but I'm still a
> little wary. I like Kenn's suggestion:
>
> > - it is OK to convert known logical types like MillisInstant or
> NanosInstant to a ZetaSQL "TIMESTAMP" or Calcite SQL "TIMESTAMP WITH LOCAL
> TIME ZONE"
> > - it is OK to convert unknown logical types into their representation
> type
> > - coming out the other side of a SQL statement, a user should expect the
> type to be a row where every field has a type from the SQL dialect; the
> input types are gone
>
> I think what you're proposing is basically a preprocessing step on
> SqlTransform that demotes any unrecognized logical types to their
> representation. Is that a fair characterization?
>

Exactly


> I wonder if we could push the demotion further into the transform to
> enable some additional uses. For example "SELECT * FROM PCOLL WHERE ..." -
> if some logical type SQL knows nothing about is pulled into the SELECT *,
> it seems reasonable (from a user-perspective) to just pass the type through
> untouched, and automatically demoting it might be surprising.
>

To me, it seems fancy to pass the type through. And I expect users will
soon try to do non-pass-through things involving equality or ordering,
where only specialized representations work right. So for the very limited
cases where it doesn't matter, I'm not opposed, but I wouldn't work hard to
make it happen. If Calcite implicit conversions are a good way to implement
this, that is fine by me.

Put another way: if the downstream transforms expect to get the logical
type back, won't automatic schema conversion already "just work"?

(I don't have any response to the avro/proto points except I think I agree
with both of you)

Kenn

What if we had the ability to implicitly convert logical types to their
> representation? Then the above example would work, because no implicit
> conversion is necessary, and also relations that introspect the logical
> type (e.g. projecting f_location.lat) could work by including an implicit
> conversion. The implicit conversion would still be a one-way operation, but
> it would only happen when necessary.
>
> I really have no idea how complicated this would be to implement. Would it
> be feasible? Is it ill-advised?
>
> Brian
>
> On Fri, May 15, 2020 at 2:43 PM Reuven Lax  wrote:
>
>> I would not describe the base type as the "wire type." If that were true,
>> then the only base type we should support should be byte array.
>>
>> My simple point is that this is no different than normal schema fields.
>> You will find many normal schemas containing data encoded into other field
>> types. You will also find this in SQL databases. If naming these fields
>> with a logical type causes us to throw up our hands and declare that they
>> can't be introspectable, then our users will likely just avoid logical
>> types and encode this data into their own LONG and STRING fields. It will
>> also mean that SQL will not work well over Avro or Proto input.
>>
> Is the Avro/Proto connection really a problem? I would think that we could
> map most common Avro or Proto types to either primitive types or "standard"
> logical types which are the same as the ones used by SQL wherever possible,
> e.g. NanosInstant and NanosDuration added for proto.
>
>>
>> Reuven
>>
>> On Fri, May 15, 2020 at 2:19 PM Andrew Pilloud 
>> wrote:
>>
>>> My understanding is that the base type is effectively the wire format at
>>> the type, where the logical type is the in-memory representation for Java.
>>> For org.joda.time.Instant, this is just a wrapper around the underlying
>>> Long. However for the Date logical type, the LocalDate type has struct as
>>> the in-memory representation. There is a performance penalty to this
>>> conversion and I think there are cases where users (SQL for example) would
>>> want a fast path that skips the serialization/deserialization steps if
>>> possible.
>>>
>>> As for more complex logical types, I would urge caution in making
>>> assumptions around the base types. I expect there will be base types of
>>> strings containing JSON or byte arrays containing protobuf or avro. ZetaSQL
>>> has a civil time module that packs a struct into a Long. All of these are
>>> things you can write SQL around, but I think the users might be somewhat
>>> surprised when they come out less than usable by default.
>>>
>>> On Fri, May 15, 2020 at 2:12 PM Reuven Lax  wrote:
>>>


 On Fri, Apr 24, 2020 at 11:56 AM Brian Hulette 
 wrote:

> When we created the portable representation of schemas last summer we
> intentionally did not include DATETIME as a primitive type [1], even 
> though
> it already existed as a primitive type in Java [2]. There seemed to be
> consensus around a couple of points: 1) At the very least DATETIME is a
> poor name and it should be renamed to INSTANT or TIMESTAMP, and 2) a
> logical type is better suite

Re: [Proposal] Apache Beam Fn API - GCP IO Debuggability Metrics

2020-05-15 Thread Alex Amato
Thanks everyone. I was able to collect a lot of good feedback from everyone
who contributed. I am going to wrap it up for now and label the design as
"Design Finalized (Unimplemented)".

I really believe we have made a much better design than I initially wrote
up. I couldn't have done it without the help of everyone who offered their
time, energy and viewpoints. :)

Thanks again, and please let me know if you still see any major issues with
the design. I think I have enough information to begin some
implementation as soon as I have some time in the coming weeks.
Alex

https://s.apache.org/beam-gcp-debuggability
https://s.apache.org/beam-histogram-metrics

On Thu, May 14, 2020 at 5:22 PM Alex Amato  wrote:

> Thanks to all who have spent their time on this, there were many great
> suggestions, just another reminder that tomorrow I will be finalizing the
> documents, unless there are any major objections left. Please take a look
> at it if you are interested.
>
> I will still welcome feedback at any time :).
>
> But I believe we have gathered enough information to produce a good
> design, which I will start to work on soon.
> I will begin to build the necessary subset of the new features proposed to
> support the BigQueryIO metrics use case.
> I will likely start with the python SDK first.
>
> https://s.apache.org/beam-gcp-debuggability
> https://s.apache.org/beam-histogram-metrics
>
>
> On Wed, May 13, 2020 at 3:07 PM Alex Amato  wrote:
>
>> Thanks again for more feedback :). I have iterated on things again. I'll
>> report back at the end of the week. If there are no major disagreements by
>> then, I'll close the discussion, as I believe it to be in a good enough state
>> to start some implementation. Feedback is still welcome.
>>
>> Latest changes: the exponential format now allows denser buckets, only two
>> MonitoringInfoSpecs are used for all of the IOs, and some labels are required
>> while optional ones let specific IOs provide more content.
>>
>> https://s.apache.org/beam-gcp-debuggability
>> https://s.apache.org/beam-histogram-metrics
>>
>> On Mon, May 11, 2020 at 4:24 PM Alex Amato  wrote:
>>
>>> Thanks for the great feedback so far :). I've included many new ideas,
>>> and made some revisions. Both docs have changed a fair bit since the
>>> initial mail out.
>>>
>>> https://s.apache.org/beam-gcp-debuggability
>>> https://s.apache.org/beam-histogram-metrics
>>>
>>> PTAL and let me know what you think, and hopefully we can resolve major
>>> issues by the end of the week. I'll try to finalize things by then, but of
>>> course always stay open to your great ideas. :)
>>>
>>> On Wed, May 6, 2020 at 6:19 PM Alex Amato  wrote:
>>>
 Thanks everyone for taking a look so far :).

 I am hoping to finalize the two reviews by the end of next
 week, May 15th.

 I'll continue to follow up on feedback and make changes, and I will add
 some more mentions to the documents to draw attention to them.

 https://s.apache.org/beam-gcp-debuggability
  https://s.apache.org/beam-histogram-metrics

 On Wed, May 6, 2020 at 10:00 AM Luke Cwik  wrote:

> Thanks, also took a look and left some comments.
>
> On Tue, May 5, 2020 at 6:24 PM Alex Amato  wrote:
>
>> Hello,
>>
>> I created another design document. This time for GCP IO Debuggability
>> Metrics. Which defines some new metrics to collect in the GCP IO 
>> libraries.
>> This is for monitoring request counts and request latencies.
>>
>> Please take a look and let me know what you think:
>> https://s.apache.org/beam-gcp-debuggability
>>
>> I also sent out a separate design yesterday (
>> https://s.apache.org/beam-histogram-metrics) which is related as
>> this document uses a Histogram style metric :).
>>
>> I would love some feedback to make this feature the best possible :D,
>> Alex
>>
>


Re: [DISCUSS] New process for proposing ApacheBeam tweets

2020-05-15 Thread Robert Bradshaw
Sounds like a good plan to me, but I haven't been the one monitoring this
spreadsheet (or twitter for that matter). Spam is a concern, but
everything is moderated, so I think we should try it out and see if the volume is
really high enough to be an issue.

On Fri, May 15, 2020 at 4:46 PM Kenneth Knowles  wrote:

> I like having easier notifications. It would be great if the notifications
> had the content also. I get notifications on the spreadsheet, but since I
> have to click through to look at them there is a little bit of friction.
>
> 1. Is it still easy to add the other columns to record LGTM and when they
> have been sent?
> 2. Risk of spam?
>
> Kenn
>
> On Fri, May 15, 2020 at 2:56 PM Aizhamal Nurmamat kyzy <
> aizha...@apache.org> wrote:
>
>> Hi all,
>>
>> I wanted to propose some improvements to the existing process of
>> proposing tweets for the Apache Beam Twitter account.
>>
>> Currently the process requires people to request edit access, and then
>> add tweets on the spreadsheet [1]. I use it a lot because I know the
>> process well, but I think this may limit newcomers from proposing changes,
>> and can make things difficult for them.
>> I am proposing to use a Google Form (see example[2]), which will populate
>> the same spreadsheet. The advantages of this is that it will be open for
>> everyone, and it will be easier to subscribe to notifications every time
>> someone adds a new tweet proposal.
>>
>> The review process by the PMC would remain in place as-is.
>>
>> What does everyone think?
>> Thanks
>> Aizhamal
>>
>> [1] https://s.apache.org/beam-tweets
>> [2]
>> https://docs.google.com/forms/d/e/1FAIpQLSepI3zVZNmA9RUAzAnhH45IZU48uCt0qmPPYyZOEZCFONQZ6w/viewform
>>
>


Re: Removing DATETIME in Java Schemas

2020-05-15 Thread Brian Hulette
After thinking about this more I've softened on it some, but I'm still a
little wary. I like Kenn's suggestion:

> - it is OK to convert known logical types like MillisInstant or
NanosInstant to a ZetaSQL "TIMESTAMP" or Calcite SQL "TIMESTAMP WITH LOCAL
TIME ZONE"
> - it is OK to convert unknown logical types into their representation type
> - coming out the other side of a SQL statement, a user should expect the
type to be a row where every field has a type from the SQL dialect; the
input types are gone

I think what you're proposing is basically a preprocessing step on
SqlTransform that demotes any unrecognized logical types to their
representation. Is that a fair characterization?

I wonder if we could push the demotion further into the transform to enable
some additional uses. For example "SELECT * FROM PCOLL WHERE ..." - if some
logical type SQL knows nothing about is pulled into the SELECT *, it seems
reasonable (from a user-perspective) to just pass the type through
untouched, and automatically demoting it might be surprising.
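
To make that concrete, here is a small self-contained pipeline of the shape I
have in mind (field names and values are made up; the interesting case is when
the schema also carries a logical type SQL knows nothing about):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.extensions.sql.SqlTransform;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.Row;
    import org.joda.time.DateTime;

    public class SelectStarExample {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        Schema schema = Schema.builder()
            .addStringField("f_name")
            .addInt64Field("f_count")
            .addDateTimeField("f_time")  // today's DATETIME; a MillisInstant logical type after the PR
            .build();

        PCollection<Row> input = p.apply(Create.of(
                Row.withSchema(schema).addValues("a", 5L, DateTime.now()).build(),
                Row.withSchema(schema).addValues("b", 15L, DateTime.now()).build())
            .withRowSchema(schema));

        // The question above: should a field whose logical type SQL does not
        // recognize pass through this "SELECT *" untouched, or be demoted to
        // its representation type on the way out?
        PCollection<Row> output = input.apply(
            SqlTransform.query("SELECT * FROM PCOLLECTION WHERE f_count > 10"));

        p.run().waitUntilFinish();
      }
    }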

What if we had the ability to implicitly convert logical types to their
representation? Then the above example would work, because no implicit
conversion is necessary, and also relations that introspect the logical
type (e.g. projecting f_location.lat) could work by including an implicit
conversion. The implicit conversion would still be a one-way operation, but
it would only happen when necessary.
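
For instance, suppose f_location is a logical type whose representation is a
row of doubles (a sketch; LocationType is hypothetical, not a type in the SDK):

    Schema locationRep = Schema.builder().addDoubleField("lat").addDoubleField("lng").build();
    Schema input = Schema.builder()
        .addStringField("user")
        .addLogicalTypeField("f_location", new LocationType())  // hypothetical, representation = locationRep
        .build();

    // SELECT * FROM PCOLLECTION WHERE ...      -> no conversion needed, f_location passes through
    // SELECT f_location.lat FROM PCOLLECTION   -> introspects the type, so the one-way implicit
    //                                             demotion to locationRep would kick in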

I really have no idea how complicated this would be to implement. Would it
be feasible? Is it ill-advised?

Brian

On Fri, May 15, 2020 at 2:43 PM Reuven Lax  wrote:

> I would not describe the base type as the "wire type." If that were true,
> then the only base type we should support should be byte array.
>
> My simple point is that this is no different than normal schema fields.
> You will find many normal schemas containing data encoded into other field
> types. You will also find this in SQL databases. If naming these fields
> with a logical type causes us to throw up our hands and declare that they
> can't be introspectable, then our users will likely just avoid logical
> types and encode this data into their own LONG and STRING fields. It will
> also mean that SQL will not work well over Avro or Proto input.
>
Is the Avro/Proto connection really a problem? I would think that we could
map most common Avro or Proto types to either primitive types or "standard"
logical types which are the same as the ones used by SQL wherever possible,
e.g. NanosInstant and NanosDuration added for proto.

>
> Reuven
>
> On Fri, May 15, 2020 at 2:19 PM Andrew Pilloud 
> wrote:
>
>> My understanding is that the base type is effectively the wire format at
>> the type, where the logical type is the in-memory representation for Java.
>> For org.joda.time.Instant, this is just a wrapper around the underlying
>> Long. However for the Date logical type, the LocalDate type has struct as
>> the in-memory representation. There is a performance penalty to this
>> conversion and I think there are cases where users (SQL for example) would
>> want a fast path that skips the serialization/deserialization steps if
>> possible.
>>
>> As for more complex logical types, I would urge caution in making
>> assumptions around the base types. I expect there will be base types of
>> strings containing JSON or byte arrays containing protobuf or avro. ZetaSQL
>> has a civil time module that packs a struct into a Long. All of these are
>> things you can write SQL around, but I think the users might be somewhat
>> surprised when they come out less than usable by default.
>>
>> On Fri, May 15, 2020 at 2:12 PM Reuven Lax  wrote:
>>
>>>
>>>
>>> On Fri, Apr 24, 2020 at 11:56 AM Brian Hulette 
>>> wrote:
>>>
 When we created the portable representation of schemas last summer we
 intentionally did not include DATETIME as a primitive type [1], even though
 it already existed as a primitive type in Java [2]. There seemed to be
 consensus around a couple of points: 1) At the very least DATETIME is a
 poor name and it should be renamed to INSTANT or TIMESTAMP, and 2) a
 logical type is better suited for this purpose.

 Since then, it's been my intention to replace Java's DATETIME with a
 MillisInstant logical type backed by Long milliseconds since the epoch (see
 BEAM-7554 [3]). I've finally managed to get a PR together that does so (and
 keeps tests passing): https://github.com/apache/beam/pull/11456

 There could be a couple of concerns with this PR that I think would be
 better discussed here rather than on github.


 ## Breaking changes in low-level APIs
 The PR includes some breaking changes in public, low-level schema APIs:
 It removes DATETIME from the TypeName enum [4], and it will also change the
 type returned by Row#getBaseValue for DATETIME/MillisInstant fields (Long
 milliseconds since epoch rather than org.joda.time.Instant). Both of th

Re: [DISCUSS] New process for proposing ApacheBeam tweets

2020-05-15 Thread Kenneth Knowles
I like having easier notifications. It would be great if the notifications
had the content also. I get notifications on the spreadsheet, but since I
have to click through to look at them, there is a little bit of friction.

1. Is it still easy to add the other columns to record LGTM and when they
have been sent?
2. Risk of spam?

Kenn

On Fri, May 15, 2020 at 2:56 PM Aizhamal Nurmamat kyzy 
wrote:

> Hi all,
>
> I wanted to propose some improvements to the existing process of proposing
> tweets for the Apache Beam Twitter account.
>
> Currently the process requires people to request edit access, and then add
> tweets on the spreadsheet [1]. I use it a lot because I know the process
> well, but I think this may limit newcomers from proposing changes, and can
> make things difficult for them.
> I am proposing to use a Google Form (see example[2]), which will populate
> the same spreadsheet. The advantages of this is that it will be open for
> everyone, and it will be easier to subscribe to notifications every time
> someone adds a new tweet proposal.
>
> The review process by the PMC would remain in place as-is.
>
> What does everyone think?
> Thanks
> Aizhamal
>
> [1] https://s.apache.org/beam-tweets
> [2]
> https://docs.google.com/forms/d/e/1FAIpQLSepI3zVZNmA9RUAzAnhH45IZU48uCt0qmPPYyZOEZCFONQZ6w/viewform
>


Re: Writing a new IO on beam, should I use the source API or SDF?

2020-05-15 Thread Boyuan Zhang
Hi Steve,

Yes that's correct.

On Fri, May 15, 2020 at 2:11 PM Steve Niemitz  wrote:

> ah! ok awesome, I think that was the piece I was misunderstanding.  So I
> _can_ use a SDF to split the work initially (like I was manually doing in
> #1), but it just won't be further split dynamically on dataflow v1 right
> now.  Is my understanding there correct?
>
> On Fri, May 15, 2020 at 5:03 PM Luke Cwik  wrote:
>
>> #3 is the best when you implement @SplitRestriction on the SDF.
>>
>> The size of each restriction is used to better balance the splits within
>> Dataflow runner v2 so it is less susceptible to the too many or unbalanced
>> split problem.
>> For example, if you have 4 workers and make 20 splits, the splits will be
>> grouped based upon their sizes. So if 19 of those splits are small and 1 is
>> big, the 1 will execute by itself while the 19 will be done by the 3 other
>> workers.
>>
>> Also, dynamic work rebalancing isn't meant to replace those initial
>> splits but helps a lot with worker rebalancing since a few workers are
>> usually stragglers and will need some help at the end of a pipeline.
>>
>> On Fri, May 15, 2020 at 1:54 PM Steve Niemitz 
>> wrote:
>>
>>> Thanks for the replies so far.  I should have specifically mentioned
>>> above, I am building a bounded source.
>>>
>>> While I was thinking this through, I realized that I might not actually
>>> need any fancy splitting, since I can calculate all my split points up
>>> front.  I think this goes well with Ismaël's suggestion as well.
>>>
>>> I'm curious what the pros and cons would be of these options:
>>> 1) Presplit each file into N pieces (based on a target bundle size,
>>> similar to how it looks like the avro reader does it), using a
>>> standard DoFn to read each split.
>>> 2) Presplit, but use a SDF to support further splitting once it's
>>> supported in dataflow.  (this would also help if I have files that can't be
>>> split up front)
>>> 3) Don't pre-split, but use a SDF.
>>> 4) Use the source API
>>>
>>> I think we've covered 2 and 4 pretty well already, but curious
>>> specifically about the pre-split approach.  Thanks again so far!
>>>
>>> On Fri, May 15, 2020 at 1:11 PM Ismaël Mejía  wrote:
>>>
 For the Bounded case if you do not have a straight forward way to split
 at
 fractions, or simply if you do not care about Dynamic Work Rebalancing.
 You can
 get away implementing a simple DoFn (without Restrictions) based
 implementation
 and evolve from it. More and more IOs at Beam are becoming DoFn based
 (even if
 not SDF) because you win the composability advantages.

 An interesting question is when should we start deprecating the Source
 API and
 encourage people to write only DoFn based IOs. I think we are getting
 to the
 maturity point where we can start this discussion.

 On Fri, May 15, 2020 at 4:59 PM Luke Cwik  wrote:
 >
 > If it is an unbounded source then SDF is a winner since you are not
 giving up anything with it when compared to the legacy UnboundedSource API
 since Dataflow doesn't support dynamic splitting of unbounded SDFs or
 UnboundedSources (only initial splitting). You gain the ability to compose
 sources and the initial splitting is done at pipeline execution for SDFs vs
 pipeline construction time for UnboundedSource.
 >
 > If it is bounded, my gut is to still go with SDF since:
 > * Dataflow runner V2 supports SDF fully
 > * The Java/Python SDF APIs have gone through the majority of churn
 already, there are some minor clean-ups and then I would like to remove the
 @Experimental annotations from them after a discussion on dev@ about it
 > * Being able to compose "sources" is immensely powerful
 >
 > The caveat is that Dataflow runner V1 doesn't support dynamic
 splitting of SDFs today and depending on how well runner v2 rollout
 happens, may never. The big plus with the legacy source API is that there
 are already bounded/unbounded source wrappers that will convert them into
 SDFs so you get all of runner v1 and runner v2 support for what the legacy
 source API can do today but give up the composability and any splitting
 support for unbounded SDFs that will come later.
 >
 > Finally, there is a way to get limited support for dynamic splitting
 of bounded and unbounded SDFs for other runners using the composability of
 SDFs and the limited depth splitting proposal[1].
 >
 > 1:
 https://docs.google.com/document/d/1cKOB9ToasfYs1kLWQgffzvIbJx2Smy4svlodPRhFrk4/edit#heading=h.wkwslng744mv
 >
 > On Fri, May 15, 2020 at 7:08 AM Steve Niemitz 
 wrote:
 >>
 >> I'm going to be writing a new IO (in java) for reading files in a
 custom format, and want to make it splittable.  It seems like I have a
 choice between the "legacy" source API, and newer experimental SDF API.  Is
 there any guidance on which I

Re: Apache Beam application to Season of Docs 2020

2020-05-15 Thread Aizhamal Nurmamat kyzy
@all

We are receiving a few emails from interested applicants - feel no pressure
to respond to them. I will be monitoring the dev list and will respond
accordingly. If they have specific questions regarding either of the
projects, I will direct them to the mentors.

Stay safe,
Aizhamal

On Mon, May 11, 2020 at 6:55 PM Pablo Estrada  wrote:

> I'll be happy to mentor for improvements to the Capability Matrix. There
> had been some discussions about that earlier.
> +Kenneth Knowles  +Robert Bradshaw  
> +Thomas
> Weise   I may set a quick meeting with each of you to get
> your thoughts about Capability Matrix improvements. (also happy to welcome
> an extra mentor if you feel up to it : ))
>
> On Mon, May 11, 2020 at 5:16 PM Kyle Weaver  wrote:
>
>> Cool! I signed up as a mentor.
>>
>> On Mon, May 11, 2020 at 4:56 PM Aizhamal Nurmamat kyzy <
>> aizha...@apache.org> wrote:
>>
>>> PS: I am registered as a mentor too, happy to onboard the tech
>>> writers to The Apache Way and Beam community processes when time comes.
>>>
>>> On Mon, May 11, 2020 at 1:52 PM Aizhamal Nurmamat kyzy <
>>> aizha...@apache.org> wrote:
>>>
 Apache Beam got accepted into Season of Docs, yay! [1]

 @Kyle/Pablo, could you please fill out the mentor registration form by
 tomorrow morning [2]?

 Is there anyone else interested in becoming a mentor for either
 of the 2 projects [3]? The program requires at least two open source
 mentors for each technical writer in case we get 2.

 [1] https://developers.google.com/season-of-docs/docs/participants/
 [2]
 https://docs.google.com/forms/d/e/1FAIpQLSfMZ3yCf24PFUbvpSOkVy4sZUJFY5oS7HbGyXTNXzFg2btp4Q/viewform
 [3]
 https://cwiki.apache.org/confluence/display/BEAM/Google+Season+of+Docs


 On Tue, May 5, 2020 at 5:31 PM Pablo Estrada 
 wrote:

> I think I am willing to help with Project 2.
>
> On Tue, May 5, 2020 at 2:01 PM Aizhamal Nurmamat kyzy <
> aizha...@apache.org> wrote:
>
>> Thank you, Kyle! really appreciate it! I will add you as a mentor
>> into the cwiki page, and let you know if Beam gets accepted to the 
>> program
>> on May 11th.
>>
>> On Tue, May 5, 2020 at 12:55 PM Kyle Weaver 
>> wrote:
>>
>>> Thanks Aizhamal! I would be willing to help with project 1.
>>>
>>> On Mon, May 4, 2020 at 5:04 PM Aizhamal Nurmamat kyzy <
>>> aizha...@apache.org> wrote:
>>>
 Hi all,

 I have submitted the application to the Season of Docs program with
 the project ideas we have developed last year [1]. I learnt about its
 deadline a few hours ago and didn't want to miss it.

 Feel free to add more project ideas (or edit the current ones)
 until May 7th.

 If Beam gets approved, we will get 1 or 2 experienced technical
 writers to help us document community processes or some Beam features. 
 Is
 anyone else willing to mentor for these projects?

 [1]
 https://cwiki.apache.org/confluence/display/BEAM/Google+Season+of+Docs

>>>


[DISCUSS] New process for proposing ApacheBeam tweets

2020-05-15 Thread Aizhamal Nurmamat kyzy
Hi all,

I wanted to propose some improvements to the existing process of proposing
tweets for the Apache Beam Twitter account.

Currently the process requires people to request edit access, and then add
tweets on the spreadsheet [1]. I use it a lot because I know the process
well, but I think this may limit newcomers from proposing changes, and can
make things difficult for them.
I am proposing to use a Google Form (see example[2]), which will populate
the same spreadsheet. The advantage of this is that it will be open for
everyone, and it will be easier to subscribe to notifications every time
someone adds a new tweet proposal.

The review process by the PMC would remain in place as-is.

What does everyone think?
Thanks
Aizhamal

[1] https://s.apache.org/beam-tweets
[2]
https://docs.google.com/forms/d/e/1FAIpQLSepI3zVZNmA9RUAzAnhH45IZU48uCt0qmPPYyZOEZCFONQZ6w/viewform


Re: Removing DATETIME in Java Schemas

2020-05-15 Thread Reuven Lax
I would not describe the base type as the "wire type." If that were true,
then the only base type we should support should be byte array.

My simple point is that this is no different than normal schema fields. You
will find many normal schemas containing data encoded into other field
types. You will also find this in SQL databases. If naming these fields
with a logical type causes us to throw up our hands and declare that they
can't be introspectable, then our users will likely just avoid logical
types and encode this data into their own LONG and STRING fields. It will
also mean that SQL will not work well over Avro or Proto input.
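
Concretely, the alternative I'm worried about looks something like this (field
names are made up; the data is the same either way, only whether its meaning is
named in the schema differs):

    Schema untyped = Schema.builder()
        .addInt64Field("event_time_millis")  // "just a LONG"; the meaning lives only in the field name
        .addStringField("payload_json")      // "just a STRING" holding some encoded document
        .build();

    // vs. naming the meaning with logical types, which SQL should still be able to query:
    //   .addLogicalTypeField("event_time", new MillisInstant())  // once the MillisInstant PR lands
    //   .addLogicalTypeField("payload", new JsonType())          // hypothetical JSON logical type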

Reuven

On Fri, May 15, 2020 at 2:19 PM Andrew Pilloud  wrote:

> My understanding is that the base type is effectively the wire format at
> the type, where the logical type is the in-memory representation for Java.
> For org.joda.time.Instant, this is just a wrapper around the underlying
> Long. However for the Date logical type, the LocalDate type has struct as
> the in-memory representation. There is a performance penalty to this
> conversion and I think there are cases where users (SQL for example) would
> want a fast path that skips the serialization/deserialization steps if
> possible.
>
> As for more complex logical types, I would urge caution in making
> assumptions around the base types. I expect there will be base types of
> strings containing JSON or byte arrays containing protobuf or avro. ZetaSQL
> has a civil time module that packs a struct into a Long. All of these are
> things you can write SQL around, but I think the users might be somewhat
> surprised when they come out less than usable by default.
>
> On Fri, May 15, 2020 at 2:12 PM Reuven Lax  wrote:
>
>>
>>
>> On Fri, Apr 24, 2020 at 11:56 AM Brian Hulette 
>> wrote:
>>
>>> When we created the portable representation of schemas last summer we
>>> intentionally did not include DATETIME as a primitive type [1], even though
>>> it already existed as a primitive type in Java [2]. There seemed to be
>>> consensus around a couple of points: 1) At the very least DATETIME is a
>>> poor name and it should be renamed to INSTANT or TIMESTAMP, and 2) a
>>> logical type is better suited for this purpose.
>>>
>>> Since then, it's been my intention to replace Java's DATETIME with a
>>> MillisInstant logical type backed by Long milliseconds since the epoch (see
>>> BEAM-7554 [3]). I've finally managed to get a PR together that does so (and
>>> keeps tests passing): https://github.com/apache/beam/pull/11456
>>>
>>> There could be a couple of concerns with this PR that I think would be
>>> better discussed here rather than on github.
>>>
>>>
>>> ## Breaking changes in low-level APIs
>>> The PR includes some breaking changes in public, low-level schema APIs:
>>> It removes DATETIME from the TypeName enum [4], and it will also change the
>>> type returned by Row#getBaseValue for DATETIME/MillisInstant fields (Long
>>> milliseconds since epoch rather than org.joda.time.Instant). Both of these
>>> changes have the potential to break user code. That being said I think the
>>> risk is justified for a few reasons:
>>> - These are lower-level APIs that we don't intend for most users to use.
>>> The preferred higher-level APIs (SQL, Schema transforms, and inferred
>>> schemas for user types) should seamlessly transition over to the new
>>> MillisInstant logical type.
>>> - Schemas are an experimental feature.
>>> - We can clearly document this breaking change in the release that
>>> includes it.
>>>
>>
>> I am a bit worried about this. Schemas have been marked experimental for.
>> almost two years, and I've found that many Beam users are using them. If we
>> make a breaking change, can we make sure that it is extremely clear to
>> users how to fix their code? I'm afraid we are in the situation now where
>> the code is theoretically experimental but practically is widely used.
>>
>>
>>
>>>
>>>
>>> ## Mixing joda and java 8 time
>>> The NanosInstant logical type that Alex added uses java.time.Instant as
>>> it's input type, while my MillisInstant type uses org.joda.time.Instant for
>>> compatibility with the rest of Beam and the previous DATETIME primitive
>>> type. It feels weird, but it also makes a certain sort of sense to use joda
>>> time (which has millisecond precision) for MillisInstant, and java 8 time
>>> (which has nanos) for NanosInstant. Also, the choice shouldn't have a
>>> significant effect on end users - the schema inference code could generate
>>> conversions between java 8 time and joda time (as we already do for
>>> converting between various joda time types [5]) so user types can use
>>> either one.
>>>
>>
>> +1
>>
>>
>>>
>>>
>>> ## Arbitrary Logical Types and SQL
>>> Previously much of the SQL code was written to operate on the _base
>>> type_ value for any logical types. So for the new MillisInstant type, SQL
>>> would attempt to operate on the underlying Long, rather than on a
>>> org.joda.time.Instant instance

Re: Removing DATETIME in Java Schemas

2020-05-15 Thread Andrew Pilloud
My understanding is that the base type is effectively the wire format of
the type, whereas the logical type is the in-memory representation for Java.
For org.joda.time.Instant, this is just a wrapper around the underlying
Long. However for the Date logical type, the LocalDate type has struct as
the in-memory representation. There is a performance penalty to this
conversion and I think there are cases where users (SQL for example) would
want a fast path that skips the serialization/deserialization steps if
possible.

As for more complex logical types, I would urge caution in making
assumptions around the base types. I expect there will be base types of
strings containing JSON or byte arrays containing protobuf or avro. ZetaSQL
has a civil time module that packs a struct into a Long. All of these are
things you can write SQL around, but I think the users might be somewhat
surprised when they come out less than usable by default.
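
For reference, both views are already visible on Row today (a sketch, using the
existing DATETIME field since that is the case at hand):

    Schema schema = Schema.builder().addDateTimeField("ts").build();
    Row row = Row.withSchema(schema).addValue(DateTime.now()).build();

    Object inMemory = row.getValue("ts");      // in-memory (Java-facing) value
    Object base = row.getBaseValue("ts");      // base-type value

    // Today both come back as joda types; with the proposed MillisInstant
    // logical type, getBaseValue would instead return the underlying Long
    // millis, which is exactly the conversion cost in question.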

On Fri, May 15, 2020 at 2:12 PM Reuven Lax  wrote:

>
>
> On Fri, Apr 24, 2020 at 11:56 AM Brian Hulette 
> wrote:
>
>> When we created the portable representation of schemas last summer we
>> intentionally did not include DATETIME as a primitive type [1], even though
>> it already existed as a primitive type in Java [2]. There seemed to be
>> consensus around a couple of points: 1) At the very least DATETIME is a
>> poor name and it should be renamed to INSTANT or TIMESTAMP, and 2) a
>> logical type is better suited for this purpose.
>>
>> Since then, it's been my intention to replace Java's DATETIME with a
>> MillisInstant logical type backed by Long milliseconds since the epoch (see
>> BEAM-7554 [3]). I've finally managed to get a PR together that does so (and
>> keeps tests passing): https://github.com/apache/beam/pull/11456
>>
>> There could be a couple of concerns with this PR that I think would be
>> better discussed here rather than on github.
>>
>>
>> ## Breaking changes in low-level APIs
>> The PR includes some breaking changes in public, low-level schema APIs:
>> It removes DATETIME from the TypeName enum [4], and it will also change the
>> type returned by Row#getBaseValue for DATETIME/MillisInstant fields (Long
>> milliseconds since epoch rather than org.joda.time.Instant). Both of these
>> changes have the potential to break user code. That being said I think the
>> risk is justified for a few reasons:
>> - These are lower-level APIs that we don't intend for most users to use.
>> The preferred higher-level APIs (SQL, Schema transforms, and inferred
>> schemas for user types) should seamlessly transition over to the new
>> MillisInstant logical type.
>> - Schemas are an experimental feature.
>> - We can clearly document this breaking change in the release that
>> includes it.
>>
>
> I am a bit worried about this. Schemas have been marked experimental for.
> almost two years, and I've found that many Beam users are using them. If we
> make a breaking change, can we make sure that it is extremely clear to
> users how to fix their code? I'm afraid we are in the situation now where
> the code is theoretically experimental but practically is widely used.
>
>
>
>>
>>
>> ## Mixing joda and java 8 time
>> The NanosInstant logical type that Alex added uses java.time.Instant as
>> it's input type, while my MillisInstant type uses org.joda.time.Instant for
>> compatibility with the rest of Beam and the previous DATETIME primitive
>> type. It feels weird, but it also makes a certain sort of sense to use joda
>> time (which has millisecond precision) for MillisInstant, and java 8 time
>> (which has nanos) for NanosInstant. Also, the choice shouldn't have a
>> significant effect on end users - the schema inference code could generate
>> conversions between java 8 time and joda time (as we already do for
>> converting between various joda time types [5]) so user types can use
>> either one.
>>
>
> +1
>
>
>>
>>
>> ## Arbitrary Logical Types and SQL
>> Previously much of the SQL code was written to operate on the _base type_
>> value for any logical types. So for the new MillisInstant type, SQL would
>> attempt to operate on the underlying Long, rather than on a
>> org.joda.time.Instant instance. Thus when I switched over to a
>> MillisInstant logical type as the default for SQL date and time types any
>> tests that used them began failing with ClassCastExceptions.
>>
>> My solution was just to update SQL code to only ever reference the input
>> type (i.e. org.joda.time.Instant) for logical types. A good example of this
>> type of change is in BeamAggregationRel [6].
>>
>> My change does pass all of the SQL tests (including internal Google
>> tests), but I'm a little concerned that using the base type throughout SQL
>> was a conscious decision that I'm stomping on. In theory, it could ensure
>> that SqlTransform would be able to operate on user types that contain
>> custom user logical types, without requiring SqlTransform to understand
>> those types. This is just speculation though, the

Re: Writing a new IO on beam, should I use the source API or SDF?

2020-05-15 Thread Steve Niemitz
ah! ok awesome, I think that was the piece I was misunderstanding.  So I
_can_ use a SDF to split the work initially (like I was manually doing in
#1), but it just won't be further split dynamically on dataflow v1 right
now.  Is my understanding there correct?

On Fri, May 15, 2020 at 5:03 PM Luke Cwik  wrote:

> #3 is the best when you implement @SplitRestriction on the SDF.
>
> The size of each restriction is used to better balance the splits within
> Dataflow runner v2 so it is less susceptible to the too many or unbalanced
> split problem.
> For example, if you have 4 workers and make 20 splits, the splits will be
> grouped based upon their sizes. So if 19 of those splits are small and 1 is
> big, the 1 will execute by itself while the 19 will be done by the 3 other
> workers.
>
> Also, dynamic work rebalancing isn't meant to replace those initial splits
> but helps a lot with worker rebalancing since a few workers are usually
> stragglers and will need some help at the end of a pipeline.
>
> On Fri, May 15, 2020 at 1:54 PM Steve Niemitz  wrote:
>
>> Thanks for the replies so far.  I should have specifically mentioned
>> above, I am building a bounded source.
>>
>> While I was thinking this through, I realized that I might not actually
>> need any fancy splitting, since I can calculate all my split points up
>> front.  I think this goes well with Ismaël's suggestion as well.
>>
>> I'm curious what the pros and cons would be of these options:
>> 1) Presplit each file into N pieces (based on a target bundle size,
>> similar to how it looks like the avro reader does it), using a
>> standard DoFn to read each split.
>> 2) Presplit, but use a SDF to support further splitting once it's
>> supported in dataflow.  (this would also help if I have files that can't be
>> split up front)
>> 3) Don't pre-split, but use a SDF.
>> 4) Use the source API
>>
>> I think we've covered 2 and 4 pretty well already, but curious
>> specifically about the pre-split approach.  Thanks again so far!
>>
>> On Fri, May 15, 2020 at 1:11 PM Ismaël Mejía  wrote:
>>
>>> For the Bounded case if you do not have a straight forward way to split
>>> at
>>> fractions, or simply if you do not care about Dynamic Work Rebalancing.
>>> You can
>>> get away implementing a simple DoFn (without Restrictions) based
>>> implementation
>>> and evolve from it. More and more IOs at Beam are becoming DoFn based
>>> (even if
>>> not SDF) because you win the composability advantages.
>>>
>>> An interesting question is when should we start deprecating the Source
>>> API and
>>> encourage people to write only DoFn based IOs. I think we are getting to
>>> the
>>> maturity point where we can start this discussion.
>>>
>>> On Fri, May 15, 2020 at 4:59 PM Luke Cwik  wrote:
>>> >
>>> > If it is an unbounded source then SDF is a winner since you are not
>>> giving up anything with it when compared to the legacy UnboundedSource API
>>> since Dataflow doesn't support dynamic splitting of unbounded SDFs or
>>> UnboundedSources (only initial splitting). You gain the ability to compose
>>> sources and the initial splitting is done at pipeline execution for SDFs vs
>>> pipeline construction time for UnboundedSource.
>>> >
>>> > If it is bounded, my gut is to still go with SDF since:
>>> > * Dataflow runner V2 supports SDF fully
>>> > * The Java/Python SDF APIs have gone through the majority of churn
>>> already, there are some minor clean-ups and then I would like to remove the
>>> @Experimental annotations from them after a discussion on dev@ about it
>>> > * Being able to compose "sources" is immensely powerful
>>> >
>>> > The caveat is that Dataflow runner V1 doesn't support dynamic
>>> splitting of SDFs today and depending on how well runner v2 rollout
>>> happens, may never. The big plus with the legacy source API is that there
>>> are already bounded/unbounded source wrappers that will convert them into
>>> SDFs so you get all of runner v1 and runner v2 support for what the legacy
>>> source API can do today but give up the composability and any splitting
>>> support for unbounded SDFs that will come later.
>>> >
>>> > Finally, there is a way to get limited support for dynamic splitting
>>> of bounded and unbounded SDFs for other runners using the composability of
>>> SDFs and the limited depth splitting proposal[1].
>>> >
>>> > 1:
>>> https://docs.google.com/document/d/1cKOB9ToasfYs1kLWQgffzvIbJx2Smy4svlodPRhFrk4/edit#heading=h.wkwslng744mv
>>> >
>>> > On Fri, May 15, 2020 at 7:08 AM Steve Niemitz 
>>> wrote:
>>> >>
>>> >> I'm going to be writing a new IO (in java) for reading files in a
>>> custom format, and want to make it splittable.  It seems like I have a
>>> choice between the "legacy" source API, and newer experimental SDF API.  Is
>>> there any guidance on which I should use?  I can likely tolerate some API
>>> churn as well in the SDF APIs.
>>> >>
>>> >> My target runner is dataflow.
>>> >>
>>> >> Thanks!
>>>
>>


Re: Removing DATETIME in Java Schemas

2020-05-15 Thread Reuven Lax
On Fri, Apr 24, 2020 at 11:56 AM Brian Hulette  wrote:

> When we created the portable representation of schemas last summer we
> intentionally did not include DATETIME as a primitive type [1], even though
> it already existed as a primitive type in Java [2]. There seemed to be
> consensus around a couple of points: 1) At the very least DATETIME is a
> poor name and it should be renamed to INSTANT or TIMESTAMP, and 2) a
> logical type is better suited for this purpose.
>
> Since then, it's been my intention to replace Java's DATETIME with a
> MillisInstant logical type backed by Long milliseconds since the epoch (see
> BEAM-7554 [3]). I've finally managed to get a PR together that does so (and
> keeps tests passing): https://github.com/apache/beam/pull/11456
>
> There could be a couple of concerns with this PR that I think would be
> better discussed here rather than on github.
>
>
> ## Breaking changes in low-level APIs
> The PR includes some breaking changes in public, low-level schema APIs: It
> removes DATETIME from the TypeName enum [4], and it will also change the
> type returned by Row#getBaseValue for DATETIME/MillisInstant fields (Long
> milliseconds since epoch rather than org.joda.time.Instant). Both of these
> changes have the potential to break user code. That being said I think the
> risk is justified for a few reasons:
> - These are lower-level APIs that we don't intend for most users to use.
> The preferred higher-level APIs (SQL, Schema transforms, and inferred
> schemas for user types) should seamlessly transition over to the new
> MillisInstant logical type.
> - Schemas are an experimental feature.
> - We can clearly document this breaking change in the release that
> includes it.
>

I am a bit worried about this. Schemas have been marked experimental for
almost two years, and I've found that many Beam users are using them. If we
make a breaking change, can we make sure that it is extremely clear to
users how to fix their code? I'm afraid we are in the situation now where
the code is theoretically experimental but practically is widely used.



>
>
> ## Mixing joda and java 8 time
> The NanosInstant logical type that Alex added uses java.time.Instant as
> it's input type, while my MillisInstant type uses org.joda.time.Instant for
> compatibility with the rest of Beam and the previous DATETIME primitive
> type. It feels weird, but it also makes a certain sort of sense to use joda
> time (which has millisecond precision) for MillisInstant, and java 8 time
> (which has nanos) for NanosInstant. Also, the choice shouldn't have a
> significant effect on end users - the schema inference code could generate
> conversions between java 8 time and joda time (as we already do for
> converting between various joda time types [5]) so user types can use
> either one.
>

+1
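
(For reference, the conversion itself is a one-liner in each direction on epoch
millis; only sub-millisecond precision is dropped going back to joda:)

    java.time.Instant java8 = java.time.Instant.ofEpochMilli(org.joda.time.Instant.now().getMillis());
    org.joda.time.Instant joda = new org.joda.time.Instant(java8.toEpochMilli());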


>
>
> ## Arbitrary Logical Types and SQL
> Previously much of the SQL code was written to operate on the _base type_
> value for any logical types. So for the new MillisInstant type, SQL would
> attempt to operate on the underlying Long, rather than on a
> org.joda.time.Instant instance. Thus when I switched over to a
> MillisInstant logical type as the default for SQL date and time types any
> tests that used them began failing with ClassCastExceptions.
>
> My solution was just to update SQL code to only ever reference the input
> type (i.e. org.joda.time.Instant) for logical types. A good example of this
> type of change is in BeamAggregationRel [6].
>
> My change does pass all of the SQL tests (including internal Google
> tests), but I'm a little concerned that using the base type throughout SQL
> was a conscious decision that I'm stomping on. In theory, it could ensure
> that SqlTransform would be able to operate on user types that contain
> custom user logical types, without requiring SqlTransform to understand
> those types. This is just speculation though, there aren't actually any
> tests verifying that functionality. Also, I don't think any such tests
> would have worked since there are several places where we explicitly check
> for particular logical types (e.g. in BeamCalcRel [7]).
>
> Personally I'm ok with this move, because I'm against the idea of
> implicitly stripping away logical types like this in schema-aware
> transforms. A logical type indicates that the value has some additional
> meaning beyond just the base type (and maybe a restricted range), and I
> don't think transforms should be able to ignore that meaning unless the
> user explicitly allows it, or first converts to the base type themselves.
>

I disagree with this personally:

Arguably, _every_ schema has "additional meaning" beyond the fields, and a
LogicalType is just a nice way of naming that and allowing the use of other
classes to manage that type. For example, Let's say you have a record
containing latitude and longitude fields - today you can easily write SQL
over these fields. Later you decide that this is a very common pattern in
your data an

Re: Writing a new IO on beam, should I use the source API or SDF?

2020-05-15 Thread Luke Cwik
#3 is the best when you implement @SplitRestriction on the SDF.

The size of each restriction is used to better balance the splits within
Dataflow runner v2 so it is less susceptible to the too many or unbalanced
split problem.
For example, if you have 4 workers and make 20 splits, the splits will be
grouped based upon their sizes. So if 19 of those splits are small and 1 is
big, the 1 will execute by itself while the 19 will be done by the 3 other
workers.

Also, dynamic work rebalancing isn't meant to replace those initial splits
but helps a lot with worker rebalancing since a few workers are usually
stragglers and will need some help at the end of a pipeline.
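
To make that concrete, here is a rough sketch of what #3 could look like for
a custom file format (MyRecord and MyRecordReader are hypothetical helpers
for your format, and the 64MB split target is only an example):

  import java.io.IOException;
  import org.apache.beam.sdk.io.FileIO;
  import org.apache.beam.sdk.io.range.OffsetRange;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

  class ReadCustomFormatFn extends DoFn<FileIO.ReadableFile, MyRecord> {

    @GetInitialRestriction
    public OffsetRange initialRestriction(@Element FileIO.ReadableFile file) {
      return new OffsetRange(0, file.getMetadata().sizeBytes());
    }

    @SplitRestriction
    public void splitRestriction(
        @Restriction OffsetRange range, OutputReceiver<OffsetRange> out) {
      // Pre-split into ~64MB pieces; the runner groups them by reported size.
      for (OffsetRange piece : range.split(64L << 20, 1L << 20)) {
        out.output(piece);
      }
    }

    @GetSize
    public double getSize(@Restriction OffsetRange range) {
      // Used to balance bundles across workers in Dataflow runner v2.
      return range.getTo() - range.getFrom();
    }

    @ProcessElement
    public void processElement(
        @Element FileIO.ReadableFile file,
        RestrictionTracker<OffsetRange, Long> tracker,
        OutputReceiver<MyRecord> out) throws IOException {
      // Hypothetical reader that can start at an arbitrary offset in the file.
      MyRecordReader reader =
          MyRecordReader.open(file, tracker.currentRestriction().getFrom());
      while (reader.hasNext() && tracker.tryClaim(reader.currentOffset())) {
        out.output(reader.next());
      }
    }
  }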

On Fri, May 15, 2020 at 1:54 PM Steve Niemitz  wrote:

> Thanks for the replies so far.  I should have specifically mentioned
> above, I am building a bounded source.
>
> While I was thinking this through, I realized that I might not actually
> need any fancy splitting, since I can calculate all my split points up
> front.  I think this goes well with Ismaël's suggestion as well.
>
> I'm curious what the pros and cons would be of these options:
> 1) Presplit each file into N pieces (based on a target bundle size,
> similar to how it looks like the avro reader does it), using a
> standard DoFn to read each split.
> 2) Presplit, but use a SDF to support further splitting once it's
> supported in dataflow.  (this would also help if I have files that can't be
> split up front)
> 3) Don't pre-split, but use a SDF.
> 4) Use the source API
>
> I think we've covered 2 and 4 pretty well already, but curious
> specifically about the pre-split approach.  Thanks again so far!
>
> On Fri, May 15, 2020 at 1:11 PM Ismaël Mejía  wrote:
>
>> For the Bounded case, if you do not have a straightforward way to split at
>> fractions, or simply do not care about Dynamic Work Rebalancing, you can
>> get away with a simple DoFn-based implementation (without Restrictions) and
>> evolve from it. More and more IOs in Beam are becoming DoFn based (even if
>> not SDF) because you win the composability advantages.
>>
>> An interesting question is when we should start deprecating the Source API
>> and encourage people to write only DoFn-based IOs. I think we are getting
>> to the maturity point where we can start this discussion.
>>
>> On Fri, May 15, 2020 at 4:59 PM Luke Cwik  wrote:
>> >
>> > If it is an unbounded source then SDF is a winner since you are not
>> giving up anything with it when compared to the legacy UnboundedSource API
>> since Dataflow doesn't support dynamic splitting of unbounded SDFs or
>> UnboundedSources (only initial splitting). You gain the ability to compose
>> sources and the initial splitting is done at pipeline execution for SDFs vs
>> pipeline construction time for UnboundedSource.
>> >
>> > If it is bounded, my gut is to still go with SDF since:
>> > * Dataflow runner V2 supports SDF fully
>> > * The Java/Python SDF APIs have gone through the majority of churn
>> already, there are some minor clean-ups and then I would like to remove the
>> @Experimental annotations from them after a discussion on dev@ about it
>> > * Being able to compose "sources" is immensely powerful
>> >
>> > The caveat is that Dataflow runner V1 doesn't support dynamic splitting
>> of SDFs today and depending on how well runner v2 rollout happens, may
>> never. The big plus with the legacy source API is that there are already
>> bounded/unbounded source wrappers that will convert them into SDFs so you
>> get all of runner v1 and runner v2 support for what the legacy source API
>> can do today but give up the composability and any splitting support for
>> unbounded SDFs that will come later.
>> >
>> > Finally, there is a way to get limited support for dynamic splitting of
>> bounded and unbounded SDFs for other runners using the composability of
>> SDFs and the limited depth splitting proposal[1].
>> >
>> > 1:
>> https://docs.google.com/document/d/1cKOB9ToasfYs1kLWQgffzvIbJx2Smy4svlodPRhFrk4/edit#heading=h.wkwslng744mv
>> >
>> > On Fri, May 15, 2020 at 7:08 AM Steve Niemitz 
>> wrote:
>> >>
>> >> I'm going to be writing a new IO (in java) for reading files in a
>> custom format, and want to make it splittable.  It seems like I have a
>> choice between the "legacy" source API, and newer experimental SDF API.  Is
>> there any guidance on which I should use?  I can likely tolerate some API
>> churn as well in the SDF APIs.
>> >>
>> >> My target runner is dataflow.
>> >>
>> >> Thanks!
>>
>


Re: Writing a new IO on beam, should I use the source API or SDF?

2020-05-15 Thread Steve Niemitz
Thanks for the replies so far.  I should have specifically mentioned above,
I am building a bounded source.

While I was thinking this through, I realized that I might not actually
need any fancy splitting, since I can calculate all my split points up
front.  I think this goes well with Ismaël's suggestion as well.

I'm curious what the pros and cons would be of these options:
1) Presplit each file into N pieces (based on a target bundle size, similar
to how it looks like the avro reader does it), using a standard DoFn to
read each split.
2) Presplit, but use a SDF to support further splitting once it's supported
in dataflow.  (this would also help if I have files that can't be split up
front)
3) Don't pre-split, but use a SDF.
4) Use the source API

I think we've covered 2 and 4 pretty well already, but curious specifically
about the pre-split approach.  Thanks again so far!
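
For reference, a rough sketch of what I mean by the pre-split approach in
option 1, assuming records can be read starting at an arbitrary byte offset
(ReadRangeFn and the 64MB target bundle size are placeholders, and the usual
Beam SDK imports are omitted):

  p.apply(FileIO.match().filepattern("gs://my-bucket/input/*.dat"))
   .apply(FileIO.readMatches())
   // Pre-split: emit one (file, offset range) pair per target bundle.
   .apply(ParDo.of(
       new DoFn<FileIO.ReadableFile, KV<FileIO.ReadableFile, OffsetRange>>() {
         @ProcessElement
         public void split(
             @Element FileIO.ReadableFile file,
             OutputReceiver<KV<FileIO.ReadableFile, OffsetRange>> out) {
           long size = file.getMetadata().sizeBytes();
           long target = 64L << 20;  // target bundle size in bytes
           for (long start = 0; start < size; start += target) {
             out.output(KV.of(
                 file, new OffsetRange(start, Math.min(start + target, size))));
           }
         }
       }))
   // (Depending on coder inference you may need an explicit setCoder here.)
   // Redistribute the splits so reading isn't fused onto the splitting worker.
   .apply(Reshuffle.viaRandomKey())
   // Plain (non-splittable) DoFn that reads the records of a single range.
   .apply(ParDo.of(new ReadRangeFn()));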

On Fri, May 15, 2020 at 1:11 PM Ismaël Mejía  wrote:

> For the Bounded case, if you do not have a straightforward way to split at
> fractions, or simply do not care about Dynamic Work Rebalancing, you can get
> away with a simple DoFn-based implementation (without Restrictions) and
> evolve from it. More and more IOs in Beam are becoming DoFn based (even if
> not SDF) because you win the composability advantages.
>
> An interesting question is when we should start deprecating the Source API
> and encourage people to write only DoFn-based IOs. I think we are getting to
> the maturity point where we can start this discussion.
>
> On Fri, May 15, 2020 at 4:59 PM Luke Cwik  wrote:
> >
> > If it is an unbounded source then SDF is a winner since you are not
> giving up anything with it when compared to the legacy UnboundedSource API
> since Dataflow doesn't support dynamic splitting of unbounded SDFs or
> UnboundedSources (only initial splitting). You gain the ability to compose
> sources and the initial splitting is done at pipeline execution for SDFs vs
> pipeline construction time for UnboundedSource.
> >
> > If it is bounded, my gut is to still go with SDF since:
> > * Dataflow runner V2 supports SDF fully
> > * The Java/Python SDF APIs have gone through the majority of churn
> already, there are some minor clean-ups and then I would like to remove the
> @Experimental annotations from them after a discussion on dev@ about it
> > * Being able to compose "sources" is immensely powerful
> >
> > The caveat is that Dataflow runner V1 doesn't support dynamic splitting
> of SDFs today and depending on how well runner v2 rollout happens, may
> never. The big plus with the legacy source API is that there are already
> bounded/unbounded source wrappers that will convert them into SDFs so you
> get all of runner v1 and runner v2 support for what the legacy source API
> can do today but give up the composability and any splitting support for
> unbounded SDFs that will come later.
> >
> > Finally, there is a way to get limited support for dynamic splitting of
> bounded and unbounded SDFs for other runners using the composability of
> SDFs and the limited depth splitting proposal[1].
> >
> > 1:
> https://docs.google.com/document/d/1cKOB9ToasfYs1kLWQgffzvIbJx2Smy4svlodPRhFrk4/edit#heading=h.wkwslng744mv
> >
> > On Fri, May 15, 2020 at 7:08 AM Steve Niemitz 
> wrote:
> >>
> >> I'm going to be writing a new IO (in java) for reading files in a
> custom format, and want to make it splittable.  It seems like I have a
> choice between the "legacy" source API, and newer experimental SDF API.  Is
> there any guidance on which I should use?  I can likely tolerate some API
> churn as well in the SDF APIs.
> >>
> >> My target runner is dataflow.
> >>
> >> Thanks!
>


Re: Removing DATETIME in Java Schemas

2020-05-15 Thread Kenneth Knowles
This seems like a good idea. This stuff is all still marked "experimental"
for exactly this reason. This is a case where the name fits perfectly. Both
SQL and schemas are new and still working towards a form that can be
supported indefinitely without layers of workarounds that will never
quiesce. I think the user pain here is minimal. I don't think a deprecation
cycle is terrifically important, but we could do one release.

As for SQL, I believe the following:

 - we have multiple SQL dialects with different type systems
 - our SQL planner/runtime also has its own type system
 - when you are converting a Beam schema to pass to SqlTransform, you need
to provide a conversion from the Beam schema type system to the SQL
dialect's type system (sometimes the conversion is a noop)

So my thinking about the conversion:

 - it is OK to convert known logical types like MillisInstant or
NanosInstant to a ZetaSQL "TIMESTAMP" or Calcite SQL "TIMESTAMP WITH LOCAL
TIME ZONE"
 - it is OK to convert unknown logical types into their representation type
 - coming out the other side of a SQL statement, a user should expect the
type to be a row where every field has a type from the SQL dialect; the
input types are gone

I've heard some arguments (on list? offline?) that operating directly on
the representation of a logical type is wrong, because you can't really
know that what you are doing results in a valid value. I agree with that,
basically. That's why I frame it as a one-way conversion from the input
type to the SQL dialect's type. You aren't operating on the logical type
any more.
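
In code, that one-way preprocessing could look roughly like this (a sketch
only: knownSqlTypes is a placeholder for whatever logical type identifiers
the dialect understands, and nested rows/arrays are ignored for brevity):

  import java.util.Set;
  import org.apache.beam.sdk.schemas.Schema;
  import org.apache.beam.sdk.values.Row;

  class LogicalTypeDemotion {
    static Schema demoteSchema(Schema schema, Set<String> knownSqlTypes) {
      Schema.Builder builder = Schema.builder();
      for (Schema.Field field : schema.getFields()) {
        Schema.FieldType type = field.getType();
        if (type.getTypeName().isLogicalType()
            && !knownSqlTypes.contains(type.getLogicalType().getIdentifier())) {
          // Unknown logical type: keep only its representation type.
          builder.addField(field.getName(), type.getLogicalType().getBaseType());
        } else {
          builder.addField(field.getName(), type);
        }
      }
      return builder.build();
    }

    static Row demoteRow(Row row, Schema demotedSchema) {
      Row.Builder out = Row.withSchema(demotedSchema);
      for (int i = 0; i < row.getFieldCount(); i++) {
        boolean demoted = !row.getSchema().getField(i).getType()
            .equals(demotedSchema.getField(i).getType());
        // Use the representation value for demoted fields, the value as-is
        // otherwise.
        out.addValue(demoted ? row.getBaseValue(i) : row.getValue(i));
      }
      return out.build();
    }
  }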

I feel I am probably forgetting some important details about the "logical
type inlining" discussion, so others should speak up if they have more to
add.

There is also a backwards-compatibility lesson here: don't show users your
enums if you can avoid it. If DATETIME had always been accessed via a
(trivial) factory method then changing the implementation would be
backwards compatible.
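
A tiny, purely hypothetical illustration of that point (this helper does not
exist in Beam; it is only here to show the API-evolution idea):

  import org.apache.beam.sdk.schemas.Schema;

  public final class FieldTypes {
    private FieldTypes() {}

    public static Schema.FieldType datetime() {
      // Today this could return the DATETIME primitive; later it could return
      // FieldType.logicalType(<the MillisInstant type>) with no change
      // visible to callers.
      return Schema.FieldType.DATETIME;
    }
  }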

Kenn

On Fri, Apr 24, 2020 at 11:56 AM Brian Hulette  wrote:

> When we created the portable representation of schemas last summer we
> intentionally did not include DATETIME as a primitive type [1], even though
> it already existed as a primitive type in Java [2]. There seemed to be
> consensus around a couple of points: 1) At the very least DATETIME is a
> poor name and it should be renamed to INSTANT or TIMESTAMP, and 2) a
> logical type is better suited for this purpose.
>
> Since then, it's been my intention to replace Java's DATETIME with a
> MillisInstant logical type backed by Long milliseconds since the epoch (see
> BEAM-7554 [3]). I've finally managed to get a PR together that does so (and
> keeps tests passing): https://github.com/apache/beam/pull/11456
>
> There could be a couple of concerns with this PR that I think would be
> better discussed here rather than on github.
>
>
> ## Breaking changes in low-level APIs
> The PR includes some breaking changes in public, low-level schema APIs: It
> removes DATETIME from the TypeName enum [4], and it will also change the
> type returned by Row#getBaseValue for DATETIME/MillisInstant fields (Long
> milliseconds since epoch rather than org.joda.time.Instant). Both of these
> changes have the potential to break user code. That being said I think the
> risk is justified for a few reasons:
> - These are lower-level APIs that we don't intend for most users to use.
> The preferred higher-level APIs (SQL, Schema transforms, and inferred
> schemas for user types) should seamlessly transition over to the new
> MillisInstant logical type.
> - Schemas are an experimental feature.
> - We can clearly document this breaking change in the release that
> includes it.
>
>
> ## Mixing joda and java 8 time
> The NanosInstant logical type that Alex added uses java.time.Instant as
> its input type, while my MillisInstant type uses org.joda.time.Instant for
> compatibility with the rest of Beam and the previous DATETIME primitive
> type. It feels weird, but it also makes a certain sort of sense to use joda
> time (which has millisecond precision) for MillisInstant, and java 8 time
> (which has nanos) for NanosInstant. Also, the choice shouldn't have a
> significant effect on end users - the schema inference code could generate
> conversions between java 8 time and joda time (as we already do for
> converting between various joda time types [5]) so user types can use
> either one.
>
>
> ## Arbitrary Logical Types and SQL
> Previously much of the SQL code was written to operate on the _base type_
> value for any logical type. So for the new MillisInstant type, SQL would
> attempt to operate on the underlying Long, rather than on an
> org.joda.time.Instant instance. Thus when I switched over to a
> MillisInstant logical type as the default for SQL date and time types, any
> tests that used them began failing with ClassCastExceptions.
>
> My solution was just to update SQL code to only ever reference the input
> type (i.e. org.joda.t

Re: [DISCUSS] Dealing with @Ignored tests

2020-05-15 Thread Luke Cwik
For the ones without the label, someone would need to use blame and track
back to why it was sickbayed.

On Fri, May 15, 2020 at 1:08 PM Kenneth Knowles  wrote:

> There are 101 instances of @Ignore, and I've listed them below. A few
> takeaways:
>
>  - highly concentrated in ZetaSQL, and then second tier in various state
> tests specific to a runner
>  - there are not that many overall, so I'm not sure a report will add much
>  - they do not all have Jiras
>  - they do not even all have any explanation at all (some include the
> string parameter but leave it as an empty string!)
>
> Having a checkstyle that there is a Jira attached seems nice. Then we
> could easily grep out the Jiras and not depend on the "sickbay" label.
>
> Raw data (to see the individual items, just do the grep and not the
> processing)
>
>   % grep --recursive --exclude-dir build '@Ignore' . | cut -d ' ' -f 1 |
> sort | uniq -c | sort -r
>   27
> ./sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java:
>   11
> ./runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java:
>7
> ./runners/spark/src/test/java/org/apache/beam/runners/spark/stateful/SparkStateInternalsTest.java:
>7
> ./runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java:
>4
> ./sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/QueryTest.java:
>4
> ./runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/StructuredStreamingPipelineStateTest.java:
>2
> ./sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java:
>2
> ./sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java:
>2
> ./sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java:
>2
> ./sdks/java/extensions/sql/jdbc/src/test/java/org/apache/beam/sdk/extensions/sql/jdbc/BeamSqlLineIT.java:
>2
> ./sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/ReduceByKeyTest.java:
>2
> ./sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java:
>2
> ./sdks/java/core/src/test/java/org/apache/beam/sdk/coders/PCollectionCustomCoderTest.java:
>2
> ./runners/direct-java/src/test/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutorTest.java:
>1
> ./sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSourceTest.java:
>1
> ./sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery5Test.java:
>1
> ./sdks/java/io/kudu/src/test/java/org/apache/beam/sdk/io/kudu/KuduIOTest.java:
>1
> ./sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java:
>1
> ./sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java:
>1
> ./sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java:
>1
> ./sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIOTest.java:@Ignore
> ("[BEAM-7794]
>1
> ./sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIOTest.java:@Ignore
> ("[BEAM-7794]
>1
> ./sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaCSVTableIT.java:@Ignore
> ("https://issues.apache.org/jira/projects/BEAM/issues/BEAM-7523")
>1
> ./sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java:
>1
> ./sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlExplainTest.java:
>1
> ./sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslSqlStdOperatorsTest.java:
>1
> ./sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java:
>1
> ./sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/JoinTest.java:
>1
> ./sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/docs/DocumentationExamplesTest.java:
>1
> ./sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java:
>1
> ./sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java:
>1
> ./sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java:
>1
> ./sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java:
>1
> ./sdks/java/core/src/test/java/org/apache/beam/sdk/coders/RowCoderTest.java:
>1
> ./runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/SimpleSourceTest.java:
>1
> ./runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metri

Re: [DISCUSS] Dealing with @Ignored tests

2020-05-15 Thread Kenneth Knowles
There are 101 instances of @Ignore, and I've listed them below. A few
takeaways:

 - highly concentrated in ZetaSQL, and then second tier in various state
tests specific to a runner
 - there are not that many overall, so I'm not sure a report will add much
 - they do not all have Jiras
 - they do not even all have any explanation at all (some include the string
parameter but leave it as an empty string!)

Having a checkstyle that there is a Jira attached seems nice. Then we could
easily grep out the Jiras and not depend on the "sickbay" label.
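
As a strawman for that check, something along these lines would do (plain
JDK, line-based, so it would miss multi-line @Ignore messages; the BEAM-NNN
convention is the assumption):

  import java.io.IOException;
  import java.io.UncheckedIOException;
  import java.nio.file.Files;
  import java.nio.file.Path;
  import java.nio.file.Paths;
  import java.util.List;
  import java.util.regex.Pattern;
  import java.util.stream.Stream;

  public class FindIgnoreWithoutJira {
    // Flags @Ignore lines that do not mention a BEAM Jira on the same line.
    private static final Pattern IGNORE_NO_JIRA =
        Pattern.compile("@Ignore\\b(?!.*BEAM-\\d+)");

    public static void main(String[] args) throws IOException {
      Path root = Paths.get(args.length > 0 ? args[0] : ".");
      try (Stream<Path> paths = Files.walk(root)) {
        paths.filter(p -> p.toString().endsWith(".java"))
             .forEach(FindIgnoreWithoutJira::scan);
      }
    }

    private static void scan(Path file) {
      try {
        List<String> lines = Files.readAllLines(file);
        for (int i = 0; i < lines.size(); i++) {
          if (IGNORE_NO_JIRA.matcher(lines.get(i)).find()) {
            System.out.println(file + ":" + (i + 1) + ": " + lines.get(i).trim());
          }
        }
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }
  }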

Raw data (to see the individual items, just do the grep and not the
processing)

  % grep --recursive --exclude-dir build '@Ignore' . | cut -d ' ' -f 1 |
sort | uniq -c | sort -r
  27
./sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java:
  11
./runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java:
   7
./runners/spark/src/test/java/org/apache/beam/runners/spark/stateful/SparkStateInternalsTest.java:
   7
./runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java:
   4
./sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/QueryTest.java:
   4
./runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/StructuredStreamingPipelineStateTest.java:
   2
./sdks/java/io/xml/src/test/java/org/apache/beam/sdk/io/xml/XmlSourceTest.java:
   2
./sdks/java/io/mqtt/src/test/java/org/apache/beam/sdk/io/mqtt/MqttIOTest.java:
   2
./sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java:
   2
./sdks/java/extensions/sql/jdbc/src/test/java/org/apache/beam/sdk/extensions/sql/jdbc/BeamSqlLineIT.java:
   2
./sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/ReduceByKeyTest.java:
   2
./sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java:
   2
./sdks/java/core/src/test/java/org/apache/beam/sdk/coders/PCollectionCustomCoderTest.java:
   2
./runners/direct-java/src/test/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutorTest.java:
   1
./sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSourceTest.java:
   1
./sdks/java/testing/nexmark/src/test/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery5Test.java:
   1
./sdks/java/io/kudu/src/test/java/org/apache/beam/sdk/io/kudu/KuduIOTest.java:
   1
./sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java:
   1
./sdks/java/io/hadoop-file-system/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java:
   1
./sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java:
   1
./sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIOTest.java:@Ignore
("[BEAM-7794]
   1
./sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIOTest.java:@Ignore
("[BEAM-7794]
   1
./sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/KafkaCSVTableIT.java:@Ignore
("https://issues.apache.org/jira/projects/BEAM/issues/BEAM-7523")
   1
./sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.java:
   1
./sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlExplainTest.java:
   1
./sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslSqlStdOperatorsTest.java:
   1
./sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamComplexTypeTest.java:
   1
./sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/testkit/JoinTest.java:
   1
./sdks/java/extensions/euphoria/src/test/java/org/apache/beam/sdk/extensions/euphoria/core/docs/DocumentationExamplesTest.java:
   1
./sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java:
   1
./sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java:
   1
./sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java:
   1
./sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java:
   1
./sdks/java/core/src/test/java/org/apache/beam/sdk/coders/RowCoderTest.java:
   1
./runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/SimpleSourceTest.java:
   1
./runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java:@Ignore
("Has
   1
./runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverridesTest.java:
   1
./runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java:
   1
./runners/apex/src/test/java/org/apache/beam/runners/apex/transla

Re: Java transform executed as Python transform?

2020-05-15 Thread Brian Hulette
I just started having a similar-looking issue this morning. I'm trying
out running the Python SqlTransform tests with fn_runner (currently they
only execute continuously on Flink and Spark), but I'm running into
occasional failures. The errors always come from either Python or Java
attempting to hydrate a coder that's native to the other SDK. I'm not sure,
but maybe this is also caused by attempting to execute a transform in the
wrong environment?

One interesting data point is that this issue can be flakey.
SqlTransformTest.test_generate_data fails ~25% of the time when run on the
fn_api_runner. SqlTransformTest.test_tagged_join on the other hand just
fails 100% of the time.
Neither of these tests flakes for me at all when using --runner=FlinkRunner
(out of 10 runs).

Maybe we're seeing different manifestations of the same bug?

On Fri, May 15, 2020 at 3:08 AM Paweł Urbanowicz <
pawel.urbanow...@polidea.com> wrote:

> Hey,
> I created a transform method in Java and now I want to use it in Python
> using Cross-language.
>
> I got pretty stuck with the following problem:
> p
> | GenerateSequence(...)
> |ExternalTransform(...)
> *=> is working like a charm *
>
>
> p
> | Create(...)
> | ExternalTransform(...)
> *=> getting assert pardo_payload.do_fn.urn ==
> python_urns.PICKLED_DOFN_INFO *
>
> Based on https://www.mail-archive.com/user@beam.apache.org/msg04887.html
> it seems like a Create transform is being registered as a Java transform
> but executed as a Python transform.
> Do you have any idea what is going on here? Thanks a lot for any help :)
>
>
> Traceback (most recent call last):
> File
> "/Users/urban/projects/beam/sdks/python/apache_beam/io/external/snowflake_test.py",
> line 92, in test_snowflake_write_read
> self.run_write()
> File
> "/Users/urban/projects/beam/sdks/python/apache_beam/io/external/snowflake_test.py",
> line 129, in run_write
> expansion_service=self.expansion_service,
> File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py",
> line 528, in __exit__
> self.run().wait_until_finish()
> File
> "/Users/urban/projects/beam/sdks/python/apache_beam/testing/test_pipeline.py",
> line 112, in run
> False if self.not_use_test_runner_api else test_runner_api))
> File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py",
> line 501, in run
> self._options).run(False)
> File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py",
> line 858, in from_runner_api
> p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
> File
> "/Users/urban/projects/beam/sdks/python/apache_beam/runners/pipeline_context.py",
> line 103, in get_by_id
> self._id_to_proto[id], self._pipeline_context)
> File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py",
> line 1231, in from_runner_api
> part = context.transforms.get_by_id(transform_id)
> File
> "/Users/urban/projects/beam/sdks/python/apache_beam/runners/pipeline_context.py",
> line 103, in get_by_id
> self._id_to_proto[id], self._pipeline_context)
> File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py",
> line 1231, in from_runner_api
> part = context.transforms.get_by_id(transform_id)
> File
> "/Users/urban/projects/beam/sdks/python/apache_beam/runners/pipeline_context.py",
> line 103, in get_by_id
> self._id_to_proto[id], self._pipeline_context)
> File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py",
> line 1231, in from_runner_api
> part = context.transforms.get_by_id(transform_id)
> File
> "/Users/urban/projects/beam/sdks/python/apache_beam/runners/pipeline_context.py",
> line 103, in get_by_id
> self._id_to_proto[id], self._pipeline_context)
> File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py",
> line 1231, in from_runner_api
> part = context.transforms.get_by_id(transform_id)
> File
> "/Users/urban/projects/beam/sdks/python/apache_beam/runners/pipeline_context.py",
> line 103, in get_by_id
> self._id_to_proto[id], self._pipeline_context)
> File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py",
> line 1170, in from_runner_api
> transform = ptransform.PTransform.from_runner_api(proto, context)
> File
> "/Users/urban/projects/beam/sdks/python/apache_beam/transforms/ptransform.py",
> line 685, in from_runner_api
> context)
> File
> "/Users/urban/projects/beam/sdks/python/apache_beam/transforms/core.py",
> line 1380, in from_runner_api_parameter
> assert pardo_payload.do_fn.urn == python_urns.PICKLED_DOFN_INFO
> AssertionError
>


BEAM-9958: Code Review Wanted for PR 11674

2020-05-15 Thread Tomo Suzuki
Hi Luke and Beam committers,

Would you review this PR, which makes the script use Linkage Checker's
exclusion file?
https://github.com/apache/beam/pull/11674
The script used to use the "diff" command to identify new linkage errors by
comparing output line by line. With this PR, it identifies new linkage errors
in a more appropriate manner. This PR opens up an opportunity to set up a
Jenkins job to check dependency conflicts.

-- 
Regards,
Tomo


regarding Google Season of Docs

2020-05-15 Thread Yuvraj Manral
Respected sir/madam,

I came across the projects proposed by Apache Beam for Season of Docs 2020.
I am a newbie to the organisation but really liked the project ideas and
would love to start contributing and prepare my proposal for Season of Docs.

Please guide me through: where should I start, and how should I proceed?
Thanking you in anticipation

Yuvraj Manral 
RSVP


Re: Writing a new IO on beam, should I use the source API or SDF?

2020-05-15 Thread Ismaël Mejía
For the Bounded case, if you do not have a straightforward way to split at
fractions, or simply do not care about Dynamic Work Rebalancing, you can get
away with a simple DoFn-based implementation (without Restrictions) and evolve
from it. More and more IOs in Beam are becoming DoFn based (even if not SDF)
because you win the composability advantages.

An interesting question is when we should start deprecating the Source API and
encourage people to write only DoFn-based IOs. I think we are getting to the
maturity point where we can start this discussion.

On Fri, May 15, 2020 at 4:59 PM Luke Cwik  wrote:
>
> If it is an unbounded source then SDF is a winner since you are not giving up 
> anything with it when compared to the legacy UnboundedSource API since 
> Dataflow doesn't support dynamic splitting of unbounded SDFs or 
> UnboundedSources (only initial splitting). You gain the ability to compose 
> sources and the initial splitting is done at pipeline execution for SDFs vs 
> pipeline construction time for UnboundedSource.
>
> If it is bounded, my gut is to still go with SDF since:
> * Dataflow runner V2 supports SDF fully
> * The Java/Python SDF APIs have gone through the majority of churn already, 
> there are some minor clean-ups and then I would like to remove the 
> @Experimental annotations from them after a discussion on dev@ about it
> * Being able to compose "sources" is immensely powerful
>
> The caveat is that Dataflow runner V1 doesn't support dynamic splitting of 
> SDFs today and depending on how well runner v2 rollout happens, may never. 
> The big plus with the legacy source API is that there are already 
> bounded/unbounded source wrappers that will convert them into SDFs so you get 
> all of runner v1 and runner v2 support for what the legacy source API can do 
> today but give up the composability and any splitting support for unbounded 
> SDFs that will come later.
>
> Finally, there is a way to get limited support for dynamic splitting of 
> bounded and unbounded SDFs for other runners using the composability of SDFs 
> and the limited depth splitting proposal[1].
>
> 1: 
> https://docs.google.com/document/d/1cKOB9ToasfYs1kLWQgffzvIbJx2Smy4svlodPRhFrk4/edit#heading=h.wkwslng744mv
>
> On Fri, May 15, 2020 at 7:08 AM Steve Niemitz  wrote:
>>
>> I'm going to be writing a new IO (in java) for reading files in a custom 
>> format, and want to make it splittable.  It seems like I have a choice 
>> between the "legacy" source API, and newer experimental SDF API.  Is there 
>> any guidance on which I should use?  I can likely tolerate some API churn as 
>> well in the SDF APIs.
>>
>> My target runner is dataflow.
>>
>> Thanks!


Re: TextIO. Writing late files

2020-05-15 Thread Reuven Lax
Lateness should never be introduced inside a pipeline - generally late data
can only come from a source.  If data was not dropped as late earlier in
the pipeline, it should not be dropped after the file write. I suspect that
this is a bug in how the Flink runner handles the Reshuffle transform, but
I'm not sure what the exact bug is.

Reuven

On Fri, May 15, 2020 at 2:23 AM Jozef Vilcek  wrote:

> Hi Jose,
>
> thank you for putting in the effort to create an example which demonstrates
> your problem.
>
> You are using a streaming pipeline, and it seems that the watermark
> downstream has already advanced further, so when your file pane arrives, it
> is already late. Since you define that lateness is not tolerated, it is
> dropped.
> I myself never had a requirement to specify zero allowed lateness for
> streaming. It feels dangerous. Do you have a specific use case? Also, in
> many cases, after windowed files are written, I usually collect them into
> the global window and specify a different triggering policy for collecting
> them. Both cases are why I never came across this situation.
>
> I do not have an explanation for whether it is a bug or not. I would guess
> that the watermark can advance further, e.g. because elements can be
> processed in arbitrary order. Not saying this is the case.
> It needs someone with a better understanding of how watermark advancement
> is / should be handled within pipelines.
>
>
> P.S.: you can add `.withTimestampFn()` to your generate sequence, to get
> more stable timing, which is also easier to reason about:
>
> Dropping element at 1970-01-01T00:00:19.999Z for key
> ... window:[1970-01-01T00:00:15.000Z..1970-01-01T00:00:20.000Z) since too
> far behind inputWatermark:1970-01-01T00:00:24.000Z;
> outputWatermark:1970-01-01T00:00:24
> .000Z
>
>instead of
>
> Dropping element at 2020-05-15T08:52:34.999Z for key ...
> window:[2020-05-15T08:52:30.000Z..2020-05-15T08:52:35.000Z) since too far
> behind inputWatermark:2020-05-15T08:52:39.318Z;
> outputWatermark:2020-05-15T08:52:39.318Z
>
>
>
>
> In my
>
>
>
> On Thu, May 14, 2020 at 10:47 AM Jose Manuel 
> wrote:
>
>> Hi again,
>>
>> I have simplified the example to reproduce the data loss. The scenario is
>> the following:
>>
>> - TextIO writes files.
>> - getPerDestinationOutputFilenames emits file names
>> - File names are processed by an aggregator (combine, distinct,
>> groupByKey...) with a window **without allowed lateness**
>> - File names are discarded as late
>>
>> Here you can see the data loss in the picture in
>> https://github.com/kiuby88/windowing-textio/blob/master/README.md#showing-data-loss
>>
>> Please, follow README to run the pipeline and find log traces that say
>> data are dropped as late.
>> Remember, you can run the pipeline with another window's  lateness values
>> (check README.md)
>>
>> Kby.
>>
>> El mar., 12 may. 2020 a las 17:16, Jose Manuel ()
>> escribió:
>>
>>> Hi,
>>>
>>> I would like to clarify that while TextIO is writing, all the data are in
>>> the files (shards). The loss happens when file names emitted by
>>> getPerDestinationOutputFilenames are processed by a window.
>>>
>>> I have created a pipeline to reproduce the scenario in which some
>>> filenames are lost after the getPerDestinationOutputFilenames. Please note
>>> I tried to simplify the code as much as possible, but the scenario is not
>>> easy to reproduce.
>>>
>>> Please check this project https://github.com/kiuby88/windowing-textio
>>> Check readme to build and run (
>>> https://github.com/kiuby88/windowing-textio#build-and-run)
>>> Project contains only a class with the pipeline PipelineWithTextIo,
>>> a log4j2.xml file in the resources and the pom.
>>>
>>> The pipeline in PipelineWithTextIo generates unbounded data using a
>>> sequence. It adds a little delay (10s) per data entry, uses a distinct
>>> (just to apply the window), and then writes data using TextIO.
>>> The window for the distinct is fixed (5 seconds) and it does not use
>>> lateness.
>>> Generated files can be found in
>>> windowing-textio/pipe_with_lateness_0s/files. To write files the
>>> FileNamePolicy uses window + timing + shards (see
>>> https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L135
>>> )
>>> Files are emitted using getPerDestinationOutputFilenames()
>>> (see the code here,
>>> https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L71-L78
>>> )
>>>
>>> Then, file names in the PCollection are extracted and logged. Please
>>> note file names do not have pane information at that point.
>>>
>>> To apply a window, a distinct is used again. Here several file names are
>>> discarded as late and they are not processed by this second distinct.
>>> Please, see
>>>
>>> https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L80-L83
>>>
>>> Debug is enabled for WindowTracing, so you can find in the terminal
>>> several messages like the following:
>>

Google Season of Docs

2020-05-15 Thread Amr Maghraby
Dear Apache,
My name is Amr Maghraby. I am a new graduate from AAST college, where I got
the first rank in my class with a CGPA of 3.92, and I joined the international
ROV competition in the US, where I got second worldwide. Last summer I was
involved in Google Summer of Code 2019 and did good work, and I also
participated in the ACM ACPC and Hash Code problem-solving competitions. I was
asking if I could apply for GSOD?
Waiting for your reply.
Thanks,
Amr Maghraby


Re: Python Precommit significantly flaky

2020-05-15 Thread Brian Hulette
Thanks to Kyle we captured some additional logging for
https://issues.apache.org/jira/browse/BEAM-9975. I spent a little time
looking at it and found two different issues (see details in the comments):
https://issues.apache.org/jira/browse/BEAM-10006 - PipelineOptions can pick
up definitions from unrelated tests
https://issues.apache.org/jira/browse/BEAM-10007 - PortableRunner doesn't
handle ValueProvider instances when converting pipeline options

10006 needs some more investigation. I _think_ the fix for 10007 is pretty
simple, but I'm not familiar with ValueProvider so I could use some
guidance.

Can anyone help out with either of these issues?


On Wed, May 13, 2020 at 12:26 PM Valentyn Tymofieiev 
wrote:

> +1. PortableRunnerTest is very flaky - I looked at last 10 precommit-on-PR
> runs and 7 of them had errors in PortableRunnerTests.
>
>


Re: Writing a new IO on beam, should I use the source API or SDF?

2020-05-15 Thread Luke Cwik
If it is an unbounded source then SDF is a winner since you are not giving
up anything with it when compared to the legacy UnboundedSource API since
Dataflow doesn't support dynamic splitting of unbounded SDFs or
UnboundedSources (only initial splitting). You gain the ability to compose
sources and the initial splitting is done at pipeline execution for SDFs vs
pipeline construction time for UnboundedSource.

If it is bounded, my gut is to still go with SDF since:
* Dataflow runner V2 supports SDF fully
* The Java/Python SDF APIs have gone through the majority of churn already,
there are some minor clean-ups and then I would like to remove the
@Experimental annotations from them after a discussion on dev@ about it
* Being able to compose "sources" is immensely powerful

The caveat is that Dataflow runner V1 doesn't support dynamic splitting of
SDFs today and depending on how well runner v2 rollout happens, may never.
The big plus with the legacy source API is that there are already
bounded/unbounded source wrappers that will convert them into SDFs so you
get all of runner v1 and runner v2 support for what the legacy source API
can do today but give up the composability and any splitting support for
unbounded SDFs that will come later.

Finally, there is a way to get limited support for dynamic splitting of
bounded and unbounded SDFs for other runners using the composability of
SDFs and the limited depth splitting proposal[1].

1:
https://docs.google.com/document/d/1cKOB9ToasfYs1kLWQgffzvIbJx2Smy4svlodPRhFrk4/edit#heading=h.wkwslng744mv

On Fri, May 15, 2020 at 7:08 AM Steve Niemitz  wrote:

> I'm going to be writing a new IO (in java) for reading files in a custom
> format, and want to make it splittable.  It seems like I have a choice
> between the "legacy" source API, and newer experimental SDF API.  Is there
> any guidance on which I should use?  I can likely tolerate some API churn
> as well in the SDF APIs.
>
> My target runner is dataflow.
>
> Thanks!
>


Writing a new IO on beam, should I use the source API or SDF?

2020-05-15 Thread Steve Niemitz
I'm going to be writing a new IO (in java) for reading files in a custom
format, and want to make it splittable.  It seems like I have a choice
between the "legacy" source API, and newer experimental SDF API.  Is there
any guidance on which I should use?  I can likely tolerate some API churn
as well in the SDF APIs.

My target runner is dataflow.

Thanks!


Java transform executed as Python transform?

2020-05-15 Thread Paweł Urbanowicz
Hey,
I created a transform method in Java and now I want to use it in Python
using Cross-language.

I got pretty stuck with the following problem:
p
| GenerateSequence(...)
|ExternalTransform(...)
*=> is working like a charm *


p
| Create(...)
| ExternalTransform(...)
*=> getting assert pardo_payload.do_fn.urn ==
python_urns.PICKLED_DOFN_INFO *

Based on https://www.mail-archive.com/user@beam.apache.org/msg04887.html
it seems like a Create transform is being registered as a Java transform
but executed as a Python transform.
Do you have any idea what is going on here? Thanks a lot for any help :)


Traceback (most recent call last):
File
"/Users/urban/projects/beam/sdks/python/apache_beam/io/external/snowflake_test.py",
line 92, in test_snowflake_write_read
self.run_write()
File
"/Users/urban/projects/beam/sdks/python/apache_beam/io/external/snowflake_test.py",
line 129, in run_write
expansion_service=self.expansion_service,
File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py", line
528, in __exit__
self.run().wait_until_finish()
File
"/Users/urban/projects/beam/sdks/python/apache_beam/testing/test_pipeline.py",
line 112, in run
False if self.not_use_test_runner_api else test_runner_api))
File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py", line
501, in run
self._options).run(False)
File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py", line
858, in from_runner_api
p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
File
"/Users/urban/projects/beam/sdks/python/apache_beam/runners/pipeline_context.py",
line 103, in get_by_id
self._id_to_proto[id], self._pipeline_context)
File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py", line
1231, in from_runner_api
part = context.transforms.get_by_id(transform_id)
File
"/Users/urban/projects/beam/sdks/python/apache_beam/runners/pipeline_context.py",
line 103, in get_by_id
self._id_to_proto[id], self._pipeline_context)
File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py", line
1231, in from_runner_api
part = context.transforms.get_by_id(transform_id)
File
"/Users/urban/projects/beam/sdks/python/apache_beam/runners/pipeline_context.py",
line 103, in get_by_id
self._id_to_proto[id], self._pipeline_context)
File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py", line
1231, in from_runner_api
part = context.transforms.get_by_id(transform_id)
File
"/Users/urban/projects/beam/sdks/python/apache_beam/runners/pipeline_context.py",
line 103, in get_by_id
self._id_to_proto[id], self._pipeline_context)
File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py", line
1231, in from_runner_api
part = context.transforms.get_by_id(transform_id)
File
"/Users/urban/projects/beam/sdks/python/apache_beam/runners/pipeline_context.py",
line 103, in get_by_id
self._id_to_proto[id], self._pipeline_context)
File "/Users/urban/projects/beam/sdks/python/apache_beam/pipeline.py", line
1170, in from_runner_api
transform = ptransform.PTransform.from_runner_api(proto, context)
File
"/Users/urban/projects/beam/sdks/python/apache_beam/transforms/ptransform.py",
line 685, in from_runner_api
context)
File
"/Users/urban/projects/beam/sdks/python/apache_beam/transforms/core.py",
line 1380, in from_runner_api_parameter
assert pardo_payload.do_fn.urn == python_urns.PICKLED_DOFN_INFO
AssertionError


Re: New Grafana dashboards

2020-05-15 Thread Kamil Wasilewski
Fixed, thanks for spotting that! One of the regexes wasn't properly
interpreted in the latest version of Grafana, but now it should be OK.

On Thu, May 14, 2020 at 11:58 PM Pablo Estrada  wrote:

> I noticed that the postcommit status dashboard shows 0/1 values - I remember
> it used to show green/red for passed/failed tests. Maybe something changed
> for it?
> Best
> -P.
>
> On Wed, May 13, 2020 at 3:23 PM Pablo Estrada  wrote:
>
>> Thanks Kamil! These dashboards are super useful. I'm happy to see our
>> perf tests there as well now.
>> Thanks!
>> -P.
>>
>> On Wed, May 13, 2020 at 8:43 AM Tyson Hamilton 
>> wrote:
>>
>>> The dashboards look great! Thank you.
>>>
>>> It would be nice if the 'Useful Links' section included links to Apache
>>> Beam related material like the cwiki documentation.
>>>
>>> On Wed, May 13, 2020 at 4:50 AM Kamil Wasilewski <
>>> kamil.wasilew...@polidea.com> wrote:
>>>
 Hello everyone,

 I'm pleased to announce that we've just moved dashboards gathering
 performance test execution times from Perfkit Explorer to Grafana. Here's a
 link to new dashboards: http://metrics.beam.apache.org

 *Why Grafana?*
 Grafana is an open source visualization tool. It offers a better user
 experience and more flexibility than the tool we've been using until now.
 It also changes the way of developing new charts - all Grafana dashboards
 are stored as json files in Beam's repository, and therefore require a full
 code review process.

 *What's next?*
 There is still some work to be done. This includes moving even more
 charts to Grafana and simplifying the process of updating and creating new
 charts. We are also working on improving the docs [1].

 As always, I'd be happy to hear any feedback from you.

 Cheers,
 Kamil

 [1]
 https://cwiki.apache.org/confluence/display/BEAM/Test+Results+Monitoring

>>>


Re: TextIO. Writing late files

2020-05-15 Thread Jozef Vilcek
Hi Jose,

thank you for putting in the effort to create an example which demonstrates
your problem.

You are using a streaming pipeline, and it seems that the watermark
downstream has already advanced further, so when your file pane arrives, it
is already late. Since you define that lateness is not tolerated, it is
dropped.
I myself never had a requirement to specify zero allowed lateness for
streaming. It feels dangerous. Do you have a specific use case? Also, in
many cases, after windowed files are written, I usually collect them into
the global window and specify a different triggering policy for collecting
them. Both cases are why I never came across this situation.
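
That pattern, roughly (the trigger and allowed lateness below are only
examples, `filenames` stands for the PCollection<String> you get from
getPerDestinationOutputFilenames(), and the windowing imports are omitted):

  filenames
      .apply("RewindowFilenames",
          Window.<String>into(new GlobalWindows())
              .triggering(Repeatedly.forever(
                  AfterProcessingTime.pastFirstElementInPane()
                      .plusDelayOf(Duration.standardMinutes(1))))
              .withAllowedLateness(Duration.standardDays(1))
              .discardingFiredPanes())
      // ...or whatever aggregation you run over the file names.
      .apply(Distinct.<String>create());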

I do not have an explanation for whether it is a bug or not. I would guess
that the watermark can advance further, e.g. because elements can be
processed in arbitrary order. Not saying this is the case.
It needs someone with a better understanding of how watermark advancement
is / should be handled within pipelines.


P.S.: you can add `.withTimestampFn()` to your generate sequence, to get
more stable timing, which is also easier to reason about:

Dropping element at 1970-01-01T00:00:19.999Z for key
... window:[1970-01-01T00:00:15.000Z..1970-01-01T00:00:20.000Z) since too
far behind inputWatermark:1970-01-01T00:00:24.000Z;
outputWatermark:1970-01-01T00:00:24
.000Z

   instead of

Dropping element at 2020-05-15T08:52:34.999Z for key ...
window:[2020-05-15T08:52:30.000Z..2020-05-15T08:52:35.000Z) since too far
behind inputWatermark:2020-05-15T08:52:39.318Z;
outputWatermark:2020-05-15T08:52:39.318Z
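
For example (the rate and timestamps below just mirror the
one-element-per-second shape of the linked pipeline; GenerateSequence and
joda-time imports omitted):

  p.apply(GenerateSequence.from(0)
      .withRate(1, Duration.standardSeconds(1))
      // Element i gets the deterministic timestamp i seconds after the epoch,
      // so window boundaries and watermarks in the logs become stable.
      .withTimestampFn(i -> new Instant(i * 1000)));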




In my



On Thu, May 14, 2020 at 10:47 AM Jose Manuel  wrote:

> Hi again,
>
> I have simplified the example to reproduce the data loss. The scenario is
> the following:
>
> - TextIO writes files.
> - getPerDestinationOutputFilenames emits file names
> - File names are processed by an aggregator (combine, distinct,
> groupByKey...) with a window **without allowed lateness**
> - File names are discarded as late
>
> Here you can see the data loss in the picture in
> https://github.com/kiuby88/windowing-textio/blob/master/README.md#showing-data-loss
>
> Please, follow README to run the pipeline and find log traces that say
> data are dropped as late.
> Remember, you can run the pipeline with another window's  lateness values
> (check README.md)
>
> Kby.
>
> El mar., 12 may. 2020 a las 17:16, Jose Manuel ()
> escribió:
>
>> Hi,
>>
>> I would like to clarify that while TextIO is writing, all the data are in
>> the files (shards). The loss happens when file names emitted by
>> getPerDestinationOutputFilenames are processed by a window.
>>
>> I have created a pipeline to reproduce the scenario in which some
>> filenames are lost after the getPerDestinationOutputFilenames. Please note
>> I tried to simplify the code as much as possible, but the scenario is not
>> easy to reproduce.
>>
>> Please check this project https://github.com/kiuby88/windowing-textio
>> Check readme to build and run (
>> https://github.com/kiuby88/windowing-textio#build-and-run)
>> Project contains only a class with the pipeline PipelineWithTextIo,
>> a log4j2.xml file in the resources and the pom.
>>
>> The pipeline in PipelineWithTextIo generates unbounded data using a
>> sequence. It adds a little delay (10s) per data entry, uses a distinct
>> (just to apply the window), and then writes data using TextIO.
>> The window for the distinct is fixed (5 seconds) and it does not use
>> lateness.
>> Generated files can be found in
>> windowing-textio/pipe_with_lateness_0s/files. To write files the
>> FileNamePolicy uses window + timing + shards (see
>> https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L135
>> )
>> Files are emitted using getPerDestinationOutputFilenames()
>> (see the code here,
>> https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L71-L78
>> )
>>
>> Then, file names in the PCollection are extracted and logged. Please
>> note file names do not have pane information at that point.
>>
>> To apply a window, a distinct is used again. Here several file names are
>> discarded as late and they are not processed by this second distinct.
>> Please, see
>>
>> https://github.com/kiuby88/windowing-textio/blob/master/src/main/java/org/kby/PipelineWithTextIo.java#L80-L83
>>
>> Debug is enabled for WindowTracing, so you can find in the terminal
>> several messages like the following:
>> DEBUG org.apache.beam.sdk.util.WindowTracing - LateDataFilter: Dropping
>> element at 2020-05-12T14:05:14.999Z for
>> key:path/pipe_with_lateness_0s/files/[2020-05-12T14:05:10.000Z..2020-05-12T14:05:15.000Z)-ON_TIME-0-of-1.txt;
>> window:[2020-05-12T14:05:10.000Z..2020-05-12T14:05:15.000Z) since too far
>> behind inputWatermark:2020-05-12T14:05:19.799Z;
>> outputWatermark:2020-05-12T14:05:19.799Z`
>>
>> What happens here? I think that messages are generated per second and a
>> window of 5 seconds groups them. Then a delay is added and finall