Re: [Question] How does one test Counters?

2021-09-01 Thread Jeff Klukas
We have a few transform-level tests in Mozilla's telemetry pipeline codebase
where we check counter behavior. See:

https://github.com/mozilla/gcp-ingestion/blob/fa98ac0c8fa09b5671a961062e6cf0985ec48b0e/ingestion-beam/src/test/java/com/mozilla/telemetry/decoder/GeoIspLookupTest.java#L77-L83

But I'm not familiar with how much metrics testing exists within the
apache/beam codebase.
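
For the general pattern, a minimal sketch (namespace, counter name, and the
expected value here are hypothetical; classes are from
org.apache.beam.sdk.metrics, assertEquals from JUnit, and committed metrics
are only reported by runners that support them, such as the direct runner):

    PipelineResult result = pipeline.run();
    result.waitUntilFinish();
    MetricQueryResults metrics = result
        .metrics()
        .queryMetrics(
            MetricsFilter.builder()
                .addNameFilter(MetricNameFilter.named("my-namespace", "my-counter"))
                .build());
    // Each matching counter exposes attempted and (where supported) committed values.
    for (MetricResult<Long> counter : metrics.getCounters()) {
      assertEquals(42L, (long) counter.getCommitted());
    }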

On Wed, Sep 1, 2021 at 12:08 AM Enis Sert  wrote:

> Hi,
>
> I'm planning to add an `org.apache.beam.sdk.metrics.Counter` to my Beam
> PR and bump it when some event happens, similar to
> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java#L988.
> However, I couldn't figure out how to test it with unit tests (i.e. I can't
> verify that the counter is incremented at all, but I can verify that
> the block containing it is executed). Can someone share an example?
>
> Thanks,
> esert
>


Re: How to flush window when draining a Dataflow pipeline?

2021-05-21 Thread Jeff Klukas
On Fri, May 21, 2021 at 11:47 AM Kenneth Knowles  wrote:

> +dev  +Reuven Lax 
>
> Advancing the watermark to infinity does have an effect on the
> GlobalWindow. The GlobalWindow ends a little bit before infinity :-). That
> is why this works to cause the output even for unbounded aggregations.
>

I'm definitely glad to hear that GlobalWindow is supposed to close on
Drain, so it sounds like the FixedWindows workaround is not necessary.

If the watermark advances with the intention of causing windows to close,
then it's unclear to me in what cases droppedDueToLateness would be
expected, and whether it would be expected in our case.

Is it possible that the watermark is advanced while there are still
messages working their way through the pipeline, so that by the time they
hit the aggregation, they're considered late? If so, is there a way to
prevent that?
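
For reference, the processing-time trigger setup under discussion looks
roughly like this (a sketch only; the key type, ten-minute delay, zero
allowed lateness, and discarding mode are assumptions based on the
description below, not the exact pipeline code):

    PCollection<KV<String, Long>> counts = keyedEvents
        .apply(
            Window.<KV<String, String>>into(new GlobalWindows())
                .triggering(
                    Repeatedly.forever(
                        AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(Duration.standardMinutes(10))))
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes())
        // Count.perKey emits a count per key each time the trigger fires.
        .apply(Count.perKey());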


> On Fri, May 21, 2021 at 5:10 AM Jeff Klukas  wrote:
>
>> Beam users,
>>
>> We're attempting to write a Java pipeline that uses Count.perKey() to
>> collect event counts, and then flush those to an HTTP API every ten minutes
>> based on processing time.
>>
>> We've tried expressing this using GlobalWindows with an
>> AfterProcessingTime trigger, but we find that when we drain the pipeline
>> we end up with entries in the droppedDueToLateness metric. This was
>> initially surprising, but may be in line with documented behavior [0]:
>>
>> > When you issue the Drain command, Dataflow immediately closes any
>> in-process windows and fires all triggers. The system does not wait for any
>> outstanding time-based windows to finish. Dataflow causes open windows to
>> close by advancing the system watermark to infinity
>>
>> Perhaps advancing watermark to infinity has no effect on GlobalWindows,
>> so we attempted to get around this by using a fixed but arbitrarily-long
>> window:
>>
>> FixedWindows.of(Duration.standardDays(36500))
>>
>> The first few tests with this configuration came back clean, but the
>> third test again showed droppedDueToLateness after calling Drain. You can
>> see this current configuration in [1].
>>
>> Is there a pattern for reliably flushing on Drain when doing processing
>> time-based aggregates like this?
>>
>> [0]
>> https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline#effects_of_draining_a_job
>> [1]
>> https://github.com/mozilla/gcp-ingestion/pull/1689/files#diff-1d75ce2cbda625465d5971a83d842dd35e2eaded2c2dd2b6c7d0d7cdfd459115R58-R71
>>
>>


Re: Use case for reading to dynamic Pub/Sub subscriptions?

2020-12-09 Thread Jeff Klukas
I have certainly had use cases in the past where I've made use of the Kafka
consumer library's ability to consume from a set of topics based on a
regular expression.

Specifically, I worked with a microservices architecture where each service
had a Postgres DB with a logical decoding client for change data capture to
Kafka. There were separate Kafka topics populated per DB. We then had a
service that consumed from all those topics via regex for sinking the data
to S3. It was particularly nice that the regex-based topic subscription was
able to automatically pick up new topics matching the regex without
restarting the application.

I do find it limiting that PubsubIO needs to know about topics explicitly
on startup.
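
For reference, the consumer-side pattern subscription I'm describing is just
the plain Kafka client API (the bootstrap server, group id, and topic pattern
here are made up):

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "s3-sink");
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    // Any new topic matching the pattern is picked up automatically,
    // without restarting the application.
    consumer.subscribe(Pattern.compile("cdc\\..*"));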

On Tue, Dec 8, 2020 at 8:23 PM Vincent Marquez 
wrote:

> KafkaIO has a readAll method that returns a
> PTransform, PCollection> is that
> what you mean? Then it could read in a 'dynamic' number of topics generated
> from somewhere else.  Is that what you mean?
>
> *~Vincent*
>
>
> On Tue, Dec 8, 2020 at 5:15 PM Daniel Collins 
> wrote:
>
>> /s/Combine/Flatten
>>
>> On Tue, Dec 8, 2020 at 8:06 PM Daniel Collins 
>> wrote:
>>
>>> Hi all,
>>>
>>> I'm trying to figure out if there's any possible use for reading from a
>>> dynamic set of Pub/Sub [Lite] subscriptions in a beam pipeline, although
>>> the same logic would apply to kafka topics. Does anyone know of a use case
>>> where you'd want to apply the same set of processing logic to all messages
>>> on a set of topics, but, you wouldn't know that set of topics when the
>>> pipeline is started? (otherwise you could just use Combine).
>>>
>>> -Dan
>>>
>>


Re: BigQueryIO create dataset

2020-12-03 Thread Jeff Klukas
> I don't think BigQuery offers a way to automatically create datasets when
writing.

This exactly, and it also makes sense from the standpoint of the BQ
permissions model. In BigQuery, table creation requires that the service
account have privileges at the dataset level, but dataset creation requires
that the service account be essentially an administrator for the entire
project. Given that model, I don't expect dynamic dataset creation to be a
common enough use case to warrant special support in BigQueryIO. It would
certainly be interesting to hear from others with such needs, though.

I concur with Chamikara that it likely makes sense to perform this in a
ParDo before your BigQueryIO.write transform. You'd probably want to list
datasets in the project on startup, and then for any message that comes in
needing a new dataset, make the appropriate API call using the BigQuery
SDK, and update the cache. You'd likely need to provide error handling to
handle the race condition where a different node has already successfully
created the dataset.
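
A rough sketch of what that could look like with the BigQuery client library
(classes from com.google.cloud.bigquery; the element type and "dataset" field
are hypothetical, and a real implementation would want retries and more
careful error handling):

    class EnsureDatasetExists extends DoFn<TableRow, TableRow> {
      private transient BigQuery bigquery;
      private transient Set<String> knownDatasets;

      @Setup
      public void setup() {
        bigquery = BigQueryOptions.getDefaultInstance().getService();
        knownDatasets = new HashSet<>();
        // Seed the cache with datasets that already exist in the project.
        for (Dataset d : bigquery.listDatasets().iterateAll()) {
          knownDatasets.add(d.getDatasetId().getDataset());
        }
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
        TableRow row = c.element();
        String dataset = (String) row.get("dataset");  // hypothetical field
        if (!knownDatasets.contains(dataset)) {
          try {
            bigquery.create(DatasetInfo.newBuilder(dataset).build());
          } catch (BigQueryException e) {
            // 409 means another worker won the race and created it first.
            if (e.getCode() != 409) {
              throw e;
            }
          }
          knownDatasets.add(dataset);
        }
        c.output(row);
      }
    }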

On Wed, Dec 2, 2020 at 10:27 PM Chamikara Jayalath 
wrote:

> The functionality does not come from BigQueryIO itself but it just exposes
> existing BigQuery feature CreateDisposition [1]. I don't think BigQuery
> offers a way to automatically create datasets when writing.
> Is it possible to create such Datasets from a ParDo in your pipeline that
> precedes BigQueryIO write transform ?
>
> Thanks,
> Cham
>
> [1]
> https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/CreateDisposition
>
> On Wed, Dec 2, 2020 at 3:20 PM Vasu Gupta  wrote:
>
>> Hey folks, why isn't there any capability of creating datasets
>> automatically just like tables in BigQueryIO? Actually, we at our company
>> have dynamic dataset architecture which means as the packet arrives, we
>> need to create new datasets and tables on the go. Since BigQueryIO already
>> have functionality of creating tables automatically so we were thinking
>> that why not a similar functionality for dataset can be implemented in
>> BigQueryIO.
>>
>


Re: Upgrade instruction from TimerDataCoder to TimerDataCoderV2

2020-11-06 Thread Jeff Klukas
Ke - You are correct that generally data encoded with a previous coder
version cannot be read with an updated coder. The formats have to match
exactly.

As far as I'm aware, it's necessary to flush a job and start with fresh
state in order to upgrade coders.

On Fri, Nov 6, 2020 at 2:13 PM Ke Wu  wrote:

> Hello,
>
> I found that TimerDataCoderV2 is created to include timer family id and
> output timestamps fields in TimerData. In addition, the new fields are
> encoded between old fields, which I suppose means the V2 coder cannot decode
> data that is encoded by the V1 coder and vice versa. My ask here is, how should we
> properly upgrade without losing existing states persisted in a store?
>
> Best,
> Ke


Re: [BEAM-10587] Support Maps in BigQuery #12389

2020-10-09 Thread Jeff Klukas
It's definitely desirable to be able to get back Map types from BQ, and
it's nice that BQ is consistent in representing maps as repeated key/value
structs. Inferring maps from that specific structure is preferable to
inventing some new naming convention for the fields, which would hinder
interoperability with non-Beam applications.

Would it be possible to add a configurable parameter called something like
withMapsInferred() ? Default behavior would be the status quo, but users
could opt in to the behavior of inferring maps based on field names. This
would prevent the PR change from potentially breaking existing
applications. And it means the least surprising behavior remains the
default.
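
For concreteness, the BigQuery field shape in question (a map rendered as a
repeated key/value struct) looks like this when expressed with
TableFieldSchema; the field name and STRING value type are just for
illustration:

    TableFieldSchema mapField =
        new TableFieldSchema()
            .setName("attributes")
            .setType("RECORD")
            .setMode("REPEATED")
            .setFields(
                ImmutableList.of(
                    new TableFieldSchema().setName("key").setType("STRING"),
                    new TableFieldSchema().setName("value").setType("STRING")));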

On Fri, Oct 9, 2020 at 6:06 AM Worley, Ryan  wrote:

> https://github.com/apache/beam/pull/12389
>
> Hi everyone, in the above pull request I am attempting to add support for
> writing Avro records with maps to a BigQuery table (via Beam Schema).  The
> write portion is fairly straightforward - we convert the map to an array of
> structs with key and value fields (seemingly the closest possible
> approximation of a map in BigQuery).  But the read back portion is more
> controversial because we simply check if a field is an array of structs
> with exactly two fields - key and value - and assume that should be read
> into a Schema map field.
>
> So the possibility exists that an array of structs with key and value
> fields, which wasn't originally written from a map, could be unexpectedly
> read into a map.  In the PR review I suggested a few options for tagging
> the BigQuery field, so that we could know it was written from a Beam Schema
> map and should be read back into one, but I'm not very satisfied with any
> of the options.
>
> Andrew Pilloud suggested that I write to this group to get some feedback
> on the issue.  Should we be concerned that all arrays of structs with
> exactly 'key' and 'value' fields would be read into a Schema map or could
> this be considered a feature?  If the former, how would you suggest that we
> limit reading into a map only those fields that were originally written
> from a map?
>
> Thanks for any feedback to help bump this PR along!
>


Re: [QUESTION] Reading Snappy Compressed Text Files

2020-04-22 Thread Jeff Klukas
Beam is able to infer compression from file extensions for a variety of
formats, but snappy is not among them currently:

https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Compression.java

Although ParquetIO and AvroIO each look to have support for snappy.

So as best I can tell, there is no current built-in support for reading
text files compressed via snappy. I think you would need to use FileIO to
match files, and then implement a custom DoFn that takes the file object,
streams the contents through a snappy decompressor, and outputs one record
per line.

I imagine a PR to add snappy as a supported format in Compression.java
would be welcome.
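
As a sketch of that custom-DoFn route (assuming the files use the snappy-java
stream framing via org.xerial.snappy.SnappyInputStream; Hadoop-style snappy
framing would need a different decompressor, and the filepattern option is
reused from the example below):

    class ReadSnappyLines extends DoFn<FileIO.ReadableFile, String> {
      @ProcessElement
      public void processElement(ProcessContext c) throws IOException {
        FileIO.ReadableFile file = c.element();
        try (InputStream raw = Channels.newInputStream(file.open());
            BufferedReader reader =
                new BufferedReader(
                    new InputStreamReader(
                        new SnappyInputStream(raw), StandardCharsets.UTF_8))) {
          String line;
          while ((line = reader.readLine()) != null) {
            c.output(line);
          }
        }
      }
    }

    PCollection<String> lines = p
        .apply(FileIO.match().filepattern(options.getInputFilePattern()))
        .apply(FileIO.readMatches())
        .apply(ParDo.of(new ReadSnappyLines()));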

On Wed, Apr 22, 2020 at 1:16 PM Christopher Larsen 
wrote:

> Hi devs,
>
> We are trying to build a pipeline to read snappy compressed text files
> that contain one record per line using the Java SDK.
>
> We have tried the following to read the files:
>
> p.apply("ReadLines",
> FileIO.match().filepattern(options.getInputFilePattern()))
> .apply(FileIO.readMatches())
> .setCoder(SnappyCoder.of(ReadableFileCoder.of()))
> .apply(TextIO.readFiles())
> .apply(ParDo.of(new TransformRecord()));
>
> Is there a recommended way to decompress and read Snappy files with Beam?
>
> Thanks,
> Chris
>


Re: [RESULT][VOTE] Accept the Firefly design donation as Beam Mascot - Deadline Mon April 6

2020-04-17 Thread Jeff Klukas
I personally like the sound of "Datum" as a name. I also like the idea of
not assigning them a gender.

As a counterpoint on the naming side, one of the slide decks provided while
iterating on the design mentions:

> Mascot can change colors when it is “full of data” or has a “batch of
data” to process.  Yellow is supercharged and ready to process!

Based on that, I'd argue that the mascot maps to the concept of a bundle in
the Beam execution model, and we should consider a name that's a play on
"bundle" or perhaps a play on "checkpoint".

On Thu, Apr 16, 2020 at 3:44 PM Julian Bruno  wrote:

> Hi all,
>
> While working on the design of our Mascot
> Some ideas showed up and I wish to share them.
> In regard to Alex Van Boxel's question about the name of our Mascot.
>
> I was thinking about this yesterday night and feel it could be a great
> idea to name the Mascot "*Data*" or "*Datum*". Both names sound cute and
> make sense to me. I prefer the later. Datum means a single piece of
> information. The Mascot is the first piece of information and its job is to
> collect batches of data and process it. Datum is in charge of linking
> information together.
>
> In addition, our Mascot should have no gender. Rendering it accessible to
> all users.
>
> Beam as a name for the mascot is pretty straight forward but I think there
> are many things carrying that same name already.
>
> What do you think?
>
> Looking forward to hearing your feedback. Names are important and I feel
> it can expand the personality and create a cool background for our Mascot.
>
> Cheers!
>
> Julian
>
> On Mon, Apr 13, 2020, 3:40 PM Kyle Weaver  wrote:
>
>> Beam Firefly is fine with me (I guess people tend to forget mascot names
>> anyway). But if anyone comes up with something particularly cute/clever we
>> can consider it.
>>
>> On Mon, Apr 13, 2020 at 6:33 PM Aizhamal Nurmamat kyzy <
>> aizha...@apache.org> wrote:
>>
>>> @Alex, Beam Firefly?
>>>
>>> On Thu, Apr 9, 2020 at 10:57 PM Alex Van Boxel  wrote:
>>>
 We forgot something

 ...

 ...

 it/she/he needs a *name*!


  _/
 _/ Alex Van Boxel


 On Fri, Apr 10, 2020 at 6:19 AM Kenneth Knowles 
 wrote:

> Looking forward to the guide. I enjoy doing (bad) drawings as a way to
> relax. And I want them to be properly on brand :-)
>
> Kenn
>
> On Thu, Apr 9, 2020 at 10:35 AM Maximilian Michels 
> wrote:
>
>> Awesome. What a milestone! The mascot is a real eye catcher. Thank you
>> Julian and Aizhamal for making it happen.
>>
>> On 06.04.20 22:05, Aizhamal Nurmamat kyzy wrote:
>> > I am happy to announce that this vote has passed, with 13 approving +1
>> > votes, 5 of which are binding PMC votes.
>> >
>> > We have the final design for the Beam Firefly! Yahoo!
>> >
>> > Everyone have a great week!
>> >
>> > On Mon, Apr 6, 2020 at 9:57 AM David Morávek  wrote:
>> >
>> > +1 (non-binding)
>> >
>> > On Mon, Apr 6, 2020 at 12:51 PM Reza Rokni  wrote:
>> >
>> > +1 (non-binding)
>> >
>> > On Mon, Apr 6, 2020 at 5:24 PM Alexey Romanenko  wrote:
>> >
>> > +1 (non-binding).
>> >
>> > > On 3 Apr 2020, at 14:53, Maximilian Michels  wrote:
>> > >
>> > > +1 (binding)
>> > >
>> > > On 03.04.20 10:33, Jan Lukavský wrote:
>> > >> +1 (non-binding).
>> > >>
>> > >> On 4/2/20 9:24 PM, Austin Bennett wrote:
>> > >>> +1 (non-binding)
>> > >>>
>> > >>> On Thu, Apr 2, 2020 at 12:10 PM Luke Cwik  wrote:
>> > >>>
>> > >>>    +1 (binding)
>> > >>>
>> > >>>    On Thu, Apr 2, 2020 at 11:54 AM Pablo Estrada  wrote:
>> > >>>
>> > >>>        +1! (binding)
>> > >>>
>> > >>>        On Thu, Apr 2, 2020 at 11:19 AM Alex Van Boxel  wrote:
>> > >>>
>> > >>>            Thanks for clearing this up Aizhamal.
>> > >>>
>> > >>>            +1 (non-binding)

Re: Discrete Transforms vs One Single transform

2020-02-21 Thread Jeff Klukas
Also note that runners in many cases will fuse discrete transforms together
for efficiency, so while you might worry about performance degradation from
breaking your logic into many discrete transforms, that likely won't be an
issue in practice.

Note too that you have the option of defining composite transforms [0]
that compose a series of smaller discrete transforms, but present an object
that follows the same API. Depending on your needs for modularity and
reuse, this can be a nice way of factoring out logic from your top-level
pipeline while still taking advantage of best practices for using Beam's
built-in transforms.

[0]
https://beam.apache.org/documentation/programming-guide/#composite-transforms
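
As a sketch of that composite style applied to the example below (the element
types and the helper transforms are taken from the question, so treat them as
placeholders rather than real classes):

    public class ParseAndSchematize
        extends PTransform<PCollection<KinesisRecord>, PCollection<SchematizedElement>> {
      @Override
      public PCollection<SchematizedElement> expand(PCollection<KinesisRecord> input) {
        return input
            .apply(MapElements
                .into(TypeDescriptors.strings())
                .via(record -> new String(record.getDataAsBytes(), StandardCharsets.UTF_8)))
            .apply(convertByteStringToJsonNode())
            .apply(schematizeElements());
      }
    }

    // The top-level pipeline stays readable; the runner can still fuse the inner steps.
    PCollection<SchematizedElement> myRecords = pbegin
        .apply("Kinesis Source", readfromKinesis())
        .apply(new ParseAndSchematize());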

On Fri, Feb 21, 2020 at 1:08 AM Luke Cwik  wrote:

> Use discrete transforms.
>
> If you merge them all into one transform you will lose visibility into the
> different parts and will be rebuilding what already exists to provide that
> visibility. You'll also be rebuilding that APIs that help users combine all
> their functions together. You'll actually find that you'll be rebuilding
> lots of what Apache Beam provides.
>
> On Thu, Feb 20, 2020 at 8:19 PM amit kumar  wrote:
>
>> Hi All,
>>
>> I am looking for inputs to understand the effects of converting multiple
>> discrete transforms into one single transformation. (and performing all
>> steps into one single PTransform).
>>
>> What is better approach, multiple discrete transforms vs one single
>> transform with lambdas and multiple functions ?
>>
>> I wanted to understand the effect of combining multiple transforms into
>> one single transform and doing everything in a lambda via Functions, will
>> there be any affect in performance or debugging, metrics or any other
>> factors and best practices?
>>
>> Version A
>> PCollection myRecords = pbegin
>> .apply("Kinesis Source", readfromKinesis()) //transform1
>> .apply(MapElements
>> .into(TypeDescriptors.strings())
>> .via(record -> new String(record.getDataAsBytes()))) //transform2
>> .apply(convertByteStringToJsonNode()) //transform3
>> .apply(schematizeElements()); //transform4
>>
>> Version B
>>  PCollection myRecords = pbegin
>> .apply("Kinesis Source", readfromKinesis()) transform1
>> .apply( inputKinesisRecord -> {
>> String record = inputKinesisRecord.getDataAsBytes();
>> JsonNode jsonNode = convertByteStringToJsonNode(record);
>> SchematizedElement outputElement =
>> getSchematzedElement(jsonNode))
>> return outputElement;  }) transform2
>>
>>
>> Thanks in advance!
>> Amit
>>
>


Re: Subclassing MapElements

2020-01-28 Thread Jeff Klukas
Jason - I was going to suggest the same approach that you arrived at. I
think using a static method that returns a MapElements instance is an elegant
solution.

The one drawback I can think of with that approach is that you lose control
over default naming. From your example usage:

myPCollection.apply(MyMapper.of(…).exceptionsInto(…))

if MyMapper returns a MapElements instance, then this line will appear in
your pipeline graph as "MapWithFailures". If you do the more verbose thing
and extend PTransform, then the default name would be "MyMapper" from the
name of the class.

You can easily get around this problem by passing an explicit name to
`apply` when using the transform (and indeed this style is encouraged in
the Beam docs), but it could be a nuisance if you're used to relying on
default names that match class names.
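
For example (mirroring the usage above, with the arguments elided as in the
original), supplying an explicit step name restores a readable label in the
pipeline graph:

    myPCollection.apply("MyMapper", MyMapper.of(…).exceptionsInto(…));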

On Tue, Jan 28, 2020 at 6:41 PM jmac...@godaddy.com 
wrote:

> Yeah it just seems like a lot of boiler plate to do builders with lots of
> methods just to wrap a MapElements type for syntactic convenience. After
> thinking this over last night I’m wondering if it wouldn’t be better to use
> static factory methods for these, so rather than the following, which tends
> to be what we see in the PTransform style guide etc, but hides the
> MapElements instance and builder calls, closing off the callers opportunity
> to add additional tweaks to the transform:
>
>
>
> public MyMapper extends PTransform, PCollection {
>
> @Override
>
> public PCollection expand(PCollection input) {
>
> return MapElements.into(…).via(…);
>
> }
>
> }
>
> we might instead do something like this
>
>
>
>
> public MyMapper {
>
> public static MapElements of(…) {
>
> return MapElements.into(…).via(…);
> }
> }
>
>
>
> Which might be used like so (shown with usage of MapElements additional
> calls):
>
>
> myPCollection.apply(MyMapper.of(…).exceptionsInto(…))
>
>
>
>
>
> For us, this saves us from having a lot of typedescriptor code and such
> ending up in high level pipeline setups, making it easier to understand.
>
>
>
> -Jason
>
>
>
> *From: *Kenneth Knowles 
> *Reply-To: *"dev@beam.apache.org" 
> *Date: *Monday, January 27, 2020 at 9:10 PM
> *To: *dev 
> *Subject: *Re: Subclassing MapElements
>
>
>
> Notice: This email is from an external sender.
>
>
>
> You can, with care, create an abstract Builder class that can be extended.
> You have to be careful to never call a concrete constructor and only call
> its own builder methods, leaving the final construction abstract. Basically
> a bunch of setter methods in a builder style.
>
>
>
> Kenn
>
>
>
> On Mon, Jan 27, 2020 at 9:08 PM Kenneth Knowles  wrote:
>
> It might be more trouble than it is worth, saving typing but adding
> complexity. Especially since you've got @AutoValue and @AutoValue.Builder
> to do all the heavy lifting anyhow (
> https://beam.apache.org/contribute/ptransform-style-guide/#api).
>
>
>
> Kenn
>
>


Review needed for BEAM-8745 - exposing BigQueryIO.Write#withMaxBytesPerPartition

2020-01-09 Thread Jeff Klukas
Per the Contribution Guide, I'm bringing a PR that's been idle for more
than 3 days to the dev list to find a reviewer.

This change doesn't add any new code, but makes the existing
withMaxBytesPerPartition method public
and adds documentation concerning its use.

https://github.com/apache/beam/pull/10500
https://jira.apache.org/jira/browse/BEAM-8745


Re: [VOTE] Beam's Mascot will be the Firefly (Lampyridae)

2019-12-13 Thread Jeff Klukas
+1 (non-binding)

On Thu, Dec 12, 2019 at 11:58 PM Kenneth Knowles  wrote:

> Please vote on the proposal for Beam's mascot to be the Firefly. This
> encompasses the Lampyridae family of insects, without specifying a genus or
> species.
>
> [ ] +1, Approve Firefly being the mascot
> [ ] -1, Disapprove Firefly being the mascot
>
> The vote will be open for at least 72 hours excluding weekends. It is
> adopted by at least 3 PMC +1 approval votes, with no PMC -1 disapproval
> votes*. Non-PMC votes are still encouraged.
>
> PMC voters, please help by indicating your vote as "(binding)"
>
> Kenn
>
> *I have chosen this format for this vote, even though Beam uses simple
> majority as a rule, because I want any PMC member to be able to veto based
> on concerns about overlap or trademark.
>


Re: Version Beam Website Documentation

2019-12-04 Thread Jeff Klukas
The API reference docs (Java and Python at least) are versioned, so we have
a durable reference there and it's possible to link to particular sections
of API docs for particular versions.

For the major bits of introductory documentation (like the Beam Programming
Guide), I think it's a good thing to have only a single version, so that
people referencing it are always getting the most up-to-date wording and
explanations, although it may be worth adding callouts there about minimum
versions anywhere we discuss newer features. We should be encouraging the
community to stay reasonably current, so I think any feature that's present
in the latest LTS release should be fine to assume is available to users,
although perhaps we should also state that explicitly on the website.

Are there particular parts of the Beam website that you have in mind that
would benefit from versioning? Are there specific cases you see where the
current website would be confusing for someone using a Beam SDK that's a
few versions old?

On Tue, Dec 3, 2019 at 6:46 PM Ankur Goenka  wrote:

> Hi,
>
> We are constantly adding features to Beam which makes each new Beam
> version more feature rich and compelling.
> This also means that the old Beam released don't have the new features and
> might have different ways to do certain things.
>
> (I might be wrong here) - Our Beam website only publish a single version
> which is the latest version of documentation.
> This means that the users working with older SDK don't really have an easy
> way to lookup documentation for old versions of Beam.
>
> Proposal: Shall we consider publishing versioned Beam website to help
> users with old Beam version find the relevant information?
>
> Thanks,
> Ankur
>


Reviewer needed for clustering bug fix in Java BigQueryIO.Write

2019-10-22 Thread Jeff Klukas
I've had a one-line bug fix PR open since last week that I'd love to get
merged for 2.17.

Would a committer be willing to take a look at it?

https://github.com/apache/beam/pull/9784


Re: Support for LZO compression.

2019-10-09 Thread Jeff Klukas
Sameer - Also note that there's some prior art for optional components,
particularly compression algorithms. We added support for zstd compression,
but document that it's the user's responsibility to make sure the relevant
library is available on the classpath [0].

There was some interest in a more formal way of defining optional
dependencies, but I expect it would make sense to follow the same general
approach here as we used for zstd, relying on Javadoc to advertise the
user's responsibilities.

[0]
https://github.com/apache/beam/blob/c2f0d282337f3ae0196a7717712396a5a41fdde1/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Compression.java#L132-L153

On Wed, Oct 9, 2019 at 10:10 AM Sameer Abhyankar 
wrote:

> Thanks Luke! This is very helpful! This would certainly be considered as
> an optional component IMO.
>
> On Tue, Oct 8, 2019 at 7:01 PM Luke Cwik  wrote:
>
>> Sorry about that, gave the wrong information.
>>
>> The GPL 1, 2, and 3 all fall under category X licenses [1].
>> "Apache projects may not distribute Category X licensed components, be it
>> in source or binary form; and be it in ASF source code or convenience
>> binaries. As with the previous question on platforms, the component can be
>> relied on if the component's license terms do not affect the Apache
>> product's licensing. For example, using a GPL'ed tool during the build is
>> OK, however including GPL'ed source code is not."
>>
>> But if this is an optional component which does not significantly prevent
>> the majority of users to use the product then it will be ok[2]. Relevant
>> bit is:
>> "Apache projects can rely on components under prohibited licenses if the
>> component is only needed for optional features. When doing so, a project
>> shall provide the user with instructions on how to obtain and install the
>> non-included work. Optional means that the component is not required for
>> standard use of the product or for the product to achieve a desirable level
>> of quality. The question to ask yourself in this situation is:"
>>
>> So in this case I believe we can include the LZO as long as we mark it as
>> optional.
>>
>> 1: https://www.apache.org/legal/resolved.html#category-x
>> 2: https://www.apache.org/legal/resolved.html#optional
>>
>>
>>
>> On Tue, Oct 8, 2019 at 3:51 PM Luke Cwik  wrote:
>>
>>> Which GPL version?
>>>
>>> The Apache License 2.0 is compatible with GPL 3[1]
>>>
>>> 1: https://www.apache.org/foundation/license-faq.html#GPL
>>>
>>> On Tue, Oct 8, 2019 at 2:10 PM Sameer Abhyankar 
>>> wrote:
>>>
 Hi All,

 We were looking to add an IO that would read LZO compressed binaries
 from a supported filesystem. However, based on our research, LZO is shipped
 under the GPL license.

 Would the licensing issue make it unlikely for this to be accepted as a
 contribution into the Beam SDK? Are there options for adding support for
 LZO into the Beam SDK so we don't run into licensing issues?

 Thanks in advance for the help with this!!

 Sameer

>>>
>


Re: Prevent Shuffling on Writing Files

2019-09-18 Thread Jeff Klukas
What you propose with a writer per bundle is definitely possible, but I
expect the blocker is that in most cases the runner has control of bundle
sizes and there's nothing exposed to the user to control that. I've wanted
to do similar, but found average bundle sizes in my case on Dataflow to be
so small that it wasn't feasible to write out a separate file/object per
bundle.
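
A rough sketch of the per-bundle writer idea from the quoted message below,
using Beam's FileSystems API (the output location is hypothetical, and note
the bundle-sizing caveat above: on some runners this produces many tiny
objects):

    class WriteBundleToFile extends DoFn<String, Void> {
      private transient List<String> buffer;

      @StartBundle
      public void startBundle() {
        buffer = new ArrayList<>();
      }

      @ProcessElement
      public void processElement(@Element String line) {
        buffer.add(line);
      }

      @FinishBundle
      public void finishBundle() throws IOException {
        // One object per bundle; no shuffle, but also no control over bundle size.
        ResourceId resource = FileSystems.matchNewResource(
            "gs://my-bucket/output/" + UUID.randomUUID() + ".txt", false /* isDirectory */);
        try (WritableByteChannel channel = FileSystems.create(resource, "text/plain")) {
          channel.write(ByteBuffer.wrap(
              String.join("\n", buffer).getBytes(StandardCharsets.UTF_8)));
        }
      }
    }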

On Wed, Sep 18, 2019 at 4:57 PM Shannon Duncan 
wrote:

> So I followed up on why TextIO shuffles and dug into the code some. It is
> using the shards and getting all the values into a keyed group to write to
> a single file.
>
> However... I wonder if there is way to just take the records that are on a
> worker and write them out. Thus not needing a shard number and doing this.
> Closer to how hadoop handle's writes.
>
> Maybe just a regular ParDo, where bundle setup creates a writer and
> processElement reuses that writer to write to the same file for all
> elements within a bundle?
>
> I feel like this goes beyond scope of simple user mailing list so I'm
> expanding it to dev as well.
> +dev 
>
> Finding a solution that prevents quadrupling shuffle costs when simply
> writing out a file is a necessity for large scale jobs that work with 100+
> TB of data. If anyone has any ideas I'd love to hear them.
>
> Thanks,
> Shannon Duncan
>
> On Wed, Sep 18, 2019 at 1:06 PM Shannon Duncan 
> wrote:
>
>> We have been using Beam for a bit now. However we just turned on the
>> dataflow shuffle service and were very surprised that the shuffled data
>> amounts were quadruple the amounts we expected.
>>
>> Turns out that the file writing TextIO is doing shuffles within itself.
>>
>> Is there a way to prevent shuffling in the writing phase?
>>
>> Thanks,
>> Shannon Duncan
>>
>


Review needed: BigQuery clustering support for Beam Java API

2019-06-28 Thread Jeff Klukas
Beam devs - I'm looking for review on
https://github.com/apache/beam/pull/8945 which builds on a previous PR by
Wout Sheepers that was never merged due to concerns over evolving the coder
for TableDestination.

The new PR defaults to the existing TableDestinationCoderV2 and ensures the
new TableDestinationCoderV3 is used only when the user has opted in by
calling a clustering-related method.

As far as I can tell, clustered tables are a substantial improvement over
non-clustered tables in BigQuery with few downsides, so I expect
substantial interest in populating clustered tables now that the feature is
GA.


Re: GSOC - Implement an S3 filesystem for Python SDK

2019-05-01 Thread Jeff Klukas
For getting started reading data, there are some public S3 buckets on which
Amazon hosts data used in tutorials. For example, you should be able to
access s3://awssampledbuswest2 which is referenced in Redshift tutorials
[0].

Amazon also has a free tier for S3 for the first year an account is open,
which you could use to write some test data to S3 without incurring charges.

[0]
https://docs.aws.amazon.com/redshift/latest/gsg/rs-gsg-create-sample-db.html

On Wed, May 1, 2019 at 9:08 PM Pasan Kamburugamuwa <
pasankamburugamu...@gmail.com> wrote:

> Hi all,
>   I want to access an S3 bucket in order to get familiar
> with boto3. So can you guys help me in this process?
>
> Thank you
>


Re: Apache Beam, Spark, Hadoop and AWS cross accounts

2019-03-25 Thread Jeff Klukas
I don't think I'm fully understanding the input part of your pipeline since
it looks like it's using some custom IO, but I am fairly confident that
FileIO.write() will never produce empty files, so the behavior you describe
sounds expected if you're reading in an empty file.

FileIO doesn't copy files, but rather is a container for read transforms
that read file contents into a PCollection and write transforms that batch
together records from a PCollection and write serialized contents to files.

I generally would not expect the FileIO machinery to be able to preserve
file structure or naming between input and output. If that's what you want,
then you may want to read file names and write your own ParDo to run S3
copy operations.
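
A sketch of that copy-via-ParDo route using Beam's FileSystems API (the
source and destination buckets are made up, and the cross-account bucket
permissions still have to be in place on the S3 side):

    class CopyToAccountB extends DoFn<MatchResult.Metadata, Void> {
      @ProcessElement
      public void processElement(@Element MatchResult.Metadata metadata) throws IOException {
        ResourceId source = metadata.resourceId();
        ResourceId dest = FileSystems.matchNewResource(
            "s3://account-b-bucket/" + source.getFilename(), false /* isDirectory */);
        // Copies happen file-by-file, preserving the original file names.
        FileSystems.copy(
            Collections.singletonList(source),
            Collections.singletonList(dest));
      }
    }

    pipeline
        .apply(FileIO.match().filepattern("s3://account-a-bucket/output/*.avro"))
        .apply(ParDo.of(new CopyToAccountB()));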

On Sun, Mar 24, 2019 at 12:07 PM Carlos Baeza 
wrote:

> Hi Guys,
>
> I’m new to Apache Beam. In my current project we have the following scenario:
>
> - We run transformations via an Apache Beam pipeline on Amazon AWS (using a
> cluster with Spark and Hadoop). We may produce big files in the future.
> - The pipeline generates an AVRO file that should be stored from AWS Account A to
> AWS Account B in "S3://some_store". The process is started in Account A
> because that is where the required data access layer lives.
> - The first experiments showed:
> - An empty file is created (0 bytes -> MyTransformation.avro) in Account A
> - After the process finishes, no file appears in Account B
> "S3://sone_store”. The file is missing.
>
> The process defined in Account A look like:
>
> ---
> final SimplePipelineOptions options =
> PipelineOptionsFactory.fromArgs(args).as(SimplePipelineOptions.class);
> final Pipeline pipeline = Pipeline.create(options);
> pipeline.apply(
>  SomeDataIO.read()
>  .withZookeeperQuorum(options.getZookeeperQuorum())
>  .withDataVersion(UUID.fromString(options.getVersionId()),
> options.getDataVersion())
>  .withView(DictionaryModelView
>  .create(MODEL, new ProcessSomeData(;
>
> Join.innerJoin(someData, someData)
>  .apply(Values.create())
>  .apply(ParDo.of(new ExtractSomeData()))
>  .apply(FileIO.>writeDynamic()
>  .by(KV::getKey)
>  .via(fn, AvroIO.sink(AVFeature.class))
>  .withNaming("MyTransformation.avro")
>  .to("S3://sone_store")
>  .withNumShards(1)
>  .withDestinationCoder(StringUtf8Coder.of()));
>
> pipeline.run().waitUntilFinish();
>
> ---
>
> The class ProcessSomeData is responsible to extract some data from our
> persistence layer and process it.
>
> In testing, running from Account B, everything works fine: we can produce the AVRO
> file (34 KB) and store it into the Account B S3 store ->
> S3://some_store
> But running in the cloud, starting the process from Account A, we
> lose the file (MyTransformation.avro, 0 bytes). -> The file has not been
> copied.
> AWS S3 configuration from Account B give full access to Account A.
>
> 1. Some idea what goes wrong?
> 2. Maybe FileIO.Write.to(...) is not able to store data between AWS
> cross accounts?
> 3. Should I create my self a java client to store in Account B?
> 4. Can FileIO copy 0 bytes file?
>
> Any help is appreciate.
>
> Many thanks in advance !
>
> Carlos
>
>
>
>


Re: Add exception handling to MapElements

2019-02-12 Thread Jeff Klukas
I agree that chaining of transforms that might produce errors is a nice
pattern to be able to support, so I very much do want to provide something
like the promise interface you've described.

My currently posted PR supports this by having error-producing transforms
return a WithExceptions.Result collection that allows access to the
"output" and "error" collections, but also provides an `errorsTo` method to
strip the error collection to a list, returning the output collection for
further chaining:

 PCollection input = ...
 List> errorCollections = new ArrayList<>();
 PCollection output = input
  .apply(MapElements...withExceptions()...)
  .errorsTo(errorCollections)
  .apply(MapElements...withExceptions()...)
  .errorsTo(errorCollections);
 PCollection> errors =
PCollectionList.of(errorCollections)
  .apply("FlattenErrorCollections", Flatten.pCollections());

The need for collections with full type information and coders puts some
different constraints on the APIs for Beam. There may well be some deeper
changes possible (I believe Reuven previously suggested investigating
building support for error handling into the DoFn interface) but I see that
as an independent project of larger scope compared to this effort. Given
the current structure of DoFns, PTransforms, and coders, it's necessary to
provide an exception handler within the processing logic of the DoFn
itself, in order to produce a codable error collection that can be passed
to additional processing phases.

To restate, my proposal is about establishing a pattern for easily allowing
users to inject exception handling code to a DoFn wrapped in a PTransform,
and get out some error collection. It's an approach that's viable in the
short term without significant changes to deeper layers of the Beam Java
SDK. My intention with the PR was to factor as much reusable logic out as
possible to the new WithExceptions class so that additional transforms
within the SDK and user-defined transforms would be able to use the same
pattern that we're building into MapElements and FlatMapElements.

Sam - Does the above seem to provide the kind of functionality you like
from promises? Do you see ways to evolve this approach to better
incorporate successful ideas from promises? Or do you see opportunities for
deeper changes that we should pursue instead of the above?


On Mon, Feb 11, 2019 at 5:17 PM Sam Rohde  wrote:

> Sure, I was thinking of treating the apply as a promise (making use of
> your CodableException idea as well):
>
> ```
> PCollection<...> result = words.apply(new SomeUserDoFn())
>
> .then(new SomeOtherDoFn())
>
> .then(new OtherDoFn(),
>
>  // Error Handler
>
>   (CodableException<...> e) -> {
>
> logger.info(e.getMessage());
>
> return e;
>
>  });
>
> ```
>
> The idea is to treat the pipeline with each apply as an asynchronous
> operation where each step can either be "fulfilled" or "rejected". The
> promises can then be chained together like above.
>
>
> On Mon, Feb 11, 2019 at 1:47 PM Jeff Klukas  wrote:
>
>> Vallery Lancey's post is definitely one of the viewpoints incorporated
>> into this approach. I neglected to include that link in this iteration, but
>> it was discussed in the previous thread.
>>
>> Can you explain more about "another option that adds A+ Promise spec into
>> the apply method"? I'm failing to parse what that means.
>>
>> On Mon, Feb 11, 2019 at 4:23 PM Sam Rohde  wrote:
>>
>>> Interesting ideas! I think you're really honing in on what the Apache
>>> Beam API is missing: error handling for bad data and runtime errors. I like
>>> your method because it coalesces all the errors into a single collection to
>>> be looked at later. Also easy to add a PAssert on the errors collection.
>>>
>>> Looks like others are also taking a stab at exception handling:
>>> https://medium.com/@vallerylancey/error-handling-elements-in-apache-beam-pipelines-fffdea91af2a
>>>
>>> I would also like to add another option that adds A+ Promise spec into
>>> the apply method. This makes exception handling more general than with only
>>> the Map method.
>>>
>>> On Fri, Feb 8, 2019 at 9:53 AM Jeff Klukas  wrote:
>>>
>>>> I'm looking for feedback on a new attempt at implementing an exception
>>>> handling interface for map transforms as previously discussed on this list
>>>> [0] and documented in JIRA [1]. I'd like for users to be able to pass a
>>>> function into MapElements, FlatMapElements, etc. that potentially raises an
>>>> exception without having to resort to rolling a completely custom ParDo

Re: Add exception handling to MapElements

2019-02-11 Thread Jeff Klukas
Vallery Lancey's post is definitely one of the viewpoints incorporated into
this approach. I neglected to include that link in this iteration, but it
was discussed in the previous thread.

Can you explain more about "another option that adds A+ Promise spec into
the apply method"? I'm failing to parse what that means.

On Mon, Feb 11, 2019 at 4:23 PM Sam Rohde  wrote:

> Interesting ideas! I think you're really honing in on what the Apache Beam
> API is missing: error handling for bad data and runtime errors. I like your
> method because it coalesces all the errors into a single collection to be
> looked at later. Also easy to add a PAssert on the errors collection.
>
> Looks like others are also taking a stab at exception handling:
> https://medium.com/@vallerylancey/error-handling-elements-in-apache-beam-pipelines-fffdea91af2a
>
> I would also like to add another option that adds A+ Promise spec into the
> apply method. This makes exception handling more general than with only the
> Map method.
>
> On Fri, Feb 8, 2019 at 9:53 AM Jeff Klukas  wrote:
>
>> I'm looking for feedback on a new attempt at implementing an exception
>> handling interface for map transforms as previously discussed on this list
>> [0] and documented in JIRA [1]. I'd like for users to be able to pass a
>> function into MapElements, FlatMapElements, etc. that potentially raises an
>> exception without having to resort to rolling a completely custom ParDo
>> with an additional output for failing elements.
>>
>> I have a PR open for review [2] that introduces an exception-handling
>> interface that mimics the existing `into` and `via` methods of MapElements:
>>
>>  Result, String> result = words.apply(
>>  MapElements.into(TypeDescriptors.integers())
>> .via((String word) -> 1 / word.length()) // throws
>> ArithmeticException
>> .withExceptions() // returns a MapWithFailures transform
>> .into(TypeDescriptors.strings())
>> .via(ee -> ee.exception().getMessage()));
>>  PCollection errors = result.errors();
>>  PCollection output = result.output();
>>
>>
>>
>> The example above is a bit more complex than I'd like, but gives users
>> full control over what type handled exceptions are transformed into. It
>> would be nice if we could simply create an error collection of some type
>> that wraps the input element and the Exception directly, but there is still
>> no general solution for encoding subclasses of exception, thus the need for
>> some exception handling function (which in this example is the lambda
>> passed to the second `via`).
>>
>> Let's call the above option 1.
>>
>> If we expect that most users will simply want to preserve the input
>> element that failed and know general metadata about the exception
>> (className, message, and stackTrace), we could instead optimize for a
>> default solution where we return an instance of a new
>> CodableException[InputT] type that wraps the input element and has
>> additional string fields for className, message, and stackTrace:
>>
>>  Result, String> result = words.apply(
>>  MapElements.into(TypeDescriptors.integers())
>> .via((String word) -> 1 / word.length())
>> .withExceptions());
>>  PCollection> errors = result.errors();
>>  PCollection output = result.output();
>>
>> Let's call this option 2.
>>
>> It's less user code compared to option 1 and returns a richer error
>> collection. I believe we'd be able to handle setting an appropriate coder
>> behind the scenes, setting some composite coder that reuses the coder for
>> the input collection in order to encode the InputT instance.
>>
>> I think we'd still need to provide some additional methods, though, if
>> the user wants to set a custom exception handling function and custom
>> coder. That would be for needs where a user wants to catch only a
>> particular subclass of exception, or access additional methods of Exception
>> (to access getCause() perhaps) or methods particular to an Exception
>> subclass. The full API would end up being more complex compared to option
>> 1, but it does make the default case much easier to use.
>>
>> If it's not fairly obvious what's going on in either of the above
>> examples, then we likely haven't figured out an appropriate API yet.
>> Reviews on the PR or commentary on the above two options would be
>> appreciated.
>>
>> [0]
>> https://lists.apache.org/thread.html/936ed2a5f2c01be066fd903abf70130625e0b8cf4028c11b89b8b23f@%3Cdev.beam.apache.org%3E
>> <https://lists.apache.org/thread.html/936ed2a5f2c01be066fd903abf70130625e0b8cf4028c11b89b8b23f@%3Cdev.beam.apache.org%3E>
>> [1] https://issues.apache.org/jira/browse/BEAM-5638
>> [2] https://github.com/apache/beam/pull/7736
>>
>


Add exception handling to MapElements

2019-02-08 Thread Jeff Klukas
I'm looking for feedback on a new attempt at implementing an exception
handling interface for map transforms as previously discussed on this list
[0] and documented in JIRA [1]. I'd like for users to be able to pass a
function into MapElements, FlatMapElements, etc. that potentially raises an
exception without having to resort to rolling a completely custom ParDo
with an additional output for failing elements.

I have a PR open for review [2] that introduces an exception-handling
interface that mimics the existing `into` and `via` methods of MapElements:

 Result, String> result = words.apply(
 MapElements.into(TypeDescriptors.integers())
.via((String word) -> 1 / word.length()) // throws
ArithmeticException
.withExceptions() // returns a MapWithFailures transform
.into(TypeDescriptors.strings())
.via(ee -> ee.exception().getMessage()));
 PCollection errors = result.errors();
 PCollection output = result.output();



The example above is a bit more complex than I'd like, but gives users full
control over what type handled exceptions are transformed into. It would be
nice if we could simply create an error collection of some type that wraps
the input element and the Exception directly, but there is still no general
solution for encoding subclasses of exception, thus the need for some
exception handling function (which in this example is the lambda passed to
the second `via`).

Let's call the above option 1.

If we expect that most users will simply want to preserve the input element
that failed and know general metadata about the exception (className,
message, and stackTrace), we could instead optimize for a default solution
where we return an instance of a new CodableException[InputT] type that
wraps the input element and has additional string fields for className,
message, and stackTrace:

 Result, String> result = words.apply(
 MapElements.into(TypeDescriptors.integers())
.via((String word) -> 1 / word.length())
.withExceptions());
 PCollection> errors = result.errors();
 PCollection output = result.output();

Let's call this option 2.

It's less user code compared to option 1 and returns a richer error
collection. I believe we'd be able to handle setting an appropriate coder
behind the scenes, setting some composite coder that reuses the coder for
the input collection in order to encode the InputT instance.

I think we'd still need to provide some additional methods, though, if the
user wants to set a custom exception handling function and custom coder.
That would be for needs where a user wants to catch only a particular
subclass of exception, or access additional methods of Exception (to access
getCause() perhaps) or methods particular to an Exception subclass. The
full API would end up being more complex compared to option 1, but it does
make the default case much easier to use.

If it's not fairly obvious what's going on in either of the above examples,
then we likely haven't figured out an appropriate API yet. Reviews on the
PR or commentary on the above two options would be appreciated.

[0]
https://lists.apache.org/thread.html/936ed2a5f2c01be066fd903abf70130625e0b8cf4028c11b89b8b23f@%3Cdev.beam.apache.org%3E

[1] https://issues.apache.org/jira/browse/BEAM-5638
[2] https://github.com/apache/beam/pull/7736


Re: pipeline steps

2019-02-07 Thread Jeff Klukas
I haven't needed to do this with Beam before, but I've definitely had
similar needs in the past. Spark, for example, provides an input_file_name
function that can be applied to a dataframe to add the input file as an
additional column. It's not clear to me how that's implemented, though.

Perhaps others have suggestions, but I'm not aware of a way to do this
conveniently in Beam today. To my knowledge, today you would have to use
FileIO.match() and FileIO.readMatches() to get a collection of
ReadableFile. You'd then have to FlatMapElements to pull out the metadata
and the bytes of the file, and you'd be responsible for parsing those bytes
into avro records. You'd  be able to output something like a KV
that groups the file name together with the parsed avro record.

Seems like something worth providing better support for in Beam itself if
this indeed doesn't already exist.
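
A rough sketch of that FileIO-based approach (assuming GenericRecord output,
that individual files fit in memory, and that `schema` holds the Avro schema
you expect; a production version would stream rather than call
readFullyAsBytes):

    PCollection<KV<String, GenericRecord>> records = p
        .apply(FileIO.match().filepattern("gs://my-bucket/input/*.avro"))
        .apply(FileIO.readMatches())
        .apply(ParDo.of(new DoFn<FileIO.ReadableFile, KV<String, GenericRecord>>() {
          @ProcessElement
          public void processElement(ProcessContext c) throws IOException {
            FileIO.ReadableFile file = c.element();
            String fileName = file.getMetadata().resourceId().getFilename();
            try (DataFileStream<GenericRecord> stream = new DataFileStream<>(
                new ByteArrayInputStream(file.readFullyAsBytes()),
                new GenericDatumReader<>())) {
              // Emit each record paired with the name of the file it came from.
              for (GenericRecord record : stream) {
                c.output(KV.of(fileName, record));
              }
            }
          }
        }))
        .setCoder(KvCoder.of(StringUtf8Coder.of(), AvroCoder.of(schema)));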

On Thu, Feb 7, 2019 at 7:29 AM Chaim Turkel  wrote:

> Hi,
>   I am working on a pipeline that listens to a topic on pubsub to get
> files that have changes in the storage. Then i read avro files, and
> would like to write them to bigquery based on the file name (to
> different tables).
>   My problem is that the transformer that reads the avro does not give
> me back the files name (like a tuple or something like that). I seem
> to have this pattern come back a lot.
> Can you think of any solutions?
>
> Chaim
>
>


Schemas for classes with type parameters

2019-02-04 Thread Jeff Klukas
I've started experimenting with Beam schemas in the context of creating
custom AutoValue-based classes and using AutoValueSchema to generate
schemas and thus coders.

AFAICT, schemas need to have types fully specified, so it doesn't appear to
be possible to define an AutoValue class with a type parameter and then
create a schema for it. Basically, I want to confirm whether the following
type would ever be possible to create a schema for:

@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract class MyClass<T> {
  public abstract T getField1();
  public abstract String getField2();
  public static <T> MyClass<T> of(T field1, String field2) {
    return new AutoValue_MyClass<>(field1, field2);
  }
  }
}

This may be an entirely reasonable limitation of the schema machinery, but
I want to make sure I'm not missing something.


Re: [DISCUSS] Should File based IOs implement readAll() or just readFiles()

2019-01-30 Thread Jeff Klukas
Reuven - Is TextIO.read().from() a more complex case than the topic Ismaël
is bringing up in this thread? I'm surprised to hear that the two examples
have different performance characteristics.

Reading through the implementation, I guess the fundamental difference is
whether a given configuration expands to TextIO.ReadAll or to io.Read.
AFAICT, that detail and the subsequent performance impact are not
documented.

If the above is correct, perhaps it's an argument for IOs to provide
higher-level methods in cases where they can optimize performance compared
to what a user might naively put together.

On Wed, Jan 30, 2019 at 12:35 PM Reuven Lax  wrote:

> Jeff, what you did here is not simply a refactoring. These two are quite
> different, and will likely have different performance characteristics.
>
> The first evaluates the wildcard, and allows the runner to pick
> appropriate bundling. Bundles might contain multiple files (if they are
> small), and the runner can split the files as appropriate. In the case of
> the Dataflow runner, these bundles can be further split dynamically.
>
> The second chops up the files inside the PTransform, and processes
> each chunk in a ParDo. TextIO.readFiles currently chops up each file into
> 64mb chunks (hardcoded), and then processes each chunk in a ParDo.
>
> Reuven
>
>
> On Wed, Jan 30, 2019 at 9:18 AM Jeff Klukas  wrote:
>
>> I would prefer we move towards option [2]. I just tried the following
>> refactor in my own code from:
>>
>>   return input
>>   .apply(TextIO.read().from(fileSpec));
>>
>> to:
>>
>>   return input
>>   .apply(FileIO.match().filepattern(fileSpec))
>>   .apply(FileIO.readMatches())
>>   .apply(TextIO.readFiles());
>>
>> Yes, the latter is more verbose but not ridiculously so, and it's also
>> more instructive about what's happening.
>>
>> When I first started working with Beam, it took me a while to realize
>> that TextIO.read().from() would accept a wildcard. The more verbose version
>> involves a method called "filepattern" which makes this much more obvious.
>> It also leads me to understand that I could use the same FileIO.match()
>> machinery to do other things with filesystems other than read file
>> contents.
>>
>> On Wed, Jan 30, 2019 at 11:26 AM Ismaël Mejía  wrote:
>>
>>> Hello,
>>>
>>> A ‘recent’ pattern of use in Beam is to have in file based IOs a
>>> `readAll()` implementation that basically matches a `PCollection` of
>>> file patterns and reads them, e.g. `TextIO`, `AvroIO`. `ReadAll` is
>>> implemented by an expand function that matches files with FileIO and
>>> then reads them using a format specific `ReadFiles` transform e.g.
>>> TextIO.ReadFiles, AvroIO.ReadFiles. So in the end `ReadAll` in the
>>> Java implementation is just a user-friendly API to hide FileIO.match
>>> + ReadFiles.
>>>
>>> Most recent IOs do NOT implement ReadAll to encourage the more
>>> composable approach of File + ReadFiles, e.g. XmlIO and ParquetIO.
>>>
>>> Implementing ReadAll as a wrapper is relatively easy and is definitely
>>> user friendly, but it has an issue: it may be error-prone and it adds
>>> more code to maintain (mostly ‘repeated’ code). However `readAll` is a
>>> more abstract pattern that applies not only to File based IOs so it
>>> makes sense for example in other transforms that map a `Pcollection`
>>> of read requests and is the basis for SDF composable style APIs like
>>> the recent `HBaseIO.readAll()`.
>>>
>>> So the question is should we:
>>>
>>> [1] Implement `readAll` in all file based IOs to be user friendly and
>>> assume the (minor) maintenance cost
>>>
>>> or
>>>
>>> [2] Deprecate `readAll` from file based IOs and encourage users to use
>>> FileIO + `readFiles` (less maintenance and encourage composition).
>>>
>>> I just checked quickly in the python code base but I did not find if
>>> the File match + ReadFiles pattern applies, but it would be nice to
>>> see what the python guys think on this too.
>>>
>>> This discussion comes from a recent slack conversation with Łukasz
>>> Gajowy, and we wanted to settle into one approach to make the IO
>>> signatures consistent, so any opinions/preferences?
>>>
>>


Re: [DISCUSS] Should File based IOs implement readAll() or just readFiles()

2019-01-30 Thread Jeff Klukas
I would prefer we move towards option [2]. I just tried the following
refactor in my own code from:

  return input
  .apply(TextIO.read().from(fileSpec));

to:

  return input
  .apply(FileIO.match().filepattern(fileSpec))
  .apply(FileIO.readMatches())
  .apply(TextIO.readFiles());

Yes, the latter is more verbose but not ridiculously so, and it's also more
instructive about what's happening.

When I first started working with Beam, it took me a while to realize that
TextIO.read().from() would accept a wildcard. The more verbose version
involves a method called "filepattern" which makes this much more obvious.
It also leads me to understand that I could use the same FileIO.match()
machinery to do other things with filesystems other than read file
contents.

On Wed, Jan 30, 2019 at 11:26 AM Ismaël Mejía  wrote:

> Hello,
>
> A ‘recent’ pattern of use in Beam is to have in file based IOs a
> `readAll()` implementation that basically matches a `PCollection` of
> file patterns and reads them, e.g. `TextIO`, `AvroIO`. `ReadAll` is
> implemented by an expand function that matches files with FileIO and
> then reads them using a format specific `ReadFiles` transform e.g.
> TextIO.ReadFiles, AvroIO.ReadFiles. So in the end `ReadAll` in the
> Java implementation is just a user-friendly API to hide FileIO.match
> + ReadFiles.
>
> Most recent IOs do NOT implement ReadAll to encourage the more
> composable approach of File + ReadFiles, e.g. XmlIO and ParquetIO.
>
> Implementing ReadAll as a wrapper is relatively easy and is definitely
> user friendly, but it has an issue: it may be error-prone and it adds
> more code to maintain (mostly ‘repeated’ code). However `readAll` is a
> more abstract pattern that applies not only to File based IOs so it
> makes sense for example in other transforms that map a `Pcollection`
> of read requests and is the basis for SDF composable style APIs like
> the recent `HBaseIO.readAll()`.
>
> So the question is should we:
>
> [1] Implement `readAll` in all file based IOs to be user friendly and
> assume the (minor) maintenance cost
>
> or
>
> [2] Deprecate `readAll` from file based IOs and encourage users to use
> FileIO + `readFiles` (less maintenance and encourage composition).
>
> I just checked quickly in the python code base but I did not find if
> the File match + ReadFiles pattern applies, but it would be nice to
> see what the python guys think on this too.
>
> This discussion comes from a recent slack conversation with Łukasz
> Gajowy, and we wanted to settle into one approach to make the IO
> signatures consistent, so any opinions/preferences?
>


Re: FileIOTest.testMatchWatchForNewFiles flakey in java presubmit

2019-01-23 Thread Jeff Klukas
Thanks for the review, Alex. I pushed an additional commit to add
commentary and to explicitly pass a copy option that indicates we want to
preserve attributes like timestamps.
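
The relevant piece is roughly the following (paths are illustrative); the copy
option is what carries the source file's last-modified time over into the
watched directory, so the expected Metadata can be computed before the copy
happens:

  Path source = sourceDir.resolve("first");
  Path target = watchedDir.resolve("first");
  // COPY_ATTRIBUTES preserves the last-modified time of the source file.
  Files.copy(source, target, StandardCopyOption.COPY_ATTRIBUTES);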

On Wed, Jan 23, 2019 at 12:23 PM Alex Amato  wrote:

> Thanks Jeff, I reviewed your PR with one suggestion to add a comment to
> make the test more clear. I am assuming the modified times get copied, not
> re-timestamped on copy, which is why your method works.
> Otherwise looks good to me
>
> On Wed, Jan 23, 2019 at 5:49 AM Jeff Klukas  wrote:
>
>> Posted https://github.com/apache/beam/pull/7599
>>
>> That PR follows suggestion #4. I chose that route because it maintains
>> the PAssert containsInAnyOrder check, which seems easier to read and more
>> straightforward than PAssert satisfies.
>>
>> Do let me know if you disagree and I can switch back to Eugene's
>> suggestion #1.
>>
>> On Wed, Jan 23, 2019 at 8:12 AM Jeff Klukas  wrote:
>>
>>> Suggestion #4: Create source files outside the writer thread, and then
>>> copy them from a source directory to the watched directory. That should
>>> atomically write the file with the already known lastModificationTime.
>>>
>>> On Wed, Jan 23, 2019 at 7:37 AM Jeff Klukas  wrote:
>>>
>>>> I'll work on getting a PR together this morning, probably following
>>>> Eugene's suggestion #1.
>>>>
>>>> On Tue, Jan 22, 2019 at 8:34 PM Udi Meiri  wrote:
>>>>
>>>>> Alex, the only way to implement my suggestion #1 (that I know of)
>>>>> would be to write to a file and read it back.
>>>>> I don't have good example for #2.
>>>>>
>>>>> Eugene's suggestion no. 1 seems like a good idea. There are some
>>>>> example
>>>>> <https://github.com/apache/beam/blob/324a1bcc820945731ccce7dd7e5354247b841356/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java#L335-L340>
>>>>> in the codebase.
>>>>>
>>>>> On Tue, Jan 22, 2019 at 5:16 PM Eugene Kirpichov 
>>>>> wrote:
>>>>>
>>>>>> Yeah the "List expected" is constructed
>>>>>> from Files.getLastModifiedTime() calls before the files are actually
>>>>>> modified, the code is basically unconditionally broken rather than merely
>>>>>> flaky.
>>>>>>
>>>>>> There's several easy options:
>>>>>> 1) Use PAssert.that().satisfies() instead of .contains(), and use
>>>>>> assertThat().contains() inside that, with the list constructed at time 
>>>>>> the
>>>>>> assertion is applied rather than declared.
>>>>>> 2) Implement a Matcher that ignores last modified time and
>>>>>> use that
>>>>>>
>>>>>> Jeff - your option #3 is unfortunately also race-prone, because the
>>>>>> code may match the files after they have been written but before
>>>>>> setLastModifiedTime was called.
>>>>>>
>>>>>> On Tue, Jan 22, 2019 at 5:08 PM Jeff Klukas  wrote:
>>>>>>
>>>>>>> Another option:
>>>>>>>
>>>>>>> #3 Have the writer thread call Files.setLastModifiedTime explicitly
>>>>>>> after each File.write. Then the lastModifiedMillis can be a stable value
>>>>>>> for each file and we can use those same static values in our expected
>>>>>>> result. I think that would also eliminate the race condition.
>>>>>>>
>>>>>>> On Tue, Jan 22, 2019 at 7:48 PM Alex Amato 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks Udi, is there a good example for either of these?
>>>>>>>> #1 - seems like you have to rewrite your assertion logic without
>>>>>>>> the PAssert? Is there some way to capture the pipeline output and 
>>>>>>>> iterate
>>>>>>>> over it? The pattern I have seen for this in the past also has thread
>>>>>>>> safety issues (Using a DoFn at the end of the pipeline to add the 
>>>>>>>> output to
>>>>>>>> a collection is not safe since the collection can be executed 
>>>>>>>> concurrently)
>>>>>>>> #2 - Would BigqueryMatcher be a good example for this? which is
>>>>>>>> used in BigQu

Re: FileIOTest.testMatchWatchForNewFiles flakey in java presubmit

2019-01-23 Thread Jeff Klukas
Posted https://github.com/apache/beam/pull/7599

That PR follows suggestion #4. I chose that route because it maintains the
PAssert containsInAnyOrder check, which seems easier to read and more
straightforward than PAssert satisfies.

Do let me know if you disagree and I can switch back to Eugene's suggestion
#1.

On Wed, Jan 23, 2019 at 8:12 AM Jeff Klukas  wrote:

> Suggestion #4: Create source files outside the writer thread, and then
> copy them from a source directory to the watched directory. That should
> atomically write the file with the already known lastModificationTime.
>
> On Wed, Jan 23, 2019 at 7:37 AM Jeff Klukas  wrote:
>
>> I'll work on getting a PR together this morning, probably following
>> Eugene's suggestion #1.
>>
>> On Tue, Jan 22, 2019 at 8:34 PM Udi Meiri  wrote:
>>
>>> Alex, the only way to implement my suggestion #1 (that I know of) would
>>> be to write to a file and read it back.
>>> I don't have good example for #2.
>>>
>>> Eugene's suggestion no. 1 seems like a good idea. There are some example
>>> <https://github.com/apache/beam/blob/324a1bcc820945731ccce7dd7e5354247b841356/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java#L335-L340>
>>> in the codebase.
>>>
>>> On Tue, Jan 22, 2019 at 5:16 PM Eugene Kirpichov 
>>> wrote:
>>>
>>>> Yeah the "List expected" is constructed
>>>> from Files.getLastModifiedTime() calls before the files are actually
>>>> modified, the code is basically unconditionally broken rather than merely
>>>> flaky.
>>>>
>>>> There's several easy options:
>>>> 1) Use PAssert.that().satisfies() instead of .contains(), and use
>>>> assertThat().contains() inside that, with the list constructed at time the
>>>> assertion is applied rather than declared.
>>>> 2) Implement a Matcher that ignores last modified time and
>>>> use that
>>>>
>>>> Jeff - your option #3 is unfortunately also race-prone, because the
>>>> code may match the files after they have been written but before
>>>> setLastModifiedTime was called.
>>>>
>>>> On Tue, Jan 22, 2019 at 5:08 PM Jeff Klukas  wrote:
>>>>
>>>>> Another option:
>>>>>
>>>>> #3 Have the writer thread call Files.setLastModifiedTime explicitly
>>>>> after each File.write. Then the lastModifiedMillis can be a stable value
>>>>> for each file and we can use those same static values in our expected
>>>>> result. I think that would also eliminate the race condition.
>>>>>
>>>>> On Tue, Jan 22, 2019 at 7:48 PM Alex Amato  wrote:
>>>>>
>>>>>> Thanks Udi, is there a good example for either of these?
>>>>>> #1 - seems like you have to rewrite your assertion logic without the
>>>>>> PAssert? Is there some way to capture the pipeline output and iterate 
>>>>>> over
>>>>>> it? The pattern I have seen for this in the past also has thread safety
>>>>>> issues (Using a DoFn at the end of the pipeline to add the output to a
>>>>>> collection is not safe since the collection can be executed concurrently)
>>>>>> #2 - Would BigqueryMatcher be a good example for this? which is used
>>>>>> in BigQueryTornadoesIT.java Or is there another example you would suggest
>>>>>> looking at for reference?
>>>>>>
>>>>>>- I guess to this you need to implement the SerializableMatcher
>>>>>>interface and use the matcher as an option in the pipeline options.
>>>>>>
>>>>>>
>>>>>> On Tue, Jan 22, 2019 at 4:28 PM Udi Meiri  wrote:
>>>>>>
>>>>>>> Some options:
>>>>>>> - You could wait to assert until after p.waitForFinish().
>>>>>>> - You could PAssert using SerializableMatcher and allow any
>>>>>>> lastModifiedTime.
>>>>>>>
>>>>>>> On Tue, Jan 22, 2019 at 3:56 PM Alex Amato 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> +Jeff, Eugene,
>>>>>>>>
>>>>>>>> Hi Jeff and Eugene,
>>>>>>>>
>>>>>>>> I've noticed that Jeff's PR
>>>>>>>> <https://github.com/apache/beam/commit/410d6c7b5f933dcb0280894553c1e576ee4e4884>
>>>>>>>>  introduced
>>>>>>>> a race condition in this test, but its not clear exactly how to add 
>>>>>>>> Jeff's
>>>>>>>> test check in a thread safe way. I believe this to be the source of the
>>>>>>>> flakeyness Do you have any suggestions Eugene (since you authored this
>>>>>>>> test)?
>>>>>>>>
>>>>>>>> I added some details to this JIRA issue explaining in full
>>>>>>>> https://jira.apache.org/jira/browse/BEAM-6491?filter=-2
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Jan 22, 2019 at 3:34 PM Alex Amato 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I've seen this fail in a few different PRs for different
>>>>>>>>> contributors, and its causing some issues during the presubmit 
>>>>>>>>> process..
>>>>>>>>> This is a multithreaded test with a lot of sleeps, so it looks a bit
>>>>>>>>> suspicious as the source of the problem.
>>>>>>>>>
>>>>>>>>> https://builds.apache.org/job/beam_PreCommit_Java_Commit/3688/testReport/org.apache.beam.sdk.io/FileIOTest/testMatchWatchForNewFiles/
>>>>>>>>>
>>>>>>>>> I filed a JIRA for this issue:
>>>>>>>>> https://jira.apache.org/jira/browse/BEAM-6491?filter=-2
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>


Re: FileIOTest.testMatchWatchForNewFiles flakey in java presubmit

2019-01-23 Thread Jeff Klukas
Suggestion #4: Create source files outside the writer thread, and then copy
them from a source directory to the watched directory. That should
atomically write the file with the already known lastModificationTime.

On Wed, Jan 23, 2019 at 7:37 AM Jeff Klukas  wrote:

> I'll work on getting a PR together this morning, probably following
> Eugene's suggestion #1.
>
> On Tue, Jan 22, 2019 at 8:34 PM Udi Meiri  wrote:
>
>> Alex, the only way to implement my suggestion #1 (that I know of) would
>> be to write to a file and read it back.
>> I don't have good example for #2.
>>
>> Eugene's suggestion no. 1 seems like a good idea. There are some example
>> <https://github.com/apache/beam/blob/324a1bcc820945731ccce7dd7e5354247b841356/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java#L335-L340>
>> in the codebase.
>>
>> On Tue, Jan 22, 2019 at 5:16 PM Eugene Kirpichov 
>> wrote:
>>
>>> Yeah the "List expected" is constructed
>>> from Files.getLastModifiedTime() calls before the files are actually
>>> modified, the code is basically unconditionally broken rather than merely
>>> flaky.
>>>
>>> There's several easy options:
>>> 1) Use PAssert.that().satisfies() instead of .contains(), and use
>>> assertThat().contains() inside that, with the list constructed at time the
>>> assertion is applied rather than declared.
>>> 2) Implement a Matcher that ignores last modified time and use
>>> that
>>>
>>> Jeff - your option #3 is unfortunately also race-prone, because the code
>>> may match the files after they have been written but before
>>> setLastModifiedTime was called.
>>>
>>> On Tue, Jan 22, 2019 at 5:08 PM Jeff Klukas  wrote:
>>>
>>>> Another option:
>>>>
>>>> #3 Have the writer thread call Files.setLastModifiedTime explicitly
>>>> after each File.write. Then the lastModifiedMillis can be a stable value
>>>> for each file and we can use those same static values in our expected
>>>> result. I think that would also eliminate the race condition.
>>>>
>>>> On Tue, Jan 22, 2019 at 7:48 PM Alex Amato  wrote:
>>>>
>>>>> Thanks Udi, is there a good example for either of these?
>>>>> #1 - seems like you have to rewrite your assertion logic without the
>>>>> PAssert? Is there some way to capture the pipeline output and iterate over
>>>>> it? The pattern I have seen for this in the past also has thread safety
>>>>> issues (Using a DoFn at the end of the pipeline to add the output to a
>>>>> collection is not safe since the collection can be executed concurrently)
>>>>> #2 - Would BigqueryMatcher be a good example for this? which is used
>>>>> in BigQueryTornadoesIT.java Or is there another example you would suggest
>>>>> looking at for reference?
>>>>>
>>>>>- I guess to this you need to implement the SerializableMatcher
>>>>>interface and use the matcher as an option in the pipeline options.
>>>>>
>>>>>
>>>>> On Tue, Jan 22, 2019 at 4:28 PM Udi Meiri  wrote:
>>>>>
>>>>>> Some options:
>>>>>> - You could wait to assert until after p.waitForFinish().
>>>>>> - You could PAssert using SerializableMatcher and allow any
>>>>>> lastModifiedTime.
>>>>>>
>>>>>> On Tue, Jan 22, 2019 at 3:56 PM Alex Amato 
>>>>>> wrote:
>>>>>>
>>>>>>> +Jeff, Eugene,
>>>>>>>
>>>>>>> Hi Jeff and Eugene,
>>>>>>>
>>>>>>> I've noticed that Jeff's PR
>>>>>>> <https://github.com/apache/beam/commit/410d6c7b5f933dcb0280894553c1e576ee4e4884>
>>>>>>>  introduced
>>>>>>> a race condition in this test, but its not clear exactly how to add 
>>>>>>> Jeff's
>>>>>>> test check in a thread safe way. I believe this to be the source of the
>>>>>>> flakeyness Do you have any suggestions Eugene (since you authored this
>>>>>>> test)?
>>>>>>>
>>>>>>> I added some details to this JIRA issue explaining in full
>>>>>>> https://jira.apache.org/jira/browse/BEAM-6491?filter=-2
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Jan 22, 2019 at 3:34 PM Alex Amato 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I've seen this fail in a few different PRs for different
>>>>>>>> contributors, and its causing some issues during the presubmit 
>>>>>>>> process..
>>>>>>>> This is a multithreaded test with a lot of sleeps, so it looks a bit
>>>>>>>> suspicious as the source of the problem.
>>>>>>>>
>>>>>>>> https://builds.apache.org/job/beam_PreCommit_Java_Commit/3688/testReport/org.apache.beam.sdk.io/FileIOTest/testMatchWatchForNewFiles/
>>>>>>>>
>>>>>>>> I filed a JIRA for this issue:
>>>>>>>> https://jira.apache.org/jira/browse/BEAM-6491?filter=-2
>>>>>>>>
>>>>>>>>
>>>>>>>>


Re: FileIOTest.testMatchWatchForNewFiles flakey in java presubmit

2019-01-23 Thread Jeff Klukas
I'll work on getting a PR together this morning, probably following
Eugene's suggestion #1.
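
Roughly, I'm picturing something like the following (the collection and helper
names here are placeholders), so the expected list is only built when the
assertion actually runs:

  PAssert.that(matchMetadata).satisfies(metadataIterable -> {
    List<MatchResult.Metadata> expected = computeExpectedMetadata();
    assertThat(metadataIterable, containsInAnyOrder(expected.toArray()));
    return null;
  });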

On Tue, Jan 22, 2019 at 8:34 PM Udi Meiri  wrote:

> Alex, the only way to implement my suggestion #1 (that I know of) would be
> to write to a file and read it back.
> I don't have good example for #2.
>
> Eugene's suggestion no. 1 seems like a good idea. There are some example
> <https://github.com/apache/beam/blob/324a1bcc820945731ccce7dd7e5354247b841356/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java#L335-L340>
> in the codebase.
>
> On Tue, Jan 22, 2019 at 5:16 PM Eugene Kirpichov 
> wrote:
>
>> Yeah the "List expected" is constructed
>> from Files.getLastModifiedTime() calls before the files are actually
>> modified, the code is basically unconditionally broken rather than merely
>> flaky.
>>
>> There's several easy options:
>> 1) Use PAssert.that().satisfies() instead of .contains(), and use
>> assertThat().contains() inside that, with the list constructed at time the
>> assertion is applied rather than declared.
>> 2) Implement a Matcher that ignores last modified time and use
>> that
>>
>> Jeff - your option #3 is unfortunately also race-prone, because the code
>> may match the files after they have been written but before
>> setLastModifiedTime was called.
>>
>> On Tue, Jan 22, 2019 at 5:08 PM Jeff Klukas  wrote:
>>
>>> Another option:
>>>
>>> #3 Have the writer thread call Files.setLastModifiedTime explicitly
>>> after each File.write. Then the lastModifiedMillis can be a stable value
>>> for each file and we can use those same static values in our expected
>>> result. I think that would also eliminate the race condition.
>>>
>>> On Tue, Jan 22, 2019 at 7:48 PM Alex Amato  wrote:
>>>
>>>> Thanks Udi, is there a good example for either of these?
>>>> #1 - seems like you have to rewrite your assertion logic without the
>>>> PAssert? Is there some way to capture the pipeline output and iterate over
>>>> it? The pattern I have seen for this in the past also has thread safety
>>>> issues (Using a DoFn at the end of the pipeline to add the output to a
>>>> collection is not safe since the collection can be executed concurrently)
>>>> #2 - Would BigqueryMatcher be a good example for this? which is used in
>>>> BigQueryTornadoesIT.java Or is there another example you would suggest
>>>> looking at for reference?
>>>>
>>>>- I guess to this you need to implement the SerializableMatcher
>>>>interface and use the matcher as an option in the pipeline options.
>>>>
>>>>
>>>> On Tue, Jan 22, 2019 at 4:28 PM Udi Meiri  wrote:
>>>>
>>>>> Some options:
>>>>> - You could wait to assert until after p.waitForFinish().
>>>>> - You could PAssert using SerializableMatcher and allow any
>>>>> lastModifiedTime.
>>>>>
>>>>> On Tue, Jan 22, 2019 at 3:56 PM Alex Amato  wrote:
>>>>>
>>>>>> +Jeff, Eugene,
>>>>>>
>>>>>> Hi Jeff and Eugene,
>>>>>>
>>>>>> I've noticed that Jeff's PR
>>>>>> <https://github.com/apache/beam/commit/410d6c7b5f933dcb0280894553c1e576ee4e4884>
>>>>>>  introduced
>>>>>> a race condition in this test, but its not clear exactly how to add 
>>>>>> Jeff's
>>>>>> test check in a thread safe way. I believe this to be the source of the
>>>>>> flakeyness Do you have any suggestions Eugene (since you authored this
>>>>>> test)?
>>>>>>
>>>>>> I added some details to this JIRA issue explaining in full
>>>>>> https://jira.apache.org/jira/browse/BEAM-6491?filter=-2
>>>>>>
>>>>>>
>>>>>> On Tue, Jan 22, 2019 at 3:34 PM Alex Amato 
>>>>>> wrote:
>>>>>>
>>>>>>> I've seen this fail in a few different PRs for different
>>>>>>> contributors, and its causing some issues during the presubmit process..
>>>>>>> This is a multithreaded test with a lot of sleeps, so it looks a bit
>>>>>>> suspicious as the source of the problem.
>>>>>>>
>>>>>>> https://builds.apache.org/job/beam_PreCommit_Java_Commit/3688/testReport/org.apache.beam.sdk.io/FileIOTest/testMatchWatchForNewFiles/
>>>>>>>
>>>>>>> I filed a JIRA for this issue:
>>>>>>> https://jira.apache.org/jira/browse/BEAM-6491?filter=-2
>>>>>>>
>>>>>>>
>>>>>>>


Re: Confusing sentence in Windowing section in Beam programming guide

2019-01-17 Thread Jeff Klukas
Reuven - I don't think I realized it was possible to have late data with
the global window, so I'm definitely learning things through this
discussion.

New suggested wording, then:

Elements that arrive with a smaller timestamp than the current
watermark are considered late data.

That says basically the same thing as the wording currently in the guide,
but uses "smaller" (which implies a less-than-watermark comparison) rather
than "later" (which folks have interpreted as a greater-than-watermark
comparison).
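
For example, if the watermark has advanced to 12:05 and an element then
arrives with an event timestamp of 12:01, that element is considered late.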

On Thu, Jan 17, 2019 at 3:40 PM Reuven Lax  wrote:

> Though it's not tied to window. You could be in the global window, so the
> watermark never advances past the end of the window, yet still get late
> data.
>
> On Thu, Jan 17, 2019, 11:14 AM Jeff Klukas wrote:
>> How about: "Once the watermark progresses past the end of a window, any
>> further elements that arrive with a timestamp in that window are considered
>> late data."
>>
>> On Thu, Jan 17, 2019 at 1:43 PM Rui Wang  wrote:
>>
>>> Hi Community,
>>>
>>> In Beam programming guide [1], there is a sentence: "Data that arrives
>>> with a timestamp after the watermark is considered *late data*"
>>>
>>> Seems like people get confused by it. For example, see Stackoverflow
>>> comment [2]. Basically it makes people think that a event timestamp that is
>>> bigger than watermark is considered late (due to that "after").
>>>
>>> Although there is a example right after this sentence to explain late
>>> data, seems to me that this sentence is incomplete. The complete sentence
>>> to me can be: "The watermark consistently advances from -inf to +inf. Data
>>> that arrives with a timestamp after the watermark is considered late data."
>>>
>>> Am I understand correctly? Is there better description for the order of
>>> late data and watermark? I would happy to send PR to update Beam
>>> documentation.
>>>
>>> -Rui
>>>
>>> [1]: https://beam.apache.org/documentation/programming-guide/#windowing
>>> [2]:
>>> https://stackoverflow.com/questions/54141352/dataflow-to-process-late-and-out-of-order-data-for-batch-and-stream-messages/54188971?noredirect=1#comment95302476_54188971
>>>
>>>
>>>


Re: Confusing sentence in Windowing section in Beam programming guide

2019-01-17 Thread Jeff Klukas
How about: "Once the watermark progresses past the end of a window, any
further elements that arrive with a timestamp in that window are considered
late data."

On Thu, Jan 17, 2019 at 1:43 PM Rui Wang  wrote:

> Hi Community,
>
> In Beam programming guide [1], there is a sentence: "Data that arrives
> with a timestamp after the watermark is considered *late data*"
>
> Seems like people get confused by it. For example, see Stackoverflow
> comment [2]. Basically it makes people think that a event timestamp that is
> bigger than watermark is considered late (due to that "after").
>
> Although there is a example right after this sentence to explain late
> data, seems to me that this sentence is incomplete. The complete sentence
> to me can be: "The watermark consistently advances from -inf to +inf. Data
> that arrives with a timestamp after the watermark is considered late data."
>
> Am I understand correctly? Is there better description for the order of
> late data and watermark? I would happy to send PR to update Beam
> documentation.
>
> -Rui
>
> [1]: https://beam.apache.org/documentation/programming-guide/#windowing
> [2]:
> https://stackoverflow.com/questions/54141352/dataflow-to-process-late-and-out-of-order-data-for-batch-and-stream-messages/54188971?noredirect=1#comment95302476_54188971
>
>
>


Re: Possible to use GlobalWindows for writing unbounded input to files?

2019-01-11 Thread Jeff Klukas
It is indeed well documented that numShards is required for unbounded
input. And I do believe that a helpful error is thrown in the case of
unbounded input and runner-determined sharding.

I do believe there's still a bug here; it's just wandered quite a bit from
the original title of the thread. The title should now be "Exception when
using custom triggering and runner-determined file sharding".

I was seeing the IllegalStateException in a unit test when I tried to
compile my pipeline with the custom triggering. That unit test exercised
*bounded* file input and numShards=0.

In bounded mode, it would still be useful to be able to limit file sizes
via GlobalWindows with triggering on AfterPane.elementCountAtLeast. But
elementCountAtLeast will emit a continuation trigger that trips the Flatten
problem for runner-determined sharding.
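
For reference, the shape that does work today is custom triggering plus an
explicit shard count (the trigger values, shard count, and outputDirectory
below are placeholders):

  input
      .apply(Window.<String>into(new GlobalWindows())
          .triggering(Repeatedly.forever(AfterFirst.of(
              AfterPane.elementCountAtLeast(100_000),
              AfterProcessingTime.pastFirstElementInPane()
                  .plusDelayOf(Duration.standardMinutes(10)))))
          .withAllowedLateness(Duration.ZERO)
          .discardingFiredPanes())
      .apply(FileIO.<String>write()
          .via(TextIO.sink())
          .to(outputDirectory)
          // An explicit shard count keeps WriteFiles off the
          // runner-determined-sharding path that flattens PCollections with
          // mismatched continuation triggers.
          .withNumShards(10));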


On Fri, Jan 11, 2019 at 12:32 PM Reuven Lax  wrote:

> Ah,
>
> numShards = 0 is explicitly not supported in unbounded mode today, for the
> reason mentioned above. If FileIO doesn't reject the pipeline in that case,
> we should fix that.
>
> Reuven
>
> On Fri, Jan 11, 2019 at 9:23 AM Jeff Klukas  wrote:
>
>> Indeed, I was wrong about the ValueProvider distinction. I updated that
>> in the JIRA.
>>
>> It's when numShards is 0 (so runner-provided sharding) vs. an explicit
>> number. Things work fine for explicit sharding. It's the runner-provided
>> sharding mode that encounters the Flatten of PCollections with conflicting
>> triggers.
>>
>> On Fri, Jan 11, 2019 at 12:18 PM Reuven Lax  wrote:
>>
>>> FileIO requires an explicit numShards in unbounded mode for a number of
>>> reasons - one being that a trigger has to happen on a GroupByKey, and we
>>> need something to group on.
>>>
>>> It is extremely surprising that behavior would change between using a
>>> ValueProvider or not. The exact same codepath should be triggered
>>> regardless of whether a ValueProvider is used.
>>>
>>> Reuven
>>>
>>> On Wed, Jan 9, 2019 at 11:00 PM Kenneth Knowles  wrote:
>>>
>>>> Definitely sounds like a bug but also I want to caution you (or anyone
>>>> reading this archived) that there are known problems with continuation
>>>> triggers. A spec on continuation triggers that we missed was that they
>>>> really must be "compatible" (this is an arbitrary concept, having only to
>>>> do with Flattening two PCollections together) with their original trigger.
>>>> Without this, we also know that you can have three PCollections with
>>>> identical triggering and you can CoGroupByKey them together but you cannot
>>>> do this three-way join as a sequence of binary joins.
>>>>
>>>> Kenn
>>>>
>>>> On Wed, Jan 9, 2019 at 10:44 AM Jeff Klukas 
>>>> wrote:
>>>>
>>>>> Thanks for the response, Chamikara. I filed
>>>>> https://jira.apache.org/jira/browse/BEAM-6399 and I expect I can work
>>>>> around the problem in my case by not using a ValueProvider for numShards.
>>>>>
>>>>> On Wed, Jan 9, 2019 at 1:22 PM Chamikara Jayalath <
>>>>> chamik...@google.com> wrote:
>>>>>
>>>>>> I'm not to familiar about the exact underlying issue here but writing
>>>>>> unbounded input to files when using GlobalWindows for unsharded output 
>>>>>> is a
>>>>>> valid usecase so sounds like a bug. Feel free to create a JIRA.
>>>>>>
>>>>>> - Cham
>>>>>>
>>>>>> On Wed, Jan 9, 2019 at 10:00 AM Jeff Klukas 
>>>>>> wrote:
>>>>>>
>>>>>>> I've read more deeply into the WriteFiles code and I'm understanding
>>>>>>> now that the exception is due to WriteFiles' attempt to handle unsharded
>>>>>>> input. In that case, it creates a sharded and unsharded collection; the
>>>>>>> first goes through one GroupByKey while the other goes through 2. These 
>>>>>>> two
>>>>>>> collections are then flattened together and they have incompatible 
>>>>>>> triggers
>>>>>>> due to the double-grouped collection using a continuation trigger.
>>>>>>>
>>>>>>> I was calling FileIO.withNumShards(ValueProvider), but if I
>>>>>>> switch to hard coding an integer rather than passing a ValueProvider,
>>>>>>> WriteFiles uses a different code

Re: Possible to use GlobalWindows for writing unbounded input to files?

2019-01-11 Thread Jeff Klukas
Indeed, I was wrong about the ValueProvider distinction. I updated that in
the JIRA.

It's when numShards is 0 (so runner-provided sharding) vs. an explicit
number. Things work fine for explicit sharding. It's the runner-provided
sharding mode that encounters the Flatten of PCollections with conflicting
triggers.

On Fri, Jan 11, 2019 at 12:18 PM Reuven Lax  wrote:

> FileIO requires an explicit numShards in unbounded mode for a number of
> reasons - one being that a trigger has to happen on a GroupByKey, and we
> need something to group on.
>
> It is extremely surprising that behavior would change between using a
> ValueProvider or not. The exact same codepath should be triggered
> regardless of whether a ValueProvider is used.
>
> Reuven
>
> On Wed, Jan 9, 2019 at 11:00 PM Kenneth Knowles  wrote:
>
>> Definitely sounds like a bug but also I want to caution you (or anyone
>> reading this archived) that there are known problems with continuation
>> triggers. A spec on continuation triggers that we missed was that they
>> really must be "compatible" (this is an arbitrary concept, having only to
>> do with Flattening two PCollections together) with their original trigger.
>> Without this, we also know that you can have three PCollections with
>> identical triggering and you can CoGroupByKey them together but you cannot
>> do this three-way join as a sequence of binary joins.
>>
>> Kenn
>>
>> On Wed, Jan 9, 2019 at 10:44 AM Jeff Klukas  wrote:
>>
>>> Thanks for the response, Chamikara. I filed
>>> https://jira.apache.org/jira/browse/BEAM-6399 and I expect I can work
>>> around the problem in my case by not using a ValueProvider for numShards.
>>>
>>> On Wed, Jan 9, 2019 at 1:22 PM Chamikara Jayalath 
>>> wrote:
>>>
>>>> I'm not to familiar about the exact underlying issue here but writing
>>>> unbounded input to files when using GlobalWindows for unsharded output is a
>>>> valid usecase so sounds like a bug. Feel free to create a JIRA.
>>>>
>>>> - Cham
>>>>
>>>> On Wed, Jan 9, 2019 at 10:00 AM Jeff Klukas 
>>>> wrote:
>>>>
>>>>> I've read more deeply into the WriteFiles code and I'm understanding
>>>>> now that the exception is due to WriteFiles' attempt to handle unsharded
>>>>> input. In that case, it creates a sharded and unsharded collection; the
>>>>> first goes through one GroupByKey while the other goes through 2. These 
>>>>> two
>>>>> collections are then flattened together and they have incompatible 
>>>>> triggers
>>>>> due to the double-grouped collection using a continuation trigger.
>>>>>
>>>>> I was calling FileIO.withNumShards(ValueProvider), but if I
>>>>> switch to hard coding an integer rather than passing a ValueProvider,
>>>>> WriteFiles uses a different code path that doesn't flatten collections and
>>>>> no exception is thrown.
>>>>>
>>>>> So, this might really be considered a bug of WriteFiles (and thus
>>>>> FileIO). But I'd love to hear other interpretations.
>>>>>
>>>>> On Wed, Jan 9, 2019 at 11:25 AM Jeff Klukas 
>>>>> wrote:
>>>>>
>>>>>> I'm building a pipeline that streams from Pubsub and writes to files.
>>>>>> I'm using FileIO's dynamic destinations to place elements into different
>>>>>> directories according to date and I really don't care about ordering of
>>>>>> elements beyond the date buckets.
>>>>>>
>>>>>> So, I think GlobalWindows is appropriate in this case, even though
>>>>>> the input is unbounded. Is it possible to use GlobalWindows but set a
>>>>>> trigger based on number of elements and/or processing time so that beam
>>>>>> actually writes out files periodically?
>>>>>>
>>>>>> I tried the following:
>>>>>>
>>>>>> Window.into(new GlobalWindows())
>>>>>>   .triggering(Repeatedly.forever(AfterFirst.of(
>>>>>> AfterPane.elementCountAtLeast(1),
>>>>>>
>>>>>> AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(10)
>>>>>>   .discardingFiredPanes()
>>>>>>
>>>>>> But it raises an exception about incompatible triggers:
>>>>>>
>>>>>> Inputs to Flatten had incompatible triggers:
>>>>>> Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(1),
>>>>>> AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 minute))),
>>>>>> Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(1),
>>>>>> AfterSynchronizedProcessingTime.pastFirstElementInPane()))
>>>>>>
>>>>>> I believe that what's happening is that FileIO with explicit
>>>>>> numShards (required in the case of unbounded input) is forcing a
>>>>>> GroupByKey, which activates continuation triggers that are incompatible
>>>>>> with my stated triggers. It's internals of WriteFiles that's trying to
>>>>>> flatten the incompatible PCollections together.
>>>>>>
>>>>>>
>>>>>>


Re: Query expressions for schema fields

2019-01-07 Thread Jeff Klukas
There is also JMESPath (http://jmespath.org/) which is quite similar to
JsonPath, but does have a spec and lacks the leading $ character. The AWS
CLI uses JMESPath for defining queries.
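
If I'm reading the spec right, the selections from Reuven's examples would be
written as event.userid and event.location.* (no leading $ character), and
array slices like actions[0:10].location are supported as well.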



On Mon, Jan 7, 2019 at 1:05 PM Reuven Lax  wrote:

>
>
> On Mon, Jan 7, 2019 at 1:44 AM Robert Bradshaw 
> wrote:
>
>> On Sun, Jan 6, 2019 at 12:46 PM Reuven Lax  wrote:
>> >
>> > Some time ago, @Jean-Baptiste Onofré made the excellent suggestion that
>> we look into using JsonPath as a selector format for schema fields. This
>> provides a simple and natural way for users to select nested schema fields,
>> as well as wildcards. This would allow users to more simply select nested
>> fields using the Select transform, e.g.:
>> >
>> > p.apply(Select.fields("event.userid", "event.location.*");
>> >
>> > It would also fit into NewDoFn (Java) like this:
>> >
>> > @ProcessElement
>> > public void process(@Field("userid") String userId,
>> > @Field("action.location.*") Location location) {
>> > }
>> >
>> > After some investigation, I believe that we're better off with
>> something very close to a subset of JsonPath, but not precisely JsonPath.
>>
>> I am very wary of creating something that's very close to, but not
>> quite, a (subset of) a well established standard. Is there
>> disadvantage to not being a strict actual subset? If we go this route,
>> we should at least ensure that any divergence is illegal JsonPath
>> rather than having different semantic meaning.
>>
>
> As far as I can tell, JsonPath isn't much of a "standard." There doesn't
> seem to be much of a spec other than implementation.
>
> For the most part, I am speaking of a strict subset of JsonPath. The only
> incompatibility is that JsonPath expressions all start with a '$' (which
> represents the root node). So in the above expression you would write
> "$.action.location.*" instead. I think staying closer to BeamSql syntax
> makes more sense here, and I would like to dispense with the need to begin
> with a $ character. JsonPath also assumes that each object is also a
> JavaScript object (which makes no sense here), and some of the JsonPath
> features are based on that.
>
>
>> > JsonPath has many features that are Javascript specific (e.g. the
>> ability to embed Javascript expressions), JsonPath also includes the
>> ability to do complex filtering and aggregation, which I don't think we
>> want here; Beam already provides the ability to do such filtering and
>> aggregation, and it's not needed here. One example of a change: JsonPath
>> queries always begin with $ (representing the root node), and I think we're
>> better off not requiring that so that these queries look more like BeamSql
>> queries.
>> >
>> > I've created a small ANTLR grammar (which has the advantage that it's
>> easy to extend) for these expressions and have everything working in a
>> branch. However there are a few more features of JsonPath that might be
>> useful here, and I wanted community feedback to see whether it's worth
>> implementing them.
>> >
>> > The first are array/map slices and selectors. Currently if a schema
>> contains an array (or map) field, you can only select all elements of the
>> array or map. JsonPath however supports selecting and slicing the array.
>> For example, consider the following:
>> >
>> > @DefaultSchema(JavaFieldSchema.class)
>> > public class Event {
>> >   public final String userId;
>> >   public final List actions;
>> > }
>> >
>> > Currently you can apply Select.fields("actions.location"), and that
>> will return a schema containing a list of Locations, one for every action
>> in the original event. If we allowed slicing,  you could instead write
>> Select.fields("actions[0:9].locations"), which would do the same but only
>> for the first 10 elements of the array.
>> >
>> > Is this useful in Beam? It would not be hard to implement, but I want
>> to see what folks think first.
>> >
>> > The second feature is recursive field selection. The example often
>> given in JsonPath is a Json document containing the inventory for a store.
>> There are lists of subobjects representing books, bicycles, tables, chairs,
>> etc. etc. The JsonPath query "$..price" recursively finds every object that
>> has a field named price, and returns those prices; in this case it returns
>> the price of every element in the store.
>> >
>> > I'm a bit less convinced that recursive field selection is useful in
>> Beam. The usual example for Json involves a document that represents an
>> entire corpus, e.g. a store inventory. In Beam, the schemas are applied to
>> individual records, and I don't know how often there will be a use for this
>> sort of recursive selection. However I could be wrong here, so if anyone
>> has a good use case for this sort of selection, please let me know.
>>
>> Records often contain lists, e.g. the record could be an order, and it
>> could be useful to select on the price of the items (just to throw it
>> out there).
>>
>
> BTW, that 

Re: Why does Beam not use the google-api-client libraries?

2019-01-02 Thread Jeff Klukas
My apologies. I got the terminology entirely wrong.

As you say, PubsubIO and other Beam components _do_ use the official Google
API client library (google-api-client). They do not, however, use the
higher-level Google Cloud libraries such as google-cloud-pubsub which
provide abstractions on top of the API client library.

I am wondering whether there are technical reasons not to use the
higher-level service-specific libraries, or whether this is simply
historical.
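
To make the distinction concrete, the higher-level style I mean looks roughly
like this (project and topic names are placeholders):

  Publisher publisher =
      Publisher.newBuilder(TopicName.of("my-project", "my-topic")).build();
  PubsubMessage message = PubsubMessage.newBuilder()
      .setData(ByteString.copyFromUtf8("hello"))
      .build();
  // The client library handles batching, retries, and threading internally.
  ApiFuture<String> messageId = publisher.publish(message);
  publisher.shutdown();

whereas PubsubIO assembles its requests against the lower-level API surface
itself.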

On Wed, Jan 2, 2019 at 12:38 PM Anton Kedin  wrote:

> I don't have enough context to answer all of the questions, but looking at
> PubsubIO it seems to use the official libraries, e.g. see Pubsub doc [1]
> vs Pubsub IO GRPC client [2]. Correct me if I misunderstood your question.
>
> [1]
> https://cloud.google.com/pubsub/docs/publisher#pubsub-publish-message-java
> [2]
> https://github.com/apache/beam/blob/2e759fecf63d62d110f29265f9438128e3bdc8ab/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java#L189
>
> Pubsub IO JSON client seems to use a slightly different approach but still
> relies on somewhat official path, e.g. Pubsub doc [3] (javadoc[4]) vs
> Pubsub IO JSON client [5].
>
> [3] https://developers.google.com/api-client-library/java/apis/pubsub/v1
> [4]
> https://developers.google.com/resources/api-libraries/documentation/pubsub/v1/java/latest/com/google/api/services/pubsub/Pubsub.html
> [5]
> https://github.com/apache/beam/blob/2e759fecf63d62d110f29265f9438128e3bdc8ab/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java#L130
>
> The latter seems to be the older library, so I would assume it's for
> legacy reasons.
>
> Regards,
> Anton
>
>
> On Wed, Jan 2, 2019 at 9:03 AM Jeff Klukas  wrote:
>
>> I'm building a high-volume Beam pipeline using PubsubIO and running into
>> some concerns over performance and delivery semantics, prompting me to want
>> to better understand the implementation. Reading through the library,
>> PubsubIO appears to be a completely separate implementation of Pubsub
>> client behavior from Google's own Java client. As a developer trying to
>> read and understand the implementation, this is a significant hurdle, since
>> any previous knowledge of the Google library is not applicable and is
>> potentially at odds with what's in PubsubIO.
>>
>> Why doesn't beam use the Google clients for PubsubIO, BigQueryIO, etc.?
>> Is it for historical reasons? Is there difficulty in packaging and
>> integration of the Google clients? Or are the needs for Beam just
>> substantially different from what the Google libraries provide?
>>
>


Why does Beam not use the google-api-client libraries?

2019-01-02 Thread Jeff Klukas
I'm building a high-volume Beam pipeline using PubsubIO and running into
some concerns over performance and delivery semantics, prompting me to want
to better understand the implementation. Reading through the library,
PubsubIO appears to be a completely separate implementation of Pubsub
client behavior from Google's own Java client. As a developer trying to
read and understand the implementation, this is a significant hurdle, since
any previous knowledge of the Google library is not applicable and is
potentially at odds with what's in PubsubIO.

Why doesn't beam use the Google clients for PubsubIO, BigQueryIO, etc.? Is
it for historical reasons? Is there difficulty in packaging and integration
of the Google clients? Or are the needs for Beam just substantially
different from what the Google libraries provide?


Re: Evolving a Coder for an added field

2018-12-27 Thread Jeff Klukas
Picking this back up, I've modified the PR [0] to add a MetadataCoderV2
that encodes/decodes the new lastModifiedMillis, and MetadataCoder now
provides a default -1 value for lastModifiedMillis when decoding to
maintain compatibility. The intent is to follow the same pattern as the
existing TableDestinationCoderV2. MetadataCoder remains the default
registered coder for Metadata, so MetadataCoderV2 is strictly opt-in at
this point.
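
The compatibility piece is only in the decode path; stripped down to a sketch
(decodeBuilder here just stands in for decoding the pre-existing fields, not
the exact code in the PR), it looks like:

  @Override
  public Metadata decode(InputStream in) throws IOException {
    return decodeBuilder(in)
        // Data encoded before the field existed gets the sentinel default.
        .setLastModifiedMillis(-1L)
        .build();
  }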

I'm hoping to get review on the above so the change can be available in the
short term, while work continues to understand the way forward for coder
versioning. I added a link to this thread to BEAM-3616 [1] which already
captures the need for coder versioning.

[0] https://github.com/apache/beam/pull/6914
[1] https://issues.apache.org/jira/browse/BEAM-3616

On Mon, Nov 26, 2018 at 11:47 AM Lukasz Cwik  wrote:

> Reuven was one of the people I reached out to on this matter and he
> replied on this thread.
>
> On Mon, Nov 26, 2018 at 7:07 AM Robert Bradshaw 
> wrote:
>
>> Modifying an existing coder is a non-starter until we have a versioning
>> story. Creating an entirely new coder should definitely be possible, and
>> using it either opt-in or, if a good enough case can be made, possibly even
>> opt-out could get this unblocked.
>>
>> On Mon, Nov 26, 2018 at 3:05 PM Jeff Klukas  wrote:
>>
>>> Lukasz - Were you able to get any more context on the possibility of
>>> versioning coders from other folks at Google?
>>>
>>> It sounds like adding versioning for coders and/or schemas is
>>> potentially a large change. At this point, should I just write up some
>>> highlights from this thread in a JIRA issue for future tracking?
>>>
>>> On Mon, Nov 12, 2018 at 8:23 PM Reuven Lax  wrote:
>>>
>>>> A few thoughts:
>>>>
>>>> 1. I agree with you about coder versioning. The lack of a good story
>>>> around versioning has been a huge pain here, and it's unfortunate that
>>>> nobody ever worked on this.
>>>>
>>>> 2. I think versioning schemas will be easier than versioning coders
>>>> (especially for adding new fields). In many cases I suggest we start
>>>> looking at migrating as much as possible to schemas, and in Beam 3.0 maybe
>>>> we can migrate all of our internal payload to schemas. Schemas support
>>>> nested fields, repeated fields, and map fields - which can model most 
>>>> thing.
>>>>
>>>> 3. There was a Beam proposal for a way to generically handle
>>>> incompatible schema updates via snapshots. The idea was that such updates
>>>> can be accompanied by a transform that maps a pipeline snapshot into a new
>>>> snapshot with the encodings modified.
>>>>
>>>> Reuven
>>>>
>>>> On Tue, Nov 13, 2018 at 3:16 AM Jeff Klukas 
>>>> wrote:
>>>>
>>>>> Conversation here has fizzled, but sounds like there's basically a
>>>>> consensus here on a need for a new concept of Coder versioning that's
>>>>> accessible at the Java level in order to allow an evolution path. Further,
>>>>> it sounds like my open PR [0] for adding a new field to Metadata is
>>>>> essentially blocked until we have coder versioning in place.
>>>>>
>>>>> Is there any existing documentation of these concepts, or should I go
>>>>> ahead and file a new Jira issue summarizing the problem? I don't think I
>>>>> have a comprehensive enough understanding of the Coder machinery to be 
>>>>> able
>>>>> to design a solution, so I'd need to hand this off or simply leave it in
>>>>> the Jira backlog.
>>>>>
>>>>> [0] https://github.com/apache/beam/pull/6914
>>>>>
>>>>>
>>>>> On Tue, Nov 6, 2018 at 4:38 AM Robert Bradshaw 
>>>>> wrote:
>>>>>
>>>>>> Yes, a Coder author should be able to register a URN with a mapping
>>>>>> from (components + payload) -> Coder (and vice versa), and this should
>>>>>> be more lightweight than manually editing the proto files.
>>>>>> On Mon, Nov 5, 2018 at 7:12 PM Thomas Weise  wrote:
>>>>>> >
>>>>>> > +1
>>>>>> >
>>>>>> > I think that coders should be immutable/versioned. The SDK should
>>>>>> know about all the available versions and be able to associate the data
>>>>>> (stream or at rest) with the corresponding coder version via URN

Re: help with right transform to read tgz file

2018-12-26 Thread Jeff Klukas
The general approach in your example looks reasonable to me. I don't think
there's anything built in to Beam to help with parsing the tar file format
and I don't know how robust the method of replacing "^@" and then splitting
on newlines will be. I'd likely use Apache's commons-compress library for
walking through the bytes of the tar file, pulling out the file names and
associated contents.

You should be able to put all of that logic into a single FlatMapElements
invocation as in your example. I'd suggest returning KV<String, String>
where the key is the file name (or perhaps just the prefix of the file name
if that's what you want to combine on) and the value is the line of content.
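
A sketch of that shape using commons-compress (this assumes
readMatches().withCompression(Compression.GZIP) has already handled the gzip
layer so that open() hands back the raw tar stream; the tar classes and
IOUtils come from org.apache.commons.compress):

  PCollection<KV<String, String>> lines = compGz.apply(FlatMapElements
      .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
      .via((FileIO.ReadableFile f) -> {
        List<KV<String, String>> out = new ArrayList<>();
        try (TarArchiveInputStream tar =
            new TarArchiveInputStream(Channels.newInputStream(f.open()))) {
          TarArchiveEntry entry;
          while ((entry = tar.getNextTarEntry()) != null) {
            if (!entry.isFile() || !entry.getName().endsWith(".csv")) {
              continue;
            }
            // The stream is positioned at this entry's data; read it fully.
            String contents =
                new String(IOUtils.toByteArray(tar), StandardCharsets.UTF_8);
            for (String line : contents.split("\\r?\\n")) {
              if (!line.isEmpty()) {
                out.add(KV.of(entry.getName(), line));
              }
            }
          }
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
        return out;
      }));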

It looks like Beam does include some capabilities for parsing CSV lines,
but I have no experience using them. It looks like they're building upon
Apache commons-csv, so you might consider using that library directly if
you need to parse fields out of the CSV and do transformations.

On Fri, Dec 21, 2018 at 6:24 PM Sridevi Nookala <
snook...@parallelwireless.com> wrote:

> Hi,
>
> I am newbie to apache beam
> I am trying to write a simple pipeline using apache beam java sdk.
> the pipleline will read a bunch of tgz files.
> each tgz files have multiple CSV files with data
>
> public static final void main(String args[]) throws Exception {
>
> PipelineOptions options = PipelineOptionsFactory.create();
> Pipeline pipeline = Pipeline.create(options);
>
>
> PCollection<MatchResult.Metadata> matches =
> pipeline.apply(FileIO.match().filepattern("/tmp/beam5/*.tgz"));
>
> PCollection<FileIO.ReadableFile> compGz =
> matches.apply(FileIO.readMatches().withCompression(Compression.GZIP));
> PCollection<String> contents = compGz.apply(FlatMapElements
> // uses imports from TypeDescriptors
> .into(TypeDescriptors.strings())
> .via((ReadableFile f) -> {
> try {
> return
> Arrays.asList(f.readFullyAsUTF8String().replaceAll("^@","").split("\\r?\\n|\\r",
> -1));
> } catch (IOException e) {
> // TODO Auto-generated catch block
> e.printStackTrace();
> }
> return null;
> }) );
> PDone ret =
> contents.apply(TextIO.write().to("/tmp/beam6/output.txt").withoutSharding());
>
> }
>
> instead of returning flat list of strings, i tried parsing the
> f.readFullyAsUTF8String() and make CSVFileBean, but it does not seem to
> like
>
> basically the above program is crude
>
> i am looking for suggestions on right transform to transform this tgz into
> individual CSV bean POJO's that have name of CSV and contents
>
> i am stuck decoding the tgz from readFullyAsUTF8String()
>
> eventually i need to take each CSV bean and combine them
>
> Eg.  test1.tgz has  foo_time1.csv, bar_time1.csv and
>test2.tgz has  foo_time2.csv, bar_time2.csv
>
> so i need to extract these CSV's and combine all the foo's and bar's
>
> and possibly manipulate foo's, bar's by adding columns and transforming
> and then sending out to destinations which can be filesystem or kafka
>
> thanks
> Any help is appreciated
> Sri
>
>
>
> --
>
>


Re: Can we allow SimpleFunction and SerializableFunction to throw Exception?

2018-12-17 Thread Jeff Klukas
Thanks to Kenn Knowles for picking up the review here. The PR is merged, so
the new interface ProcessFunction and abstract class InferableFunction
(both of which declare `throws Exception`) are now available in
master.
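
If I recall the final shape of the API correctly, that means a map step can
now call code that throws a checked exception without a try/catch wrapper,
e.g. (Files.size is just a stand-in for any such call):

  PCollection<Long> sizes = paths.apply(MapElements
      .into(TypeDescriptors.longs())
      // ProcessFunction.apply declares `throws Exception`, so the checked
      // IOException from Files.size needs no wrapping here.
      .via((ProcessFunction<String, Long>) path -> Files.size(Paths.get(path))));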

On Fri, Dec 14, 2018 at 12:18 PM Jeff Klukas  wrote:

> Checking in on this thread. Anybody interested to review
> https://github.com/apache/beam/pull/7160 ?
>
> There could be some discussion on whether these names are the right names,
> but otherwise the only potential objection I see here is the additional
> burden on developers to understand the differences between the existing
> (SerializableFunction and SimpleFunction) and the new (ProcessFunction and
> InferableFunction). I originally planned on marking the existing ones as
> deprecated, but decided there are contexts where disallowing checked
> exceptions probably makes sense. So we now have 4 objects for developers to
> be familiar with rather than 2.
>
> On Fri, Dec 7, 2018 at 6:54 AM Robert Bradshaw 
> wrote:
>
>> How should we move forward on this? The idea looks good, and even
>> comes with a PR to review. Any objections to the names?
>> On Wed, Dec 5, 2018 at 12:48 PM Jeff Klukas  wrote:
>> >
>> > Reminder that I'm looking for review on
>> https://github.com/apache/beam/pull/7160
>> >
> > On Thu, Nov 29, 2018, 11:48 AM Jeff Klukas wrote:
>> >> I created a JIRA and a PR for this:
>> >>
>> >> https://issues.apache.org/jira/browse/BEAM-6150
>> >> https://github.com/apache/beam/pull/7160
>> >>
>> >> On naming, I'm proposing that SerializableFunction extend
>> ProcessFunction (since this new superinterface is particularly appropriate
>> for user code executed inside a ProcessElement method) and that
>> SimpleFunction extend InferableFunction (since type information and coder
>> inference are what distinguish this from ProcessFunction).
>> >>
>> >> We originally discussed deprecating SerializableFunction and
>> SimpleFunction in favor of the new types, but there appear to be two fairly
>> separate use cases for SerializableFunction. It's either defining user code
>> that will be executed in a DoFn, in which case I think we always want to
>> prefer the new interface that allows declared exceptions. But it's also
>> used where the code is to be executed as part of pipeline construction, in
>> which case it may be reasonable to want to restrict checked exceptions. In
>> any case, deprecating SerializableFunction and SimpleFunction can be
>> discussed further in the future.
>> >>
>> >>
>> >> On Wed, Nov 28, 2018 at 9:53 PM Kenneth Knowles 
>> wrote:
>> >>>
>> >>> Nice! A clean solution and an opportunity to bikeshed on names. This
>> has everything I love.
>> >>>
>> >>> Kenn
>> >>>
>> >>> On Wed, Nov 28, 2018 at 6:43 PM Jeff Klukas 
>> wrote:
>> >>>>
>> >>>> It looks like we can make the new interface a superinterface for
>> the existing SerializableFunction while maintaining binary compatibility
>> [0].
>> >>>>
>> >>>> We'd have:
>> >>>>
>> >>>> public interface NewSerializableFunction<InputT, OutputT> extends
>> Serializable {
>> >>>>   OutputT apply(InputT input) throws Exception;
>> >>>> }
>> >>>>
>> >>>> and then modify SerializableFunction to inherit from it:
>> >>>>
>> >>>> public interface SerializableFunction<InputT, OutputT> extends
>> NewSerializableFunction<InputT, OutputT>, Serializable {
>> >>>>   @Override
>> >>>>   OutputT apply(InputT input);
>> >>>> }
>> >>>>
>> >>>>
>> >>>> IIUC, we can then more or less replace all references to
>> SerializableFunction with NewSerializableFunction across the beam codebase
>> without having to introduce any new overrides. I'm working on a proof of
>> concept now.
>> >>>>
>> >>>> It's not clear what the actual names for NewSerializableFunction and
>> NewSimpleFunction should be.
>> >>>>
>> >>>> [0]
>> https://docs.oracle.com/javase/specs/jls/se8/html/jls-13.html#jls-13.4.4
>> >>>>
>> >>>>
>> >>>> On Mon, Nov 26, 2018 at 9:54 PM Thomas Weise  wrote:
>> >>>>>
>> >>>>> +1 for introducing the new interface now and deprecating the old
>> one. The major versio

Re: Can we allow SimpleFunction and SerializableFunction to throw Exception?

2018-12-14 Thread Jeff Klukas
Checking in on this thread. Anybody interested in reviewing
https://github.com/apache/beam/pull/7160 ?

There could be some discussion on whether these names are the right names,
but otherwise the only potential objection I see here is the additional
burden on developers to understand the differences between the existing
(SerializableFunction and SimpleFunction) and the new (ProcessFunction and
InferableFunction). I originally planned on marking the existing ones as
deprecated, but decided there are contexts where disallowing checked
exceptions probably makes sense. So we now have 4 objects for developers to
be familiar with rather than 2.
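
Concretely, that's:

- ProcessFunction: a functional interface whose apply() declares `throws
  Exception`
- InferableFunction: its abstract-class counterpart that supports type and
  coder inference
- SerializableFunction: the existing functional interface, whose apply()
  allows no checked exceptions
- SimpleFunction: the existing abstract-class counterpart of
  SerializableFunction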

On Fri, Dec 7, 2018 at 6:54 AM Robert Bradshaw  wrote:

> How should we move forward on this? The idea looks good, and even
> comes with a PR to review. Any objections to the names?
> On Wed, Dec 5, 2018 at 12:48 PM Jeff Klukas  wrote:
> >
> > Reminder that I'm looking for review on
> https://github.com/apache/beam/pull/7160
> >
> > On Thu, Nov 29, 2018, 11:48 AM Jeff Klukas wrote:
> >> I created a JIRA and a PR for this:
> >>
> >> https://issues.apache.org/jira/browse/BEAM-6150
> >> https://github.com/apache/beam/pull/7160
> >>
> >> On naming, I'm proposing that SerializableFunction extend
> ProcessFunction (since this new superinterface is particularly appropriate
> for user code executed inside a ProcessElement method) and that
> SimpleFunction extend InferableFunction (since type information and coder
> inference are what distinguish this from ProcessFunction).
> >>
> >> We originally discussed deprecating SerializableFunction and
> SimpleFunction in favor of the new types, but there appear to be two fairly
> separate use cases for SerializableFunction. It's either defining user code
> that will be executed in a DoFn, in which case I think we always want to
> prefer the new interface that allows declared exceptions. But it's also
> used where the code is to be executed as part of pipeline construction, in
> which case it may be reasonable to want to restrict checked exceptions. In
> any case, deprecating SerializableFunction and SimpleFunction can be
> discussed further in the future.
> >>
> >>
> >> On Wed, Nov 28, 2018 at 9:53 PM Kenneth Knowles 
> wrote:
> >>>
> >>> Nice! A clean solution and an opportunity to bikeshed on names. This
> has everything I love.
> >>>
> >>> Kenn
> >>>
> >>> On Wed, Nov 28, 2018 at 6:43 PM Jeff Klukas 
> wrote:
> >>>>
> >>>> It looks like we can make the new interface a superinterface for
> the existing SerializableFunction while maintaining binary compatibility
> [0].
> >>>>
> >>>> We'd have:
> >>>>
> >>>> public interface NewSerializableFunction<InputT, OutputT> extends
> Serializable {
> >>>>   OutputT apply(InputT input) throws Exception;
> >>>> }
> >>>>
> >>>> and then modify SerializableFunction to inherit from it:
> >>>>
> >>>> public interface SerializableFunction<InputT, OutputT> extends
> NewSerializableFunction<InputT, OutputT>, Serializable {
> >>>>   @Override
> >>>>   OutputT apply(InputT input);
> >>>> }
> >>>>
> >>>>
> >>>> IIUC, we can then more or less replace all references to
> SerializableFunction with NewSerializableFunction across the beam codebase
> without having to introduce any new overrides. I'm working on a proof of
> concept now.
> >>>>
> >>>> It's not clear what the actual names for NewSerializableFunction and
> NewSimpleFunction should be.
> >>>>
> >>>> [0]
> https://docs.oracle.com/javase/specs/jls/se8/html/jls-13.html#jls-13.4.4
> >>>>
> >>>>
> >>>> On Mon, Nov 26, 2018 at 9:54 PM Thomas Weise  wrote:
> >>>>>
> >>>>> +1 for introducing the new interface now and deprecating the old
> one. The major version change then provides the opportunity to remove
> deprecated code.
> >>>>>
> >>>>>
> >>>>> On Mon, Nov 26, 2018 at 10:09 AM Lukasz Cwik 
> wrote:
> >>>>>>
> >>>>>> Before 3.0 we will still want to introduce this giving time for
> people to migrate, would it make sense to do that now and deprecate the
> alternatives that it replaces?
> >>>>>>
> >>>>>> On Mon, Nov 26, 2018 at 5:59 AM Jeff Klukas 
> wrote:
> >>>>>>>
> >>>>>>> Picking up this thread again. Based on

Re: Can we allow SimpleFunction and SerializableFunction to throw Exception?

2018-12-05 Thread Jeff Klukas
Reminder that I'm looking for review on
https://github.com/apache/beam/pull/7160

On Thu, Nov 29, 2018, 11:48 AM Jeff Klukas wrote:
> I created a JIRA and a PR for this:
>
> https://issues.apache.org/jira/browse/BEAM-6150
> https://github.com/apache/beam/pull/7160
>
> On naming, I'm proposing that SerializableFunction extend ProcessFunction
> (since this new superinterface is particularly appropriate for user code
> executed inside a ProcessElement method) and that SimpleFunction extend
> InferableFunction (since type information and coder inference are what
> distinguish this from ProcessFunction).
>
> We originally discussed deprecating SerializableFunction and
> SimpleFunction in favor of the new types, but there appear to be two fairly
> separate use cases for SerializableFunction. It's either defining user code
> that will be executed in a DoFn, in which case I think we always want to
> prefer the new interface that allows declared exceptions. But it's also
> used where the code is to be executed as part of pipeline construction, in
> which case it may be reasonable to want to restrict checked exceptions. In
> any case, deprecating SerializableFunction and SimpleFunction can be
> discussed further in the future.
>
> On Wed, Nov 28, 2018 at 9:53 PM Kenneth Knowles  wrote:
>
>> Nice! A clean solution and an opportunity to bikeshed on names. This has
>> everything I love.
>>
>> Kenn
>>
>> On Wed, Nov 28, 2018 at 6:43 PM Jeff Klukas  wrote:
>>
>>> It looks like we can add make the new interface a superinterface for the
>>> existing SerializableFunction while maintaining binary compatibility [0].
>>>
>>> We'd have:
>>>
>>> public interface NewSerializableFunction extends
>>> Serializable {
>>>   OutputT apply(InputT input) throws Exception;
>>> }
>>>
>>> and then modify SerializableFunction to inherit from it:
>>>
>>> public interface SerializableFunction extends
>>> NewSerializableFunction, Serializable {
>>>   @Override
>>>   OutputT apply(InputT input);
>>> }
>>>
>>>
>>> IIUC, we can then more or less replace all references to
>>> SerializableFunction with NewSerializableFunction across the beam codebase
>>> without having to introduce any new overrides. I'm working on a proof of
>>> concept now.
>>>
>>> It's not clear what the actual names for NewSerializableFunction and
>>> NewSimpleFunction should be.
>>>
>>> [0]
>>> https://docs.oracle.com/javase/specs/jls/se8/html/jls-13.html#jls-13.4.4
>>>
>>>
>>> On Mon, Nov 26, 2018 at 9:54 PM Thomas Weise  wrote:
>>>
>>>> +1 for introducing the new interface now and deprecating the old one.
>>>> The major version change then provides the opportunity to remove deprecated
>>>> code.
>>>>
>>>>
>>>> On Mon, Nov 26, 2018 at 10:09 AM Lukasz Cwik  wrote:
>>>>
>>>>> Before 3.0 we will still want to introduce this giving time for people
>>>>> to migrate, would it make sense to do that now and deprecate the
>>>>> alternatives that it replaces?
>>>>>
>>>>> On Mon, Nov 26, 2018 at 5:59 AM Jeff Klukas 
>>>>> wrote:
>>>>>
>>>>>> Picking up this thread again. Based on the feedback from Kenn,
>>>>>> Reuven, and Romain, it sounds like there's no objection to the idea of
>>>>>> SimpleFunction and SerializableFunction declaring that they throw
>>>>>> Exception. So the discussion at this point is about whether there's an
>>>>>> acceptable way to introduce that change.
>>>>>>
>>>>>> IIUC correctly, Kenn was suggesting that we need to ensure backwards
>>>>>> compatibility for existing user code both at runtime and recompile, which
>>>>>> means we can't simply add the declaration to the existing interfaces, 
>>>>>> since
>>>>>> that would cause errors at compile time for user code directly invoking
>>>>>> SerializableFunction instances.
>>>>>>
>>>>>> I don't see an obvious way that introducing a new functional
>>>>>> interface would help without littering the API with more variants (it's
>>>>>> already a bit confusing that i.e. MapElements has multiple via() methods 
>>>>>> to
>>>>>> support three different function interfaces).
>>>>>>
>>>>>> Perhaps

Re: Can we allow SimpleFunction and SerializableFunction to throw Exception?

2018-11-29 Thread Jeff Klukas
I created a JIRA and a PR for this:

https://issues.apache.org/jira/browse/BEAM-6150
https://github.com/apache/beam/pull/7160

On naming, I'm proposing that SerializableFunction extend ProcessFunction
(since this new superinterface is particularly appropriate for user code
executed inside a ProcessElement method) and that SimpleFunction extend
InferableFunction (since type information and coder inference are what
distinguish this from ProcessFunction).

We originally discussed deprecating SerializableFunction and SimpleFunction
in favor of the new types, but there appear to be two fairly separate use
cases for SerializableFunction. It's either defining user code that will be
executed in a DoFn, in which case I think we always want to prefer the new
interface that allows declared exceptions. But it's also used where the
code is to be executed as part of pipeline construction, in which case it
may be reasonable to want to restrict checked exceptions. In any case,
deprecating SerializableFunction and SimpleFunction can be discussed
further in the future.
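
For illustration only, here's a rough sketch of how the proposed interface might
look and be used. The names follow the proposal above, and `lines`, `Foo`, and
`objectMapper` are just stand-ins; none of this is final:

// The proposed superinterface: same shape as SerializableFunction, but may throw.
public interface ProcessFunction<InputT, OutputT> extends Serializable {
  OutputT apply(InputT input) throws Exception;
}

// Assuming MapElements gains a via(ProcessFunction) overload, user code could then
// declare checked exceptions directly instead of wrapping them:
PCollection<Foo> parsed = lines.apply(
    MapElements.into(TypeDescriptor.of(Foo.class))
        .via((ProcessFunction<String, Foo>) line -> objectMapper.readValue(line, Foo.class)));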

On Wed, Nov 28, 2018 at 9:53 PM Kenneth Knowles  wrote:

> Nice! A clean solution and an opportunity to bikeshed on names. This has
> everything I love.
>
> Kenn
>
> On Wed, Nov 28, 2018 at 6:43 PM Jeff Klukas  wrote:
>
>> It looks like we can add make the new interface a superinterface for the
>> existing SerializableFunction while maintaining binary compatibility [0].
>>
>> We'd have:
>>
>> public interface NewSerializableFunction extends
>> Serializable {
>>   OutputT apply(InputT input) throws Exception;
>> }
>>
>> and then modify SerializableFunction to inherit from it:
>>
>> public interface SerializableFunction extends
>> NewSerializableFunction, Serializable {
>>   @Override
>>   OutputT apply(InputT input);
>> }
>>
>>
>> IIUC, we can then more or less replace all references to
>> SerializableFunction with NewSerializableFunction across the beam codebase
>> without having to introduce any new overrides. I'm working on a proof of
>> concept now.
>>
>> It's not clear what the actual names for NewSerializableFunction and
>> NewSimpleFunction should be.
>>
>> [0]
>> https://docs.oracle.com/javase/specs/jls/se8/html/jls-13.html#jls-13.4.4
>>
>>
>> On Mon, Nov 26, 2018 at 9:54 PM Thomas Weise  wrote:
>>
>>> +1 for introducing the new interface now and deprecating the old one.
>>> The major version change then provides the opportunity to remove deprecated
>>> code.
>>>
>>>
>>> On Mon, Nov 26, 2018 at 10:09 AM Lukasz Cwik  wrote:
>>>
>>>> Before 3.0 we will still want to introduce this giving time for people
>>>> to migrate, would it make sense to do that now and deprecate the
>>>> alternatives that it replaces?
>>>>
>>>> On Mon, Nov 26, 2018 at 5:59 AM Jeff Klukas 
>>>> wrote:
>>>>
>>>>> Picking up this thread again. Based on the feedback from Kenn, Reuven,
>>>>> and Romain, it sounds like there's no objection to the idea of
>>>>> SimpleFunction and SerializableFunction declaring that they throw
>>>>> Exception. So the discussion at this point is about whether there's an
>>>>> acceptable way to introduce that change.
>>>>>
>>>>> IIUC correctly, Kenn was suggesting that we need to ensure backwards
>>>>> compatibility for existing user code both at runtime and recompile, which
>>>>> means we can't simply add the declaration to the existing interfaces, 
>>>>> since
>>>>> that would cause errors at compile time for user code directly invoking
>>>>> SerializableFunction instances.
>>>>>
>>>>> I don't see an obvious way that introducing a new functional interface
>>>>> would help without littering the API with more variants (it's already a 
>>>>> bit
>>>>> confusing that i.e. MapElements has multiple via() methods to support 
>>>>> three
>>>>> different function interfaces).
>>>>>
>>>>> Perhaps this kind of cleanup is best left for Beam 3.0?
>>>>>
>>>>> On Mon, Oct 15, 2018 at 6:51 PM Reuven Lax  wrote:
>>>>>
>>>>>> Compilation compatibility is a big part of what Beam aims to provide
>>>>>> with its guarantees. Romain makes a good point that most users are not
>>>>>> invoking SeralizableFunctions themselves (they are usually invoked inside
>>>>>> of Beam classes such as MapE

Re: Can we allow SimpleFunction and SerializableFunction to throw Exception?

2018-11-28 Thread Jeff Klukas
It looks like we can make the new interface a superinterface for the
existing SerializableFunction while maintaining binary compatibility [0].

We'd have:

public interface NewSerializableFunction<InputT, OutputT> extends Serializable {
  OutputT apply(InputT input) throws Exception;
}

and then modify SerializableFunction to inherit from it:

public interface SerializableFunction<InputT, OutputT>
    extends NewSerializableFunction<InputT, OutputT>, Serializable {
  @Override
  OutputT apply(InputT input);
}


IIUC, we can then more or less replace all references to
SerializableFunction with NewSerializableFunction across the beam codebase
without having to introduce any new overrides. I'm working on a proof of
concept now.

It's not clear what the actual names for NewSerializableFunction and
NewSimpleFunction should be.

[0] https://docs.oracle.com/javase/specs/jls/se8/html/jls-13.html#jls-13.4.4
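
As a tiny illustration of why this keeps existing call sites compiling (sketch
only, using the placeholder names from the snippet above):

// Existing user code is unchanged: the narrower no-throws apply() is still declared
// on SerializableFunction, so lambdas and direct invocations keep compiling.
SerializableFunction<String, Integer> length = String::length;
Integer n = length.apply("beam");  // no checked exception to handle here

// Any such function can also be passed wherever the new supertype is expected.
NewSerializableFunction<String, Integer> widened = length;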


On Mon, Nov 26, 2018 at 9:54 PM Thomas Weise  wrote:

> +1 for introducing the new interface now and deprecating the old one. The
> major version change then provides the opportunity to remove deprecated
> code.
>
>
> On Mon, Nov 26, 2018 at 10:09 AM Lukasz Cwik  wrote:
>
>> Before 3.0 we will still want to introduce this giving time for people to
>> migrate, would it make sense to do that now and deprecate the alternatives
>> that it replaces?
>>
>> On Mon, Nov 26, 2018 at 5:59 AM Jeff Klukas  wrote:
>>
>>> Picking up this thread again. Based on the feedback from Kenn, Reuven,
>>> and Romain, it sounds like there's no objection to the idea of
>>> SimpleFunction and SerializableFunction declaring that they throw
>>> Exception. So the discussion at this point is about whether there's an
>>> acceptable way to introduce that change.
>>>
>>> IIUC correctly, Kenn was suggesting that we need to ensure backwards
>>> compatibility for existing user code both at runtime and recompile, which
>>> means we can't simply add the declaration to the existing interfaces, since
>>> that would cause errors at compile time for user code directly invoking
>>> SerializableFunction instances.
>>>
>>> I don't see an obvious way that introducing a new functional interface
>>> would help without littering the API with more variants (it's already a bit
>>> confusing that i.e. MapElements has multiple via() methods to support three
>>> different function interfaces).
>>>
>>> Perhaps this kind of cleanup is best left for Beam 3.0?
>>>
>>> On Mon, Oct 15, 2018 at 6:51 PM Reuven Lax  wrote:
>>>
>>>> Compilation compatibility is a big part of what Beam aims to provide
>>>> with its guarantees. Romain makes a good point that most users are not
>>>> invoking SeralizableFunctions themselves (they are usually invoked inside
>>>> of Beam classes such as MapElements), however I suspect some users do these
>>>> things.
>>>>
>>>> On Sun, Oct 14, 2018 at 2:38 PM Kenneth Knowles 
>>>> wrote:
>>>>
>>>>> Romain has brought up two good aspects of backwards compatibility:
>>>>>
>>>>>  - runtime replacement vs recompile
>>>>>  - consumer (covariant) vs producer (contravariant, such as interfaces
>>>>> a user implements)
>>>>>
>>>>> In this case, I think the best shoice is covariant and contravariant
>>>>> (invariant) backwards compat including recompile compat. But we shouldn't
>>>>> assume there is one obvious definition of "backwards compatibility".
>>>>>
>>>>> Does it help to introduce a new functional interface?
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Sun, Oct 14, 2018 at 6:25 AM Romain Manni-Bucau <
>>>>> rmannibu...@gmail.com> wrote:
>>>>>
>>>>>> Beam does not catch Exception for function usage so it will have to
>>>>>> do it in some places.
>>>>>>
>>>>>> A user does not have to execute the function so worse case it impacts
>>>>>> tests and in any case the most important: it does not impact the user 
>>>>>> until
>>>>>> it recompiles the code (= runtime is not impacted).
>>>>>>
>>>>>> Romain Manni-Bucau
>>>>>> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
>>>>>> <https://rmannibucau.metawerx.net/> | Old Blog
>>>>>> <http://rmannibucau.wordpress.com> | Github
>>>>>> <https://github.com/rmannibucau> | LinkedIn
>>>>

Re: TextIO setting file dynamically issue

2018-11-28 Thread Jeff Klukas
You can likely achieve what you want using FileIO with dynamic
destinations, which is described in the "Advanced features" section of the
TextIO docs [0].

Your case might look something like:

 PCollection<Event> events = ...;
 events.apply(FileIO.<String, Event>writeDynamic()
   .by(event -> formatAsHHMMSS(event.timestamp))
   .via(TextIO.sink(), Event::toString)
   .to(timeString ->
     nameFilesUsingWindowPaneAndShard("gs://<bucket>/" + timeString + "/Test", ".txt")));

This assumes the time you care about is part of the data structure you're
trying to write out. Per Reuven's point, if you wanted to use processing
time instead, your by() function could look more like your initial example:

   .by(event -> formatAsHHMMSS(new DateTime()))


[0]
https://beam.apache.org/releases/javadoc/2.8.0/index.html?org/apache/beam/sdk/io/TextIO.html#advanced-features
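
For completeness, the names the snippet assumes (Event, formatAsHHMMSS) are
hypothetical stand-ins for your own code; they might look roughly like this,
using Joda-Time types:

class Event implements java.io.Serializable {
  org.joda.time.Instant timestamp;

  @Override
  public String toString() {
    return timestamp + "";  // however each output line should be formatted
  }

  static String formatAsHHMMSS(org.joda.time.Instant ts) {
    return org.joda.time.format.DateTimeFormat.forPattern("HH-mm-ss").print(ts);
  }
}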

On Tue, Nov 27, 2018 at 6:48 PM Lukasz Cwik  wrote:

> +u...@beam.apache.org 
>
> On Mon, Nov 26, 2018 at 5:33 PM Reuven Lax  wrote:
>
>> Do you need it to change based on the timestamps of the records being
>> processed, or based on actual current time?
>>
>> On Mon, Nov 26, 2018 at 5:30 PM Matthew Schneid <
>> matthew.t.schn...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>>
>>>
>>> I have an interesting issue that I can’t seem to find a reliable
>>> resolution too.
>>>
>>>
>>>
>>> I have a standard TextIO output that looks like the following:
>>>
>>>
>>>
>>> TextIO.write().to("gs://" + new DateTime().toString("HH-mm-ss")
>>> + "/Test-")
>>>
>>>
>>>
>>> The above works, and writes to GSC, as I expect it too.
>>>
>>>
>>>
>>> However, it retains the instantiated datetime value, and what I need to
>>> happen is for it to dynamically change with the current time.
>>>
>>>
>>>
>>> Is this possible?
>>>
>>>
>>>
>>> Thanks for any and all help that can be provided.
>>>
>>>
>>>
>>> V/R
>>>
>>>
>>>
>>> MS
>>>
>>


Re: Design review for supporting AutoValue Coders and conversions to Row

2018-11-26 Thread Jeff Klukas
Reuven - How is the work on constructor support for ByteBuddy codegen
going? Does it still look like that's going to be a feasible way forward
for generating schemas/coders for AutoValue classes?

On Thu, Nov 15, 2018 at 4:37 PM Reuven Lax  wrote:

> I would hope so if possible.
>
> On Fri, Nov 16, 2018, 4:36 AM Kenneth Knowles 
>> Just some low-level detail: If there is no @DefaultSchema annotation but
>> it is an @AutoValue class, can schema inference go ahead with the
>> AutoValueSchema? Then the user doesn't have to do anything.
>>
>> Kenn
>>
>> On Wed, Nov 14, 2018 at 6:14 AM Reuven Lax  wrote:
>>
>>> We already have a framework for ByteBuddy codegen for JavaBean Row
>>> interfaces, which should hopefully be easy to extend AutoValue (and more
>>> efficient than using reflection). I'm working on adding constructor support
>>> to this right now.
>>>
>>> On Wed, Nov 14, 2018 at 12:29 AM Jeff Klukas 
>>> wrote:
>>>
>>>> Sounds, then, like we need to a define a new `AutoValueSchema extends
>>>> SchemaProvider` and users would opt-in to this via the DefaultSchema
>>>> annotation:
>>>>
>>>> @DefaultSchema(AutoValueSchema.class)
>>>> @AutoValue
>>>> public abstract MyClass ...
>>>>
>>>> Since we already have the JavaBean and JavaField reflection-based
>>>> schema providers to use as a guide, it sounds like it may be best to try to
>>>> implement this using reflection rather than implementing an AutoValue
>>>> extension.
>>>>
>>>> A reflection-based approach here would hinge on being able to discover
>>>> the package-private constructor for the concrete class and read its types.
>>>> Those types would define the schema, and the fromRow impementation would
>>>> call the discovered constructor.
>>>>
>>>> On Mon, Nov 12, 2018 at 10:02 AM Reuven Lax  wrote:
>>>>
>>>>>
>>>>>
>>>>> On Mon, Nov 12, 2018 at 11:38 PM Jeff Klukas 
>>>>> wrote:
>>>>>
>>>>>> Reuven - A SchemaProvider makes sense. It's not clear to me, though,
>>>>>> whether that's more limited than a Coder. Do all values of the schema 
>>>>>> have
>>>>>> to be simple types, or does Beam SQL support nested schemas?
>>>>>>
>>>>>
>>>>> Nested schemas, collection types (lists and maps), and collections of
>>>>> nested types are all supported.
>>>>>
>>>>>>
>>>>>> Put another way, would a user be able to create an AutoValue class
>>>>>> comprised of simple types and then use that as a field inside another
>>>>>> AutoValue class? I can see how that's possible with Coders, but not clear
>>>>>> whether that's possible with Row schemas.
>>>>>>
>>>>>
>>>>> Yes, this is explicitly supported.
>>>>>
>>>>>>
>>>>>> On Fri, Nov 9, 2018 at 8:22 PM Reuven Lax  wrote:
>>>>>>
>>>>>>> Hi Jeff,
>>>>>>>
>>>>>>> I would suggest a slightly different approach. Instead of generating
>>>>>>> a coder, writing a SchemaProvider that generates a schema for AutoValue.
>>>>>>> Once a PCollection has a schema, a coder is not needed (as Beam knows 
>>>>>>> how
>>>>>>> to encode any type with a schema), and it will work seamlessly with Beam
>>>>>>> SQL (in fact you don't need to write a transform to turn it into a Row 
>>>>>>> if a
>>>>>>> schema is registered).
>>>>>>>
>>>>>>> We already do this for POJOs and basic JavaBeans. I'm happy to help
>>>>>>> do this for AutoValue.
>>>>>>>
>>>>>>> Reuven
>>>>>>>
>>>>>>> On Sat, Nov 10, 2018 at 5:50 AM Jeff Klukas 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi all - I'm looking for some review and commentary on a proposed
>>>>>>>> design for providing built-in Coders for AutoValue classes. There's
>>>>>>>> existing discussion in BEAM-1891 [0] about using AvroCoder, but that's
>>>>>>>> blocked on incompatibility between AutoValue and Avro's reflection
>>>>>>>> machinery that don't look resolvable.
>>>>>>>>
>>>>>>>> I wrote up a design document [1] that instead proposes using
>>>>>>>> AutoValue's extension API to automatically generate a Coder for each
>>>>>>>> AutoValue class that users generate. A similar technique could be used 
>>>>>>>> to
>>>>>>>> generate conversions to and from Row for use with BeamSql.
>>>>>>>>
>>>>>>>> I'd appreciate review of the design and thoughts on whether this
>>>>>>>> seems feasible to support within the Beam codebase. I may be missing a
>>>>>>>> simpler approach.
>>>>>>>>
>>>>>>>> [0] https://issues.apache.org/jira/browse/BEAM-1891
>>>>>>>> [1]
>>>>>>>> https://docs.google.com/document/d/1ucoik4WzUDfilqIz3I1AuMHc1J8DE6iv7gaUCDI42BI/edit?usp=sharing
>>>>>>>>
>>>>>>>


Re: Evolving a Coder for an added field

2018-11-26 Thread Jeff Klukas
Lukasz - Were you able to get any more context on the possibility of
versioning coders from other folks at Google?

It sounds like adding versioning for coders and/or schemas is potentially a
large change. At this point, should I just write up some highlights from
this thread in a JIRA issue for future tracking?

On Mon, Nov 12, 2018 at 8:23 PM Reuven Lax  wrote:

> A few thoughts:
>
> 1. I agree with you about coder versioning. The lack of a good story
> around versioning has been a huge pain here, and it's unfortunate that
> nobody ever worked on this.
>
> 2. I think versioning schemas will be easier than versioning coders
> (especially for adding new fields). In many cases I suggest we start
> looking at migrating as much as possible to schemas, and in Beam 3.0 maybe
> we can migrate all of our internal payload to schemas. Schemas support
> nested fields, repeated fields, and map fields - which can model most thing.
>
> 3. There was a Beam proposal for a way to generically handle incompatible
> schema updates via snapshots. The idea was that such updates can be
> accompanied by a transform that maps a pipeline snapshot into a new
> snapshot with the encodings modified.
>
> Reuven
>
> On Tue, Nov 13, 2018 at 3:16 AM Jeff Klukas  wrote:
>
>> Conversation here has fizzled, but sounds like there's basically a
>> consensus here on a need for a new concept of Coder versioning that's
>> accessible at the Java level in order to allow an evolution path. Further,
>> it sounds like my open PR [0] for adding a new field to Metadata is
>> essentially blocked until we have coder versioning in place.
>>
>> Is there any existing documentation of these concepts, or should I go
>> ahead and file a new Jira issue summarizing the problem? I don't think I
>> have a comprehensive enough understanding of the Coder machinery to be able
>> to design a solution, so I'd need to hand this off or simply leave it in
>> the Jira backlog.
>>
>> [0] https://github.com/apache/beam/pull/6914
>>
>>
>> On Tue, Nov 6, 2018 at 4:38 AM Robert Bradshaw 
>> wrote:
>>
>>> Yes, a Coder author should be able to register a URN with a mapping
>>> from (components + payload) -> Coder (and vice versa), and this should
>>> be more lightweight than manually editing the proto files.
>>> On Mon, Nov 5, 2018 at 7:12 PM Thomas Weise  wrote:
>>> >
>>> > +1
>>> >
>>> > I think that coders should be immutable/versioned. The SDK should know
>>> about all the available versions and be able to associate the data (stream
>>> or at rest) with the corresponding coder version via URN. We can also look
>>> how that is solved elsewhere, for example the Kafka schema registry.
>>> >
>>> > Today we only have a few URNs for standard coders:
>>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L617
>>> >
>>> > I imagine we will need a coder registry where IOs and users can add
>>> their versioned coders also?
>>> >
>>> > Thanks,
>>> > Thomas
>>> >
>>> >
>>> > On Mon, Nov 5, 2018 at 7:54 AM Jean-Baptiste Onofré 
>>> wrote:
>>> >>
>>> >> It makes sense to have a more concrete URN including the version.
>>> >>
>>> >> Good idea Robert.
>>> >>
>>> >> Regards
>>> >> JB
>>> >>
>>> >> On 05/11/2018 16:52, Robert Bradshaw wrote:
>>> >> > I think we'll want to allow upgrades across SDK versions. A runner
>>> >> > should be able to recognize when a coder (or any other aspect of the
>>> >> > pipeline) has changed and adapt/reject accordingly. (Until we remove
>>> >> > coders from sources/sinks, there's also possibly the expectation
>>> that
>>> >> > one should be able to read data from a source written with that same
>>> >> > coder across versions as well.)
>>> >> >
>>> >> > I think it really comes down to how coders are named. If we decide
>>> to
>>> >> > let coders change arbitrarily between versions, probably the URN for
>>> >> > SerializedJavaCoder should have the SDK version number in it. Coders
>>> >> > that are stable across SDKs can have better, more stable URNs
>>> defined
>>> >> > and registered.
>>> >> >
>>> >> > I am more OK with changing the registry to infer different coders as

Re: Can we allow SimpleFunction and SerializableFunction to throw Exception?

2018-11-26 Thread Jeff Klukas
>>>>>>> Romain Manni-Bucau
>>>>>>> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
>>>>>>> <https://rmannibucau.metawerx.net/> | Old Blog
>>>>>>> <http://rmannibucau.wordpress.com> | Github
>>>>>>> <https://github.com/rmannibucau> | LinkedIn
>>>>>>> <https://www.linkedin.com/in/rmannibucau> | Book
>>>>>>> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>>>>>>>
>>>>>>>
>>>>>>> Le dim. 14 oct. 2018 à 10:49, Reuven Lax  a
>>>>>>> écrit :
>>>>>>>
>>>>>>>> But it means that other functions that call SerializableFunctions
>>>>>>>> must now declare exceptions, right? If yes, this is incompatible.
>>>>>>>>
>>>>>>>> On Sun, Oct 14, 2018 at 1:37 AM Romain Manni-Bucau <
>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> No, only parameter types and return type is used to lookup methods.
>>>>>>>>>
>>>>>>>>> Romain Manni-Bucau
>>>>>>>>> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
>>>>>>>>> <https://rmannibucau.metawerx.net/> | Old Blog
>>>>>>>>> <http://rmannibucau.wordpress.com> | Github
>>>>>>>>> <https://github.com/rmannibucau> | LinkedIn
>>>>>>>>> <https://www.linkedin.com/in/rmannibucau> | Book
>>>>>>>>> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Le dim. 14 oct. 2018 à 09:45, Reuven Lax  a
>>>>>>>>> écrit :
>>>>>>>>>
>>>>>>>>>> I've run into this problem before as well. Doesn't changing the
>>>>>>>>>> signature involve a backwards-incompatible change though?
>>>>>>>>>>
>>>>>>>>>> On Wed, Oct 3, 2018 at 5:11 PM Jeff Klukas 
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> I'm working on https://issues.apache.org/jira/browse/BEAM-5638
>>>>>>>>>>> to add exception handling options to single message transforms in 
>>>>>>>>>>> the Java
>>>>>>>>>>> SDK.
>>>>>>>>>>>
>>>>>>>>>>> MapElements' via() method is overloaded to accept either a
>>>>>>>>>>> SimpleFunction, a SerializableFunction, or a Contextful, all of 
>>>>>>>>>>> which are
>>>>>>>>>>> ultimately stored as a Contextful where the mapping functionis 
>>>>>>>>>>> expected to
>>>>>>>>>>> have signature:
>>>>>>>>>>>
>>>>>>>>>>> OutputT apply(InputT element, Context c) throws Exception;
>>>>>>>>>>>
>>>>>>>>>>> So Contextful.Fn allows throwing checked exceptions, but neither
>>>>>>>>>>> SerializableFunction nor SimpleFunction do. The user-provided
>>>>>>>>>>> function has to satisfy the more restrictive signature:
>>>>>>>>>>>
>>>>>>>>>>> OutputT apply(InputT input);
>>>>>>>>>>>
>>>>>>>>>>> Is there background about why we allow arbitrary checked
>>>>>>>>>>> exceptions to be thrown in one case but not the other two? Could we
>>>>>>>>>>> consider expanding SerializableFunction and SimpleFunction to
>>>>>>>>>>> the following?:
>>>>>>>>>>>
>>>>>>>>>>> OutputT apply(InputT input) throws Exception;
>>>>>>>>>>>
>>>>>>>>>>> This would, for example, simplify the implementation of
>>>>>>>>>>> ParseJsons and AsJsons, where we have to catch an IOException in
>>>>>>>>>>> MapElements#via only to rethrow as RuntimeException.
>>>>>>>>>>>
>>>>>>>>>>


Re: Design review for supporting AutoValue Coders and conversions to Row

2018-11-13 Thread Jeff Klukas
Sounds, then, like we need to define a new `AutoValueSchema extends
SchemaProvider` and users would opt in to this via the DefaultSchema
annotation:

@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract class MyClass ...

Since we already have the JavaBean and JavaField reflection-based schema
providers to use as a guide, it sounds like it may be best to try to
implement this using reflection rather than implementing an AutoValue
extension.

A reflection-based approach here would hinge on being able to discover the
package-private constructor for the concrete class and read its types.
Those types would define the schema, and the fromRow implementation would
call the discovered constructor.
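
A minimal sketch of that reflection step, assuming the generated subclass follows
AutoValue's usual naming convention (AutoValue_Foo for a top-level class Foo);
`clazz` is the user's abstract @AutoValue class and `valuesFromRow` stands in for
values pulled out of the Row:

// Locate the generated concrete class and its package-private constructor.
Class<?> generated = Class.forName(
    clazz.getPackage().getName() + ".AutoValue_" + clazz.getSimpleName());
Constructor<?> ctor = generated.getDeclaredConstructors()[0];
ctor.setAccessible(true);

// The constructor's parameter types would drive schema inference...
Class<?>[] fieldTypes = ctor.getParameterTypes();

// ...and fromRow would invoke the constructor with values from the Row.
Object instance = ctor.newInstance(valuesFromRow);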

On Mon, Nov 12, 2018 at 10:02 AM Reuven Lax  wrote:

>
>
> On Mon, Nov 12, 2018 at 11:38 PM Jeff Klukas  wrote:
>
>> Reuven - A SchemaProvider makes sense. It's not clear to me, though,
>> whether that's more limited than a Coder. Do all values of the schema have
>> to be simple types, or does Beam SQL support nested schemas?
>>
>
> Nested schemas, collection types (lists and maps), and collections of
> nested types are all supported.
>
>>
>> Put another way, would a user be able to create an AutoValue class
>> comprised of simple types and then use that as a field inside another
>> AutoValue class? I can see how that's possible with Coders, but not clear
>> whether that's possible with Row schemas.
>>
>
> Yes, this is explicitly supported.
>
>>
>> On Fri, Nov 9, 2018 at 8:22 PM Reuven Lax  wrote:
>>
>>> Hi Jeff,
>>>
>>> I would suggest a slightly different approach. Instead of generating a
>>> coder, writing a SchemaProvider that generates a schema for AutoValue. Once
>>> a PCollection has a schema, a coder is not needed (as Beam knows how to
>>> encode any type with a schema), and it will work seamlessly with Beam SQL
>>> (in fact you don't need to write a transform to turn it into a Row if a
>>> schema is registered).
>>>
>>> We already do this for POJOs and basic JavaBeans. I'm happy to help do
>>> this for AutoValue.
>>>
>>> Reuven
>>>
>>> On Sat, Nov 10, 2018 at 5:50 AM Jeff Klukas  wrote:
>>>
>>>> Hi all - I'm looking for some review and commentary on a proposed
>>>> design for providing built-in Coders for AutoValue classes. There's
>>>> existing discussion in BEAM-1891 [0] about using AvroCoder, but that's
>>>> blocked on incompatibility between AutoValue and Avro's reflection
>>>> machinery that don't look resolvable.
>>>>
>>>> I wrote up a design document [1] that instead proposes using
>>>> AutoValue's extension API to automatically generate a Coder for each
>>>> AutoValue class that users generate. A similar technique could be used to
>>>> generate conversions to and from Row for use with BeamSql.
>>>>
>>>> I'd appreciate review of the design and thoughts on whether this seems
>>>> feasible to support within the Beam codebase. I may be missing a simpler
>>>> approach.
>>>>
>>>> [0] https://issues.apache.org/jira/browse/BEAM-1891
>>>> [1]
>>>> https://docs.google.com/document/d/1ucoik4WzUDfilqIz3I1AuMHc1J8DE6iv7gaUCDI42BI/edit?usp=sharing
>>>>
>>>


Re: Evolving a Coder for an added field

2018-11-12 Thread Jeff Klukas
> >> >>> discussion)?
> >> >>> [2] Should we have a MetadataCoderV2? (does this imply a repeated
> >> >>> Matadata object) ? In this case where is the right place to identify
> >> >>> and decide what coder to use?
> >> >>>
> >> >>> Other ideas... ?
> >> >>>
> >> >>> Last thing, the link that Luke shared does not seem to work (looks
> >> >>> like a googley-friendly URL, here it is the full URL for those
> >> >>> interested in the drain/update proposal:
> >> >>>
> >> >>> [2]
> https://docs.google.com/document/d/1UWhnYPgui0gUYOsuGcCjLuoOUlGA4QaY91n8p3wz9MY/edit#
> >> >>> On Fri, Nov 2, 2018 at 10:11 PM Lukasz Cwik 
> wrote:
> >> >>>>
> >> >>>> I think the idea is that you would use one coder for paths where
> you don't need this information and would have FileIO provide a separate
> path that uses your updated coder.
> >> >>>> Existing users would not be impacted and users of the new FileIO
> that depend on this information would not be able to have updated their
> pipeline in the first place.
> >> >>>>
> >> >>>> If the feature in FileIO is experimental, we could choose to break
> it for existing users though since I don't know how feasible my suggestion
> above is.
> >> >>>>
> >> >>>>
> >> >>>>
> >> >>>> On Fri, Nov 2, 2018 at 12:56 PM Jeff Klukas 
> wrote:
> >> >>>>>
> >> >>>>> Lukasz - Thanks for those links. That's very helpful context.
> >> >>>>>
> >> >>>>> It sounds like there's no explicit user contract about evolving
> Coder classes in the Java SDK and users might reasonably assume Coders to
> be stable between SDK versions. Thus, users of the Dataflow or Flink
> runners might reasonably expect that they can update the Java SDK version
> used in their pipeline when performing an update.
> >> >>>>>
> >> >>>>> Based in that understanding, evolving a class like Metadata might
> not be possible except in a major version bump where it's obvious to users
> to expect breaking changes and not to expect an "update" operation to work.
> >> >>>>>
> >> >>>>> It's not clear to me what changing the "name" of a coder would
> look like or whether that's a tenable solution here. Would that change be
> able to happen within the SDK itself, or is it something users would need
> to specify?
> >> >>
> >> >> --
> >> >> Jean-Baptiste Onofré
> >> >> jbono...@apache.org
> >> >> http://blog.nanthrax.net
> >> >> Talend - http://www.talend.com
> >>
> >> --
> >> Jean-Baptiste Onofré
> >> jbono...@apache.org
> >> http://blog.nanthrax.net
> >> Talend - http://www.talend.com
>


Re: Design review for supporting AutoValue Coders and conversions to Row

2018-11-12 Thread Jeff Klukas
Reuven - A SchemaProvider makes sense. It's not clear to me, though,
whether that's more limited than a Coder. Do all values of the schema have
to be simple types, or does Beam SQL support nested schemas?

Put another way, would a user be able to create an AutoValue class
comprised of simple types and then use that as a field inside another
AutoValue class? I can see how that's possible with Coders, but not clear
whether that's possible with Row schemas.
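
Concretely, the nesting I have in mind is something like (illustrative only):

@AutoValue
public abstract static class Address {
  public abstract String street();
  public abstract String city();
}

@AutoValue
public abstract static class Person {
  public abstract String name();
  public abstract Address address();  // an AutoValue value used as a field of another AutoValue
}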

On Fri, Nov 9, 2018 at 8:22 PM Reuven Lax  wrote:

> Hi Jeff,
>
> I would suggest a slightly different approach. Instead of generating a
> coder, writing a SchemaProvider that generates a schema for AutoValue. Once
> a PCollection has a schema, a coder is not needed (as Beam knows how to
> encode any type with a schema), and it will work seamlessly with Beam SQL
> (in fact you don't need to write a transform to turn it into a Row if a
> schema is registered).
>
> We already do this for POJOs and basic JavaBeans. I'm happy to help do
> this for AutoValue.
>
> Reuven
>
> On Sat, Nov 10, 2018 at 5:50 AM Jeff Klukas  wrote:
>
>> Hi all - I'm looking for some review and commentary on a proposed design
>> for providing built-in Coders for AutoValue classes. There's existing
>> discussion in BEAM-1891 [0] about using AvroCoder, but that's blocked on
>> incompatibility between AutoValue and Avro's reflection machinery that
>> don't look resolvable.
>>
>> I wrote up a design document [1] that instead proposes using AutoValue's
>> extension API to automatically generate a Coder for each AutoValue class
>> that users generate. A similar technique could be used to generate
>> conversions to and from Row for use with BeamSql.
>>
>> I'd appreciate review of the design and thoughts on whether this seems
>> feasible to support within the Beam codebase. I may be missing a simpler
>> approach.
>>
>> [0] https://issues.apache.org/jira/browse/BEAM-1891
>> [1]
>> https://docs.google.com/document/d/1ucoik4WzUDfilqIz3I1AuMHc1J8DE6iv7gaUCDI42BI/edit?usp=sharing
>>
>


Re: Design review for supporting AutoValue Coders and conversions to Row

2018-11-09 Thread Jeff Klukas
Anton - Thanks for reading and commenting. I've gone as far as creating a
skeleton AutoValue extension to better understand how that API works, but I
don't yet have a working prototype for either of these proposed additions.

I'll move on to prototyping the Coder generation for AutoValue classes if I
get some clear signal from this list that maintaining an AutoValue
extension for generating this code seems like a reasonable path forward.

On Fri, Nov 9, 2018 at 7:42 PM Anton Kedin  wrote:

> Hi Jeff,
>
> I think this is a great idea! Thank you for working on the proposal. I
> left couple of comments in the doc.
>
> Have you tried prototyping this?
>
> Regards,
> Anton
>


Design review for supporting AutoValue Coders and conversions to Row

2018-11-09 Thread Jeff Klukas
Hi all - I'm looking for some review and commentary on a proposed design
for providing built-in Coders for AutoValue classes. There's existing
discussion in BEAM-1891 [0] about using AvroCoder, but that's blocked on
incompatibility between AutoValue and Avro's reflection machinery that
doesn't look resolvable.

I wrote up a design document [1] that instead proposes using AutoValue's
extension API to automatically generate a Coder for each AutoValue class
that users generate. A similar technique could be used to generate
conversions to and from Row for use with BeamSql.

I'd appreciate review of the design and thoughts on whether this seems
feasible to support within the Beam codebase. I may be missing a simpler
approach.

[0] https://issues.apache.org/jira/browse/BEAM-1891
[1]
https://docs.google.com/document/d/1ucoik4WzUDfilqIz3I1AuMHc1J8DE6iv7gaUCDI42BI/edit?usp=sharing


Re: Evolving a Coder for an added field

2018-11-02 Thread Jeff Klukas
Lukasz - Thanks for those links. That's very helpful context.

It sounds like there's no explicit user contract about evolving Coder
classes in the Java SDK and users might reasonably assume Coders to be
stable between SDK versions. Thus, users of the Dataflow or Flink runners
might reasonably expect that they can update the Java SDK version used in
their pipeline when performing an update.

Based on that understanding, evolving a class like Metadata might not be
possible except in a major version bump where it's obvious to users to
expect breaking changes and not to expect an "update" operation to work.

It's not clear to me what changing the "name" of a coder would look like or
whether that's a tenable solution here. Would that change be able to happen
within the SDK itself, or is it something users would need to specify?


Evolving a Coder for an added field

2018-11-02 Thread Jeff Klukas
I'm adding a new lastModifiedMillis field to MatchResult.Metadata [0] which
requires also updating MetadataCoder, but it's not clear to me whether
there are guidelines to follow when evolving a type when that changes the
encoding.

Is a user allowed to update Beam library versions as part of updating a
pipeline? If so, there could be a situation where an updated pipeline is
reading state that includes Metadata encoded without the new
lastModifiedMillis field, which would cause a CoderException to be thrown.

Is there prior art for evolving a type and its Coder? Should I be defensive
and catch CoderException when attempting to decode the new field,
providing a default value?

[0] https://github.com/apache/beam/pull/6914
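
Concretely, I'm imagining roughly this shape inside MetadataCoder#decode (sketch
only, assuming the new field is appended last and encoded with a VarLongCoder;
`inStream` is the decode InputStream):

long lastModifiedMillis;
try {
  lastModifiedMillis = VarLongCoder.of().decode(inStream);
} catch (CoderException | EOFException e) {
  // Data written by the old coder has no such field; fall back to a default.
  lastModifiedMillis = 0L;
}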


FileSystems should retrieve lastModified time

2018-10-29 Thread Jeff Klukas
I just wrote up a JIRA issue proposing that FileSystem implementations
retrieve the lastModified time of the files they list:
https://issues.apache.org/jira/browse/BEAM-5910

Any immediate concerns? I'm not intimately familiar with HDFS, but I'm
otherwise confident that GCS, S3, and local filesystems can all give us a
suitable timestamp.

In the short term, this change would allow users to write their own polling
logic on top of FileSystems to periodically check for updates to files.
Currently, you would need to fall back to the APIs for each individual
storage provider.
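
Roughly the kind of user-side polling this enables (sketch only;
lastModifiedMillis() is the proposed accessor, the path is made up, and
lastPollMillis/process() stand in for the user's own bookkeeping):

// Re-check a glob periodically and act on files modified since the previous poll.
MatchResult result =
    FileSystems.match(Collections.singletonList("gs://my-bucket/input/*.json")).get(0);
for (MatchResult.Metadata m : result.metadata()) {
  if (m.lastModifiedMillis() > lastPollMillis) {
    process(m.resourceId());  // hand the updated file off for reprocessing
  }
}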

Longer term, I'd love to see FileIO.match().continuously() support an option
for returning updated contents when files are updated.


Re: New Edit button on beam.apache.org pages

2018-10-25 Thread Jeff Klukas
On Thu, Oct 25, 2018 at 3:17 PM Kenneth Knowles  wrote:

> What I haven't figured out is how to get GitHub to create the branch for
> the PR on your fork.
>

GitHub does that work for you. I don't have commit access to apache/beam,
so when I hit the link, it gives me a banner at the top of the edit page
that says "You’re editing a file in a project you don’t have write access
to. Submitting a change to this file will write it to a new branch in your
fork jklukas/beam, so you can send a pull request."

Looks like exactly the workflow I'd hope for.


Re: New Edit button on beam.apache.org pages

2018-10-25 Thread Jeff Klukas
Max - The website source was indeed merged into the main beam repository a
few weeks ago, separate from this change.

The edit button is a great idea!

On Thu, Oct 25, 2018 at 7:37 AM Maximilian Michels  wrote:

> Cool!
>
> I guess the underlying change is that the website can now be edited
> through the main repository and we don't have to go through "beam-site"?
>
> -Max
>
> On 25.10.18 12:20, Alexey Romanenko wrote:
> > This is really cool feature! With a tab “Preview changes” it makes
> documentation updating much more easier to do.
> > Thanks a lot to Alan and Scott!
> >
> >> On 25 Oct 2018, at 09:48, Robert Bradshaw  wrote:
> >>
> >> Very cool! Thanks!
> >> On Thu, Oct 25, 2018 at 9:38 AM Connell O'Callaghan <
> conne...@google.com> wrote:
> >>>
> >>> Alan and Scott thank you for this enhancement and for reducing the
> obstacles to entry/contribute for new (and existing) members of the BEAM
> community
> >>>
> >>> On Wed, Oct 24, 2018 at 8:50 PM Kenneth Knowles 
> wrote:
> 
>  This is a genius way to involve everyone who lands on the site! My
> first PR is about to open... :-)
> 
>  Kenn
> 
>  On Wed, Oct 24, 2018 at 8:47 PM Jean-Baptiste Onofré 
> wrote:
> >
> > Sweet !!
> >
> > Thanks !
> >
> > Regards
> > JB
> >
> > On 24/10/2018 23:24, Alan Myrvold wrote:
> >> To make small documentation changes easier, there is now an Edit
> button
> >> at the top right of the pages on https://beam.apache.org. This
> button
> >> opens the source .md file on the master branch of the beam
> repository in
> >> the github web editor. After making changes you can create a pull
> >> request to ask to have it merged.
> >>
> >> Thanks to Scott for the suggestion to add this in [BEAM-4431]
> >> 
> >>
> >> Let me know if you run into any issues.
> >>
> >> Alan
> >>
> >>
> >
>


Re: [Proposal] Add exception handling option to MapElements

2018-10-23 Thread Jeff Klukas
https://github.com/apache/beam/pull/6586 is still open for review, but I
also wanted to gather feedback about a potential refactor as part of that
change.

We could refactor MapElements, FlatMapElements, and Filter to all inherit
from a common abstract base class SingleMessageTransform. The new code for
exception handling is nearly identical between the three classes and could
be consolidated without altering the current public interfaces. Are there
concerns with adding such a base class?
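
Very roughly, the shape I have in mind is something like this (hypothetical, not
the actual code in the PR):

// A shared home for the success/failure tag plumbing that MapElements.WithErrors,
// FlatMapElements.WithErrors, and Filter.WithErrors currently each repeat.
public abstract class SingleMessageTransform<InputT, OutputT>
    extends PTransform<PCollection<InputT>, PCollectionTuple> {

  protected final TupleTag<OutputT> successTag;
  protected final TupleTag<InputT> failureTag;

  protected SingleMessageTransform(TupleTag<OutputT> successTag, TupleTag<InputT> failureTag) {
    this.successTag = successTag;
    this.failureTag = failureTag;
  }

  // Subclasses would supply only their per-element function; the try/catch and
  // tagged-output wiring would live once in a shared expand() here.
}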

On Thu, Oct 11, 2018 at 4:44 PM Jeff Klukas  wrote:

> The PR (https://github.com/apache/beam/pull/6586) is updated now with a
> coding solution for Failure. We use AvroCoder for the Exception and inherit
> whatever the input coder was for values.
>
> The unfortunate bit is that users might provide an Exception subclass that
> doesn't provide a no-argument constructor and thus isn't
> AvroCoder-compatible. I'm currently handling this through early failure
> with context about how to choose a different exception type.
>
>
> On Fri, Oct 5, 2018 at 3:59 PM Jeff Klukas  wrote:
>
>> It would be ideal to have some higher-level way of wrapping a PTransform
>> to handle errors inside, but that indeed seems like a substantially
>> trickier thing to implement.
>>
>>
>>
>>
>>
>> On Fri, Oct 5, 2018 at 3:38 PM Reuven Lax  wrote:
>>
>>> Cool! I've left a few comments.
>>>
>>> This also makes me think whether we can implement this on ParDo as well,
>>> though that might be a bit trickier since it involves hooking into
>>> DoFnInvoker.
>>>
>>> Reuven
>>>
>>> On Fri, Oct 5, 2018 at 10:33 AM Jeff Klukas  wrote:
>>>
>>>> I've posted a full PR for the Java exception handling API that's ready
>>>> for review: https://github.com/apache/beam/pull/6586
>>>>
>>>> It implements new WithErrors nested classes on MapElements,
>>>> FlatMapElements, Filter, AsJsons, and ParseJsons.
>>>>
>>>> On Wed, Oct 3, 2018 at 7:55 PM Jeff Klukas  wrote:
>>>>
>>>>> Jira issues for adding exception handling in Java and Python SDKs:
>>>>>
>>>>> https://issues.apache.org/jira/browse/BEAM-5638
>>>>> https://issues.apache.org/jira/browse/BEAM-5639
>>>>>
>>>>> I'll plan to have a complete PR for the Java SDK put together in the
>>>>> next few days.
>>>>>
>>>>> On Wed, Oct 3, 2018 at 1:29 PM Jeff Klukas 
>>>>> wrote:
>>>>>
>>>>>> I don't personally have experience with the Python SDK, so am not
>>>>>> immediately in a position to comment on how feasible it would be to
>>>>>> introduce a similar change there. I'll plan to write up two separate 
>>>>>> issues
>>>>>> for adding exception handling in the Java and Python SDKs.
>>>>>>
>>>>>> On Wed, Oct 3, 2018 at 12:17 PM Thomas Weise  wrote:
>>>>>>
>>>>>>> +1 for the proposal as well as the suggestion to offer it in other
>>>>>>> SDKs, where applicable
>>>>>>>
>>>>>>> On Wed, Oct 3, 2018 at 8:58 AM Chamikara Jayalath <
>>>>>>> chamik...@google.com> wrote:
>>>>>>>
>>>>>>>> Sounds like a very good addition. I'd say this can be a single PR
>>>>>>>> since changes are related. Please open a JIRA for tracking.
>>>>>>>>
>>>>>>>> Have you though about introducing a similar change to Python SDK ?
>>>>>>>> (doesn't have to be the same PR).
>>>>>>>>
>>>>>>>> - Cham
>>>>>>>>
>>>>>>>> On Wed, Oct 3, 2018 at 8:31 AM Jeff Klukas 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> If this looks good for MapElements, I agree that it makes sense to
>>>>>>>>> extend to FlatMapElements and Filter and to keep the API consistent 
>>>>>>>>> between
>>>>>>>>> them.
>>>>>>>>>
>>>>>>>>> Do you have suggestions on how to submit changes with that wider
>>>>>>>>> scope? Would one PR altering MapElements, FlatMapElements, Filter,
>>>>>>>>> ParseJsons, and AsJsons be too large to reasonably review? Should I 
>>>>>>>>> open an

Re: 2 tier input

2018-10-22 Thread Jeff Klukas
Chaim - If the full list of IDs is able to fit comfortably in memory and
the Mongo collection is small enough that you can read the whole
collection, you may want to fetch the IDs into a Java collection using the
BigQuery API directly, then turn them into a Beam PCollection using
Create.of(collection_of_ids). You could then use MongoDbIO.read() to read
the entire collection, but throw out rows based on the side input of IDs.

If the list of IDs is particularly small, you could fetch them into memory
and build them into a filter string that you pass to MongoDbIO.read() to
specify which documents to fetch, avoiding the need for
a side input.
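
A rough sketch of that small-ID-list case (illustrative only; the connection
settings, the "docId" field name, and the fetchIdsFromBigQuery() helper are all
assumptions):

List<String> ids = fetchIdsFromBigQuery();  // e.g. via the BigQuery Java client, outside Beam
String filter = String.format(
    "{\"docId\": {\"$in\": [%s]}}",
    ids.stream().map(id -> "\"" + id + "\"").collect(Collectors.joining(",")));

PCollection<Document> docs = pipeline.apply(
    MongoDbIO.read()
        .withUri("mongodb://localhost:27017")
        .withDatabase("mydb")
        .withCollection("mycollection")
        .withFilter(filter));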

Otherwise, if it's a large number of IDs, you may need to use Beam's
BigQueryIO to create a PCollection for the IDs, and then pass that into a
ParDo with a custom DoFn that issues Mongo queries for a batch of IDs. I'm
not very familiar with Mongo APIs, but you'd need to give the DoFn a
connection to Mongo that's serializable. You could likely look at the
implementation of MongoDbIO for inspiration there.

On Sun, Oct 21, 2018 at 5:18 AM Chaim Turkel  wrote:

> hi,
>   I have the following flow i need to implement.
> From the bigquery i run a query and get a list of id's then i need to
> load from mongo all the documents based on these id's and export them
> as an xml file.
> How do you suggest i go about doing this?
>
> chaim
>
> --
>
>
> Loans are funded by
> FinWise Bank, a Utah-chartered bank located in Sandy,
> Utah, member FDIC, Equal
> Opportunity Lender. Merchant Cash Advances are
> made by Behalf. For more
> information on ECOA, click here
> . For important information about
> opening a new
> account, review Patriot Act procedures here
> .
> Visit Legal
>  to
> review our comprehensive program terms,
> conditions, and disclosures.
>


Re: [Proposal] Add exception handling option to MapElements

2018-10-11 Thread Jeff Klukas
The PR (https://github.com/apache/beam/pull/6586) is updated now with a
coding solution for Failure. We use AvroCoder for the Exception and inherit
whatever the input coder was for values.

The unfortunate bit is that users might provide an Exception subclass that
doesn't provide a no-argument constructor and thus isn't
AvroCoder-compatible. I'm currently handling this through early failure
with context about how to choose a different exception type.
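
For example (illustrative only), an exception type shaped like this satisfies the
no-argument-constructor requirement:

public class JsonParseFailure extends Exception {
  private String detail;

  public JsonParseFailure() {}  // no-arg constructor needed for AvroCoder's reflection

  public JsonParseFailure(String detail) {
    super(detail);
    this.detail = detail;
  }

  public String getDetail() {
    return detail;
  }
}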


On Fri, Oct 5, 2018 at 3:59 PM Jeff Klukas  wrote:

> It would be ideal to have some higher-level way of wrapping a PTransform
> to handle errors inside, but that indeed seems like a substantially
> trickier thing to implement.
>
>
>
>
>
> On Fri, Oct 5, 2018 at 3:38 PM Reuven Lax  wrote:
>
>> Cool! I've left a few comments.
>>
>> This also makes me think whether we can implement this on ParDo as well,
>> though that might be a bit trickier since it involves hooking into
>> DoFnInvoker.
>>
>> Reuven
>>
>> On Fri, Oct 5, 2018 at 10:33 AM Jeff Klukas  wrote:
>>
>>> I've posted a full PR for the Java exception handling API that's ready
>>> for review: https://github.com/apache/beam/pull/6586
>>>
>>> It implements new WithErrors nested classes on MapElements,
>>> FlatMapElements, Filter, AsJsons, and ParseJsons.
>>>
>>> On Wed, Oct 3, 2018 at 7:55 PM Jeff Klukas  wrote:
>>>
>>>> Jira issues for adding exception handling in Java and Python SDKs:
>>>>
>>>> https://issues.apache.org/jira/browse/BEAM-5638
>>>> https://issues.apache.org/jira/browse/BEAM-5639
>>>>
>>>> I'll plan to have a complete PR for the Java SDK put together in the
>>>> next few days.
>>>>
>>>> On Wed, Oct 3, 2018 at 1:29 PM Jeff Klukas  wrote:
>>>>
>>>>> I don't personally have experience with the Python SDK, so am not
>>>>> immediately in a position to comment on how feasible it would be to
>>>>> introduce a similar change there. I'll plan to write up two separate 
>>>>> issues
>>>>> for adding exception handling in the Java and Python SDKs.
>>>>>
>>>>> On Wed, Oct 3, 2018 at 12:17 PM Thomas Weise  wrote:
>>>>>
>>>>>> +1 for the proposal as well as the suggestion to offer it in other
>>>>>> SDKs, where applicable
>>>>>>
>>>>>> On Wed, Oct 3, 2018 at 8:58 AM Chamikara Jayalath <
>>>>>> chamik...@google.com> wrote:
>>>>>>
>>>>>>> Sounds like a very good addition. I'd say this can be a single PR
>>>>>>> since changes are related. Please open a JIRA for tracking.
>>>>>>>
>>>>>>> Have you though about introducing a similar change to Python SDK ?
>>>>>>> (doesn't have to be the same PR).
>>>>>>>
>>>>>>> - Cham
>>>>>>>
>>>>>>> On Wed, Oct 3, 2018 at 8:31 AM Jeff Klukas 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> If this looks good for MapElements, I agree that it makes sense to
>>>>>>>> extend to FlatMapElements and Filter and to keep the API consistent 
>>>>>>>> between
>>>>>>>> them.
>>>>>>>>
>>>>>>>> Do you have suggestions on how to submit changes with that wider
>>>>>>>> scope? Would one PR altering MapElements, FlatMapElements, Filter,
>>>>>>>> ParseJsons, and AsJsons be too large to reasonably review? Should I 
>>>>>>>> open an
>>>>>>>> overall JIRA ticket to track and break this into smaller  PRs?
>>>>>>>>
>>>>>>>> On Wed, Oct 3, 2018 at 10:31 AM Reuven Lax 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Sounds cool. Why not support this on other transforms as well?
>>>>>>>>> (FlatMapElements, Filter, etc.)
>>>>>>>>>
>>>>>>>>> Reuven
>>>>>>>>>
>>>>>>>>> On Tue, Oct 2, 2018 at 4:49 PM Jeff Klukas 
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> I've seen a few Beam users mention the need to handle errors in
>>>>>>>>>> their transforms by using a try/catch and routing to differe

Re: [Proposal] Add exception handling option to MapElements

2018-10-05 Thread Jeff Klukas
It would be ideal to have some higher-level way of wrapping a PTransform to
handle errors inside, but that indeed seems like a substantially trickier
thing to implement.





On Fri, Oct 5, 2018 at 3:38 PM Reuven Lax  wrote:

> Cool! I've left a few comments.
>
> This also makes me think whether we can implement this on ParDo as well,
> though that might be a bit trickier since it involves hooking into
> DoFnInvoker.
>
> Reuven
>
> On Fri, Oct 5, 2018 at 10:33 AM Jeff Klukas  wrote:
>
>> I've posted a full PR for the Java exception handling API that's ready
>> for review: https://github.com/apache/beam/pull/6586
>>
>> It implements new WithErrors nested classes on MapElements,
>> FlatMapElements, Filter, AsJsons, and ParseJsons.
>>
>> On Wed, Oct 3, 2018 at 7:55 PM Jeff Klukas  wrote:
>>
>>> Jira issues for adding exception handling in Java and Python SDKs:
>>>
>>> https://issues.apache.org/jira/browse/BEAM-5638
>>> https://issues.apache.org/jira/browse/BEAM-5639
>>>
>>> I'll plan to have a complete PR for the Java SDK put together in the
>>> next few days.
>>>
>>> On Wed, Oct 3, 2018 at 1:29 PM Jeff Klukas  wrote:
>>>
>>>> I don't personally have experience with the Python SDK, so am not
>>>> immediately in a position to comment on how feasible it would be to
>>>> introduce a similar change there. I'll plan to write up two separate issues
>>>> for adding exception handling in the Java and Python SDKs.
>>>>
>>>> On Wed, Oct 3, 2018 at 12:17 PM Thomas Weise  wrote:
>>>>
>>>>> +1 for the proposal as well as the suggestion to offer it in other
>>>>> SDKs, where applicable
>>>>>
>>>>> On Wed, Oct 3, 2018 at 8:58 AM Chamikara Jayalath <
>>>>> chamik...@google.com> wrote:
>>>>>
>>>>>> Sounds like a very good addition. I'd say this can be a single PR
>>>>>> since changes are related. Please open a JIRA for tracking.
>>>>>>
>>>>>> Have you though about introducing a similar change to Python SDK ?
>>>>>> (doesn't have to be the same PR).
>>>>>>
>>>>>> - Cham
>>>>>>
>>>>>> On Wed, Oct 3, 2018 at 8:31 AM Jeff Klukas 
>>>>>> wrote:
>>>>>>
>>>>>>> If this looks good for MapElements, I agree that it makes sense to
>>>>>>> extend to FlatMapElements and Filter and to keep the API consistent 
>>>>>>> between
>>>>>>> them.
>>>>>>>
>>>>>>> Do you have suggestions on how to submit changes with that wider
>>>>>>> scope? Would one PR altering MapElements, FlatMapElements, Filter,
>>>>>>> ParseJsons, and AsJsons be too large to reasonably review? Should I 
>>>>>>> open an
>>>>>>> overall JIRA ticket to track and break this into smaller  PRs?
>>>>>>>
>>>>>>> On Wed, Oct 3, 2018 at 10:31 AM Reuven Lax  wrote:
>>>>>>>
>>>>>>>> Sounds cool. Why not support this on other transforms as well?
>>>>>>>> (FlatMapElements, Filter, etc.)
>>>>>>>>
>>>>>>>> Reuven
>>>>>>>>
>>>>>>>> On Tue, Oct 2, 2018 at 4:49 PM Jeff Klukas 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I've seen a few Beam users mention the need to handle errors in
>>>>>>>>> their transforms by using a try/catch and routing to different outputs
>>>>>>>>> based on whether an exception was thrown. This was particularly nicely
>>>>>>>>> written up in a post by Vallery Lancey:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> https://medium.com/@vallerylancey/error-handling-elements-in-apache-beam-pipelines-fffdea91af2a
>>>>>>>>>
>>>>>>>>> I'd love to see this pattern better supported directly in the Beam
>>>>>>>>> API, because it currently requires the user to implement a full DoFn 
>>>>>>>>> even
>>>>>>>>> for the simplest cases.
>>>>>>>>>
>>>>>>>>> I propose we support for a MapElements-like transform that allows
>>>>>>>>> the user to specify a set of exceptions to catch and route to a 
>>>>>>>>> failure
>>>>>>>>> output. Something like:
>>>>>>>>>
>>>>>>>>> MapElements
>>>>>>>>> .via(myFunctionThatThrows)
>>>>>>>>> .withSuccessTag(successTag)
>>>>>>>>> .withFailureTag(failureTag, JsonParsingException.class)
>>>>>>>>>
>>>>>>>>> which would output a PCollectionTuple with both the successful
>>>>>>>>> outcomes of the map operation and also a collection of the inputs that
>>>>>>>>> threw JsonParsingException.
>>>>>>>>>
>>>>>>>>> To make this more concrete, I put together a proof of concept PR:
>>>>>>>>> https://github.com/apache/beam/pull/6518  I'd appreciate feedback
>>>>>>>>> about whether this seems like a worthwhile addition and a feasible 
>>>>>>>>> approach.
>>>>>>>>>
>>>>>>>>


Re: [Proposal] Add exception handling option to MapElements

2018-10-05 Thread Jeff Klukas
I've posted a full PR for the Java exception handling API that's ready for
review: https://github.com/apache/beam/pull/6586

It implements new WithErrors nested classes on MapElements,
FlatMapElements, Filter, AsJsons, and ParseJsons.

On Wed, Oct 3, 2018 at 7:55 PM Jeff Klukas  wrote:

> Jira issues for adding exception handling in Java and Python SDKs:
>
> https://issues.apache.org/jira/browse/BEAM-5638
> https://issues.apache.org/jira/browse/BEAM-5639
>
> I'll plan to have a complete PR for the Java SDK put together in the next
> few days.
>
> On Wed, Oct 3, 2018 at 1:29 PM Jeff Klukas  wrote:
>
>> I don't personally have experience with the Python SDK, so am not
>> immediately in a position to comment on how feasible it would be to
>> introduce a similar change there. I'll plan to write up two separate issues
>> for adding exception handling in the Java and Python SDKs.
>>
>> On Wed, Oct 3, 2018 at 12:17 PM Thomas Weise  wrote:
>>
>>> +1 for the proposal as well as the suggestion to offer it in other SDKs,
>>> where applicable
>>>
>>> On Wed, Oct 3, 2018 at 8:58 AM Chamikara Jayalath 
>>> wrote:
>>>
>>>> Sounds like a very good addition. I'd say this can be a single PR since
>>>> changes are related. Please open a JIRA for tracking.
>>>>
>>>> Have you though about introducing a similar change to Python SDK ?
>>>> (doesn't have to be the same PR).
>>>>
>>>> - Cham
>>>>
>>>> On Wed, Oct 3, 2018 at 8:31 AM Jeff Klukas  wrote:
>>>>
>>>>> If this looks good for MapElements, I agree that it makes sense to
>>>>> extend to FlatMapElements and Filter and to keep the API consistent 
>>>>> between
>>>>> them.
>>>>>
>>>>> Do you have suggestions on how to submit changes with that wider
>>>>> scope? Would one PR altering MapElements, FlatMapElements, Filter,
>>>>> ParseJsons, and AsJsons be too large to reasonably review? Should I open 
>>>>> an
>>>>> overall JIRA ticket to track and break this into smaller  PRs?
>>>>>
>>>>> On Wed, Oct 3, 2018 at 10:31 AM Reuven Lax  wrote:
>>>>>
>>>>>> Sounds cool. Why not support this on other transforms as well?
>>>>>> (FlatMapElements, Filter, etc.)
>>>>>>
>>>>>> Reuven
>>>>>>
>>>>>> On Tue, Oct 2, 2018 at 4:49 PM Jeff Klukas 
>>>>>> wrote:
>>>>>>
>>>>>>> I've seen a few Beam users mention the need to handle errors in
>>>>>>> their transforms by using a try/catch and routing to different outputs
>>>>>>> based on whether an exception was thrown. This was particularly nicely
>>>>>>> written up in a post by Vallery Lancey:
>>>>>>>
>>>>>>>
>>>>>>> https://medium.com/@vallerylancey/error-handling-elements-in-apache-beam-pipelines-fffdea91af2a
>>>>>>>
>>>>>>> I'd love to see this pattern better supported directly in the Beam
>>>>>>> API, because it currently requires the user to implement a full DoFn 
>>>>>>> even
>>>>>>> for the simplest cases.
>>>>>>>
>>>>>>> I propose we support for a MapElements-like transform that allows
>>>>>>> the user to specify a set of exceptions to catch and route to a failure
>>>>>>> output. Something like:
>>>>>>>
>>>>>>> MapElements
>>>>>>> .via(myFunctionThatThrows)
>>>>>>> .withSuccessTag(successTag)
>>>>>>> .withFailureTag(failureTag, JsonParsingException.class)
>>>>>>>
>>>>>>> which would output a PCollectionTuple with both the successful
>>>>>>> outcomes of the map operation and also a collection of the inputs that
>>>>>>> threw JsonParsingException.
>>>>>>>
>>>>>>> To make this more concrete, I put together a proof of concept PR:
>>>>>>> https://github.com/apache/beam/pull/6518  I'd appreciate feedback
>>>>>>> about whether this seems like a worthwhile addition and a feasible 
>>>>>>> approach.
>>>>>>>
>>>>>>


Can we allow SimpleFunction and SerializableFunction to throw Exception?

2018-10-03 Thread Jeff Klukas
I'm working on https://issues.apache.org/jira/browse/BEAM-5638 to add
exception handling options to single message transforms in the Java SDK.

MapElements' via() method is overloaded to accept either a SimpleFunction,
a SerializableFunction, or a Contextful, all of which are ultimately stored
as a Contextful where the mapping function is expected to have the signature:

OutputT apply(InputT element, Context c) throws Exception;

So Contextful.Fn allows throwing checked exceptions, but neither
SerializableFunction nor SimpleFunction do. The user-provided function has
to satisfy the more restrictive signature:

OutputT apply(InputT input);

Is there background about why we allow arbitrary checked exceptions to be
thrown in one case but not the other two? Could we consider expanding
SerializableFunction and SimpleFunction to the following?:

OutputT apply(InputT input) throws Exception;

This would, for example, simplify the implementation of ParseJsons and
AsJsons, where we have to catch an IOException in MapElements#via only to
rethrow as RuntimeException.
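
For reference, here is a minimal sketch of the wrapping that the current
signature forces. This is not the actual ParseJsons/AsJsons source; the
element type (a Map) and the Jackson mapper are just illustrative.

import java.io.IOException;
import java.util.Map;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import com.fasterxml.jackson.databind.ObjectMapper;

class ToJsonSketch {
  private static final ObjectMapper MAPPER = new ObjectMapper();

  static PCollection<String> toJson(PCollection<Map<String, Object>> input) {
    return input.apply(
        MapElements.via(
            new SimpleFunction<Map<String, Object>, String>() {
              @Override
              public String apply(Map<String, Object> element) {
                try {
                  return MAPPER.writeValueAsString(element);
                } catch (IOException e) {
                  // The checked exception can't escape apply(), so it must be wrapped.
                  throw new RuntimeException(e);
                }
              }
            }));
  }
}

If apply() could declare "throws Exception", the try/catch above would go away.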


Re: [Proposal] Add exception handling option to MapElements

2018-10-03 Thread Jeff Klukas
Jira issues for adding exception handling in Java and Python SDKs:

https://issues.apache.org/jira/browse/BEAM-5638
https://issues.apache.org/jira/browse/BEAM-5639

I'll plan to have a complete PR for the Java SDK put together in the next
few days.

On Wed, Oct 3, 2018 at 1:29 PM Jeff Klukas  wrote:

> I don't personally have experience with the Python SDK, so am not
> immediately in a position to comment on how feasible it would be to
> introduce a similar change there. I'll plan to write up two separate issues
> for adding exception handling in the Java and Python SDKs.
>
> On Wed, Oct 3, 2018 at 12:17 PM Thomas Weise  wrote:
>
>> +1 for the proposal as well as the suggestion to offer it in other SDKs,
>> where applicable
>>
>> On Wed, Oct 3, 2018 at 8:58 AM Chamikara Jayalath 
>> wrote:
>>
>>> Sounds like a very good addition. I'd say this can be a single PR since
>>> changes are related. Please open a JIRA for tracking.
>>>
>>> Have you thought about introducing a similar change to the Python SDK?
>>> (doesn't have to be the same PR).
>>>
>>> - Cham
>>>
>>> On Wed, Oct 3, 2018 at 8:31 AM Jeff Klukas  wrote:
>>>
>>>> If this looks good for MapElements, I agree that it makes sense to
>>>> extend to FlatMapElements and Filter and to keep the API consistent between
>>>> them.
>>>>
>>>> Do you have suggestions on how to submit changes with that wider scope?
>>>> Would one PR altering MapElements, FlatMapElements, Filter, ParseJsons, and
>>>> AsJsons be too large to reasonably review? Should I open an overall JIRA
>>>> ticket to track and break this into smaller  PRs?
>>>>
>>>> On Wed, Oct 3, 2018 at 10:31 AM Reuven Lax  wrote:
>>>>
>>>>> Sounds cool. Why not support this on other transforms as well?
>>>>> (FlatMapElements, Filter, etc.)
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Tue, Oct 2, 2018 at 4:49 PM Jeff Klukas 
>>>>> wrote:
>>>>>
>>>>>> I've seen a few Beam users mention the need to handle errors in their
>>>>>> transforms by using a try/catch and routing to different outputs based on
>>>>>> whether an exception was thrown. This was particularly nicely written up 
>>>>>> in
>>>>>> a post by Vallery Lancey:
>>>>>>
>>>>>>
>>>>>> https://medium.com/@vallerylancey/error-handling-elements-in-apache-beam-pipelines-fffdea91af2a
>>>>>>
>>>>>> I'd love to see this pattern better supported directly in the Beam
>>>>>> API, because it currently requires the user to implement a full DoFn even
>>>>>> for the simplest cases.
>>>>>>
>>>>>> I propose we support a MapElements-like transform that allows the
>>>>>> user to specify a set of exceptions to catch and route to a failure 
>>>>>> output.
>>>>>> Something like:
>>>>>>
>>>>>> MapElements
>>>>>> .via(myFunctionThatThrows)
>>>>>> .withSuccessTag(successTag)
>>>>>> .withFailureTag(failureTag, JsonParsingException.class)
>>>>>>
>>>>>> which would output a PCollectionTuple with both the successful
>>>>>> outcomes of the map operation and also a collection of the inputs that
>>>>>> threw JsonParsingException.
>>>>>>
>>>>>> To make this more concrete, I put together a proof of concept PR:
>>>>>> https://github.com/apache/beam/pull/6518  I'd appreciate feedback
>>>>>> about whether this seems like a worthwhile addition and a feasible 
>>>>>> approach.
>>>>>>
>>>>>


Re: [Proposal] Add exception handling option to MapElements

2018-10-03 Thread Jeff Klukas
I don't personally have experience with the Python SDK, so am not
immediately in a position to comment on how feasible it would be to
introduce a similar change there. I'll plan to write up two separate issues
for adding exception handling in the Java and Python SDKs.

On Wed, Oct 3, 2018 at 12:17 PM Thomas Weise  wrote:

> +1 for the proposal as well as the suggestion to offer it in other SDKs,
> where applicable
>
> On Wed, Oct 3, 2018 at 8:58 AM Chamikara Jayalath 
> wrote:
>
>> Sounds like a very good addition. I'd say this can be a single PR since
>> changes are related. Please open a JIRA for tracking.
>>
>> Have you thought about introducing a similar change to the Python SDK?
>> (doesn't have to be the same PR).
>>
>> - Cham
>>
>> On Wed, Oct 3, 2018 at 8:31 AM Jeff Klukas  wrote:
>>
>>> If this looks good for MapElements, I agree that it makes sense to
>>> extend to FlatMapElements and Filter and to keep the API consistent between
>>> them.
>>>
>>> Do you have suggestions on how to submit changes with that wider scope?
>>> Would one PR altering MapElements, FlatMapElements, Filter, ParseJsons, and
>>> AsJsons be too large to reasonably review? Should I open an overall JIRA
>>> ticket to track and break this into smaller  PRs?
>>>
>>> On Wed, Oct 3, 2018 at 10:31 AM Reuven Lax  wrote:
>>>
>>>> Sounds cool. Why not support this on other transforms as well?
>>>> (FlatMapElements, Filter, etc.)
>>>>
>>>> Reuven
>>>>
>>>> On Tue, Oct 2, 2018 at 4:49 PM Jeff Klukas  wrote:
>>>>
>>>>> I've seen a few Beam users mention the need to handle errors in their
>>>>> transforms by using a try/catch and routing to different outputs based on
>>>>> whether an exception was thrown. This was particularly nicely written up 
>>>>> in
>>>>> a post by Vallery Lancey:
>>>>>
>>>>>
>>>>> https://medium.com/@vallerylancey/error-handling-elements-in-apache-beam-pipelines-fffdea91af2a
>>>>>
>>>>> I'd love to see this pattern better supported directly in the Beam
>>>>> API, because it currently requires the user to implement a full DoFn even
>>>>> for the simplest cases.
>>>>>
>>>>> I propose we support a MapElements-like transform that allows the
>>>>> user to specify a set of exceptions to catch and route to a failure 
>>>>> output.
>>>>> Something like:
>>>>>
>>>>> MapElements
>>>>> .via(myFunctionThatThrows)
>>>>> .withSuccessTag(successTag)
>>>>> .withFailureTag(failureTag, JsonParsingException.class)
>>>>>
>>>>> which would output a PCollectionTuple with both the successful
>>>>> outcomes of the map operation and also a collection of the inputs that
>>>>> threw JsonParsingException.
>>>>>
>>>>> To make this more concrete, I put together a proof of concept PR:
>>>>> https://github.com/apache/beam/pull/6518  I'd appreciate feedback
>>>>> about whether this seems like a worthwhile addition and a feasible 
>>>>> approach.
>>>>>
>>>>


Re: [Proposal] Add exception handling option to MapElements

2018-10-03 Thread Jeff Klukas
If this looks good for MapElements, I agree that it makes sense to extend
to FlatMapElements and Filter and to keep the API consistent between them.

Do you have suggestions on how to submit changes with that wider scope?
Would one PR altering MapElements, FlatMapElements, Filter, ParseJsons, and
AsJsons be too large to reasonably review? Should I open an overall JIRA
ticket to track and break this into smaller PRs?

On Wed, Oct 3, 2018 at 10:31 AM Reuven Lax  wrote:

> Sounds cool. Why not support this on other transforms as well?
> (FlatMapElements, Filter, etc.)
>
> Reuven
>
> On Tue, Oct 2, 2018 at 4:49 PM Jeff Klukas  wrote:
>
>> I've seen a few Beam users mention the need to handle errors in their
>> transforms by using a try/catch and routing to different outputs based on
>> whether an exception was thrown. This was particularly nicely written up in
>> a post by Vallery Lancey:
>>
>>
>> https://medium.com/@vallerylancey/error-handling-elements-in-apache-beam-pipelines-fffdea91af2a
>>
>> I'd love to see this pattern better supported directly in the Beam API,
>> because it currently requires the user to implement a full DoFn even for
>> the simplest cases.
>>
>> I propose we support a MapElements-like transform that allows the
>> user to specify a set of exceptions to catch and route to a failure output.
>> Something like:
>>
>> MapElements
>> .via(myFunctionThatThrows)
>> .withSuccessTag(successTag)
>> .withFailureTag(failureTag, JsonParsingException.class)
>>
>> which would output a PCollectionTuple with both the successful outcomes
>> of the map operation and also a collection of the inputs that threw
>> JsonParsingException.
>>
>> To make this more concrete, I put together a proof of concept PR:
>> https://github.com/apache/beam/pull/6518  I'd appreciate feedback about
>> whether this seems like a worthwhile addition and a feasible approach.
>>
>


[Proposal] Add exception handling option to MapElements

2018-10-02 Thread Jeff Klukas
I've seen a few Beam users mention the need to handle errors in their
transforms by using a try/catch and routing to different outputs based on
whether an exception was thrown. This was particularly nicely written up in
a post by Vallery Lancey:

https://medium.com/@vallerylancey/error-handling-elements-in-apache-beam-pipelines-fffdea91af2a

I'd love to see this pattern better supported directly in the Beam API,
because it currently requires the user to implement a full DoFn even for
the simplest cases.

I propose we support a MapElements-like transform that allows the user
to specify a set of exceptions to catch and route to a failure output.
Something like:

MapElements
.via(myFunctionThatThrows)
.withSuccessTag(successTag)
.withFailureTag(failureTag, JsonParsingException.class)

which would output a PCollectionTuple with both the successful outcomes of
the map operation and also a collection of the inputs that threw
JsonParsingException.
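
To sketch how that output might be consumed (this assumes the
withSuccessTag/withFailureTag API proposed above, which does not exist in the
SDK yet; the input collection, element types, and downstream transforms are
placeholders):

TupleTag<MyOutput> successTag = new TupleTag<MyOutput>() {};
TupleTag<MyInput> failureTag = new TupleTag<MyInput>() {};

PCollectionTuple results = input.apply(
    MapElements
        .via(myFunctionThatThrows)
        .withSuccessTag(successTag)
        .withFailureTag(failureTag, JsonParsingException.class));

// Successful outputs continue down the main path; inputs that threw get
// their own handling.
results.get(successTag).apply("continue processing", new MyDownstreamTransform());
results.get(failureTag).apply("handle failed inputs", new MyFailureSinkTransform());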

To make this more concrete, I put together a proof of concept PR:
https://github.com/apache/beam/pull/6518  I'd appreciate feedback about
whether this seems like a worthwhile addition and a feasible approach.


Re: Are there plans for removing joda-time from the beam java SDK?

2018-09-27 Thread Jeff Klukas
I agree that there's no path to removing joda-time as a dependency in 2.x.
If we can make it a goal for 3.0 to use java.time consistently and remove
joda-time at that point, I'd be very happy.

I have no context, though, on the timeline for a 3.0 release. If that's
sometime in the next year, then there's likely too much cost and not enough
benefit in partially supporting java.time in 2.x. It would be better to
leave joda-time as the preferred time library.

If Beam 3.0 is further out, then it may still be worth considering if we
can add at least partial java.time support. I expect more organizations in
the next few years will be trying to enforce use of java.time over
joda-time.

On Thu, Sep 27, 2018 at 7:59 AM Robert Bradshaw  wrote:

> As long as joda stays anywhere in the public API (and removing it would be
> a backwards incompatible change) we can't drop it as a dependency.
>
> While we could provide java.time overloads for time-accepting methods,
> time-returning methods can't be as transparently interchangeable. I'm not
> sure whether the duplication is worth it (until we move to 3.0, at which
> point if it's mechanical enough (for us and our users) we could just switch
> over).
>
> The situation I'd really rather not end up in is where some methods take only
> joda time, some methods take only java.time, and some take both. Similarly
> with return values. Whatever we do we should be consistent.
>
> On Thu, Sep 27, 2018 at 12:00 PM Łukasz Gajowy 
> wrote:
>
>> +1 to removing joda. IMO from now on we should favor java.time in reviews
>> over joda.time in new features and feel free to replace joda when
>> refactoring is done in places where code stays backward-compatible and
>> doesn't get duplicated (eg. some class internals, not exposed through class
>> interface). I'm not sure if adding methods with alternative signatures only
>> because of the time is the way to go (lots of duplication, low gain?). I
>> think we should wait with places like this until the 3.0 version.
>>
>> Łukasz
>>
>> On Wed, Sep 26, 2018 at 8:16 PM Jeff Klukas 
>> wrote:
>>
>>> Looks like https://github.com/apache/beam/pull/4964 is somewhat
>>> different from what I had in mind. As Reuven mentioned, I'm specifically
>>> interested in using the Java 8 java.time API as a drop-in replacement for
>>> joda-time objects so that we don't have to rely on an external library. PR
>>> 4964 is using joda-time objects to replace older java.util and java.sql
>>> objects with richer joda-time alternatives.
>>>
>>> Reuven mentioned a "list of things to do when we release Beam 3.0". Is
>>> there a JIRA issue or other document that's tracking Beam 3.0 work?
>>>
>>> Reuven also mentioned that using java.time would introduce
>>> backwards-incompatible changes to the Beam 2.x API, but in many cases (such
>>> as FixedWindows) we could introduce alternative method signatures so that
>>> we can support both joda and java.time. If there are methods that return
>>> joda-time objects, it may be less feasible to support both.
>>>
>>> On Wed, Sep 26, 2018 at 1:51 PM Jean-Baptiste Onofré 
>>> wrote:
>>>
>>>> +1
>>>>
>>>> It makes sense to me, and it's also a plan to "split" the core into more
>>>> fine-grained modules and give Beam 3 more of an API flavor.
>>>>
>>>> Regards
>>>> JB
>>>> On Sep 26, 2018, at 1:49 PM, Reuven Lax  wrote:
>>>>>
>>>>> We started with Joda because Java 7 time classes were insufficient for
>>>>> our needs. Now that we're on Java 8 we could use Java 8's time libraries
>>>>> (which are much better), but unfortunately that would create
>>>>> backwards-incompatible changes to our APIs. We should add this to the list
>>>>> of things to do when we release Beam 3.0.
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Wed, Sep 26, 2018 at 10:43 AM Andrew Pilloud < apill...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Last I heard we were actually moving the other way, replacing
>>>>>> java.time with joda-time. See the giant schema PR here:
>>>>>> https://github.com/apache/beam/pull/4964 I don't think this was ever
>>>>>> discussed on the list though.
>>>>>>
>>>>>> Andrew
>>>>>>
>>>>>> On Wed, Sep 26, 2018 at 9:21 AM Jeff Klukas < jklu...@mozilla.com>
>>>>>> wrote:
>>>>>>
>>>>>

Re: Are there plans for removing joda-time from the beam java SDK?

2018-09-26 Thread Jeff Klukas
Looks like https://github.com/apache/beam/pull/4964 is somewhat different
from what I had in mind. As Reuven mentioned, I'm specifically interested
in using the Java 8 java.time API as a drop-in replacement for joda-time
objects so that we don't have to rely on an external library. PR 4964 is
using joda-time objects to replace older java.util and java.sql objects
with richer joda-time alternatives.

Reuven mentioned a "list of things to do when we release Beam 3.0". Is
there a JIRA issue or other document that's tracking Beam 3.0 work?

Reuven also mentioned that using java.time would introduce
backwards-incompatible changes to the Beam 2.x API, but in many cases (such
as FixedWindows) we could introduce alternative method signatures so that
we can support both joda and java.time. If there are methods that return
joda-time objects, it may be less feasible to support both.
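
In the meantime, bridging at the call site is easy enough. Here is a minimal
sketch using only APIs that exist in Beam 2.x today (the element type and
window size are arbitrary):

import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;

class WindowingBridge {
  // Convert a java.time.Duration to the joda-time Duration that
  // FixedWindows.of() currently requires.
  static PCollection<String> inTenMinuteWindows(PCollection<String> events) {
    java.time.Duration size = java.time.Duration.ofMinutes(10);
    return events.apply(
        Window.<String>into(
            FixedWindows.of(org.joda.time.Duration.millis(size.toMillis()))));
  }
}

A java.time overload in the SDK could delegate the same way, which is why the
input side seems tractable even before 3.0.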

On Wed, Sep 26, 2018 at 1:51 PM Jean-Baptiste Onofré 
wrote:

> +1
>
> It makes sense to me, and it's also a plan to "split" the core into more
> fine-grained modules and give Beam 3 more of an API flavor.
>
> Regards
> JB
> On Sep 26, 2018, at 1:49 PM, Reuven Lax  wrote:
>>
>> We started with Joda because Java 7 time classes were insufficient for
>> our needs. Now that we're on Java 8 we could use Java 8's time libraries
>> (which are much better), but unfortunately that would create
>> backwards-incompatible changes to our APIs. We should add this to the list
>> of things to do when we release Beam 3.0.
>>
>> Reuven
>>
>> On Wed, Sep 26, 2018 at 10:43 AM Andrew Pilloud < apill...@google.com>
>> wrote:
>>
>>> Last I heard we were actually moving the other way, replacing java.time
>>> with joda-time. See the giant schema PR here:
>>> https://github.com/apache/beam/pull/4964 I don't think this was ever
>>> discussed on the list though.
>>>
>>> Andrew
>>>
>>> On Wed, Sep 26, 2018 at 9:21 AM Jeff Klukas < jklu...@mozilla.com>
>>> wrote:
>>>
>>>> It looks like there are a few spots in the Beam Java API where users have
>>>> to provide joda-time objects, such as
>>>> FixedWindows#of(org.joda.time.Duration).
>>>>
>>>> Are there any plans to support java.time objects in addition to joda
>>>> objects? Any plans to eventually remove joda-time?
>>>>
>>>> My personal interest is that my team would like to eventually
>>>> standardize on usage of java.time and remove all explicit usage of
>>>> joda-time in our codebases. Even if joda-time is still pulled in
>>>> transitively by the beam java SDK and used internally, it would be nice for
>>>> users to be able to avoid explicit interaction with joda-time. I'm
>>>> imagining it would be possible to provide additional methods like
>>>> FixedWindows#of(java.time.Duration) and potentially marking the joda-based
>>>> variants as deprecated.
>>>>
>>>> Does this seem worthy of opening a JIRA issue?
>>>>
>>>>


Re: [Proposal] Add a static PTransform.compose() method for composing transforms in a lambda expression

2018-09-19 Thread Jeff Klukas
Thanks for the thoughts, Lukasz. These are exactly the kinds of issues I
was hoping to get context on, since I don't yet have extensive experience
with Beam.

I have not yet run into issues where the output coder was not able to be
inferred. I expect this may be a non-issue, as the individual transforms
used within a user-provided lambda expression would presumably expose the
ability to specify a coder.
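
As a rough sketch of what I mean (errorTag here is a placeholder
TupleTag<String>, and the MapElements step is purely illustrative), a coder
can still be pinned explicitly on a collection produced inside the composed
lambda:

PTransform<PCollectionTuple, PCollection<String>> writeErrors =
    CompositeTransform.of((PCollectionTuple input) -> {
      PCollection<String> formatted =
          input
              .get(errorTag)
              .apply("format errors",
                  MapElements.into(TypeDescriptors.strings())
                      .via((String msg) -> "error: " + msg));
      // Explicit coder on the new collection; composition doesn't get in the way.
      formatted.setCoder(StringUtf8Coder.of());
      return formatted;
    });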

I don't have enough context yet to comment on whether display data might be
an issue, so I do hope the user list can provide input there.

On Wed, Sep 19, 2018 at 4:52 PM Lukasz Cwik  wrote:

> Thanks for the proposal and it does seem to make the API cleaner to build
> anonymous composite transforms.
>
> In your experience have you had issues where the API doesn't work out well
> because the PTransform:
> * is not able to override how the output coder is inferred?
> * can't supply display data?
>
> +u...@beam.apache.org , do users think that the
> provided API would be useful enough for it to be added to the core SDK or
> would the addition of the method provide noise/detract from the existing
> API?
>
> On Mon, Sep 17, 2018 at 12:57 PM Jeff Klukas  wrote:
>
>> I've gone ahead and filed a JIRA Issue and GitHub PR to follow up on this
>> suggestion and make it more concrete:
>>
>> https://issues.apache.org/jira/browse/BEAM-5413
>> https://github.com/apache/beam/pull/6414
>>
>> On Fri, Sep 14, 2018 at 1:42 PM Jeff Klukas  wrote:
>>
>>> Hello all, I'm a data engineer at Mozilla working on a first project
>>> using Beam. I've been impressed with the usability of the API as there are
>>> good built-in solutions for handling many simple transformation cases with
>>> minimal code, and wanted to discuss one bit of ergonomics that seems to be
>>> missing.
>>>
>>> It appears that none of the existing PTransform factories are generic
>>> enough to take in or output a PCollectionTuple, but we've found many use
>>> cases where it's convenient to apply a few transforms on a PCollectionTuple
>>> in a lambda expression.
>>>
>>> For example, we've defined several PTransforms that return main and
>>> error output streams bundled in a PCollectionTuple. We defined a
>>> CompositeTransform interface so that we could handle the error output in a
>>> lambda expression like:
>>>
>>> pipeline
>>>     .apply("attempt to deserialize messages", new MyDeserializationTransform())
>>>     .apply("write deserialization errors",
>>>         CompositeTransform.of((PCollectionTuple input) -> {
>>>           input.get(errorTag).apply(new MyErrorOutputTransform());
>>>           return input.get(mainTag);
>>>         }))
>>>     .apply("more processing on the deserialized messages", new MyOtherTransform());
>>>
>>> I'd be interested in contributing a patch to add this functionality,
>>> perhaps as a static method PTransform.compose(). Would that patch be
>>> welcome? Are there other thoughts on naming?
>>>
>>> The full code of the CompositeTransform interface we're currently using
>>> is included below.
>>>
>>>
>>> public interface CompositeTransform<InputT extends PInput, OutputT extends POutput> {
>>>   OutputT expand(InputT input);
>>>
>>>   /**
>>>    * The public factory method that serves as the entrypoint for users
>>>    * to create a composite PTransform.
>>>    */
>>>   static <InputT extends PInput, OutputT extends POutput>
>>>       PTransform<InputT, OutputT> of(CompositeTransform<InputT, OutputT> transform) {
>>>     return new PTransform<InputT, OutputT>() {
>>>       @Override
>>>       public OutputT expand(InputT input) {
>>>         return transform.expand(input);
>>>       }
>>>     };
>>>   }
>>> }
>>>
>>>
>>>
>>>


Re: [Proposal] Add a static PTransform.compose() method for composing transforms in a lambda expression

2018-09-17 Thread Jeff Klukas
I've gone ahead and filed a JIRA Issue and GitHub PR to follow up on this
suggestion and make it more concrete:

https://issues.apache.org/jira/browse/BEAM-5413
https://github.com/apache/beam/pull/6414

On Fri, Sep 14, 2018 at 1:42 PM Jeff Klukas  wrote:

> Hello all, I'm a data engineer at Mozilla working on a first project using
> Beam. I've been impressed with the usability of the API as there are good
> built-in solutions for handling many simple transformation cases with
> minimal code, and wanted to discuss one bit of ergonomics that seems to be
> missing.
>
> It appears that none of the existing PTransform factories are generic
> enough to take in or output a PCollectionTuple, but we've found many use
> cases where it's convenient to apply a few transforms on a PCollectionTuple
> in a lambda expression.
>
> For example, we've defined several PTransforms that return main and error
> output streams bundled in a PCollectionTuple. We defined a
> CompositeTransform interface so that we could handle the error output in a
> lambda expression like:
>
> pipeline
>     .apply("attempt to deserialize messages", new MyDeserializationTransform())
>     .apply("write deserialization errors",
>         CompositeTransform.of((PCollectionTuple input) -> {
>           input.get(errorTag).apply(new MyErrorOutputTransform());
>           return input.get(mainTag);
>         }))
>     .apply("more processing on the deserialized messages", new MyOtherTransform());
>
> I'd be interested in contributing a patch to add this functionality,
> perhaps as a static method PTransform.compose(). Would that patch be
> welcome? Are there other thoughts on naming?
>
> The full code of the CompositeTransform interface we're currently using is
> included below.
>
>
> public interface CompositeTransform<InputT extends PInput, OutputT extends POutput> {
>   OutputT expand(InputT input);
>
>   /**
>    * The public factory method that serves as the entrypoint for users to
>    * create a composite PTransform.
>    */
>   static <InputT extends PInput, OutputT extends POutput>
>       PTransform<InputT, OutputT> of(CompositeTransform<InputT, OutputT> transform) {
>     return new PTransform<InputT, OutputT>() {
>       @Override
>       public OutputT expand(InputT input) {
>         return transform.expand(input);
>       }
>     };
>   }
> }
>
>
>
>


[Proposal] Add a static PTransform.compose() method for composing transforms in a lambda expression

2018-09-14 Thread Jeff Klukas
Hello all, I'm a data engineer at Mozilla working on a first project using
Beam. I've been impressed with the usability of the API as there are good
built-in solutions for handling many simple transformation cases with
minimal code, and wanted to discuss one bit of ergonomics that seems to be
missing.

It appears that none of the existing PTransform factories are generic
enough to take in or output a PCollectionTuple, but we've found many use
cases where it's convenient to apply a few transforms on a PCollectionTuple
in a lambda expression.

For example, we've defined several PTransforms that return main and error
output streams bundled in a PCollectionTuple. We defined a
CompositeTransform interface so that we could handle the error output in a
lambda expression like:

pipeline
    .apply("attempt to deserialize messages", new MyDeserializationTransform())
    .apply("write deserialization errors",
        CompositeTransform.of((PCollectionTuple input) -> {
          input.get(errorTag).apply(new MyErrorOutputTransform());
          return input.get(mainTag);
        }))
    .apply("more processing on the deserialized messages", new MyOtherTransform());

I'd be interested in contributing a patch to add this functionality,
perhaps as a static method PTransform.compose(). Would that patch be
welcome? Are there other thoughts on naming?

The full code of the CompositeTransform interface we're currently using is
included below.


public interface CompositeTransform<InputT extends PInput, OutputT extends POutput> {
  OutputT expand(InputT input);

  /**
   * The public factory method that serves as the entrypoint for users to
   * create a composite PTransform.
   */
  static <InputT extends PInput, OutputT extends POutput>
      PTransform<InputT, OutputT> of(CompositeTransform<InputT, OutputT> transform) {
    return new PTransform<InputT, OutputT>() {
      @Override
      public OutputT expand(InputT input) {
        return transform.expand(input);
      }
    };
  }
}
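
For comparison, the call site from the example above with the proposed static
factory would read the same way, just without the user-defined interface (the
name PTransform.compose() is the suggestion here, not an existing SDK method):

    .apply("write deserialization errors",
        PTransform.compose((PCollectionTuple input) -> {
          input.get(errorTag).apply(new MyErrorOutputTransform());
          return input.get(mainTag);
        }))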