[ANNOUNCE] New committer announcement: Gleb Kanterov

2019-01-24 Thread Kenneth Knowles
Hi all,

Please join me and the rest of the Beam PMC in welcoming a new committer: Gleb
Kanterov

Gleb started contributing to Beam and quickly dove deep, making some
sensitive fixes to schemas, as well as working on general build issues, Beam SQL,
Avro, and more. In consideration of Gleb's technical and community contributions, the
Beam PMC trusts Gleb with the responsibilities of a Beam committer [1].

Thank you, Gleb, for your contributions.

Kenn

[1] https://beam.apache.org/contribute/become-a-committer/#an-apache-beam-committer


BEAM-6324 / #7340: "I've pretty much given up on the PR being merged. I use my own fork for my projects"

2019-01-24 Thread Kenneth Knowles
The subject line is a quote from BEAM-6324*

This makes me sad. I hope/expect it is a failure to route a pull request to
the right reviewer. I am less sad about the functionality than the
sentiment and how a contributor is being discouraged.

Does anyone have ideas that could help?

Kenn

*https://issues.apache.org/jira/browse/BEAM-6324


Re: compileJava broken on master see: BEAM-6495

2019-01-24 Thread Alex Amato
Please try rebasing from master; I believe this issue has been resolved.

On Thu, Jan 24, 2019 at 3:29 PM Ryan Williams  wrote:

> I'm seeing every ≈third `./gradlew compileJava` fail locally due to this;
> re-running the commit has always succeeded, so far.
>
> Sounds like there is not an immediate fix in the works / no one assigned
> on the JIRA?
>
> On Wed, Jan 23, 2019 at 3:17 PM Kenneth Knowles  wrote:
>
>> This might connect to vendoring Calcite. It will be easiest, and have the
>> best incremental build, if we separate the generated code into its own
>> module that has relocation to match the vendored Calcite.
>>
>> Kenn
>>
>> On Wed, Jan 23, 2019 at 11:29 AM Anton Kedin  wrote:
>>
>>> We don't pre-generate the code as a separate step. Code gen from the SQL
>>> parser syntax spec and its compilation both happen during the Beam SQL
>>> build task. Splitting the code generation and compilation might not be
>>> trivial. We definitely should look into fixing this though.
>>>
>>> Regards,
>>> Anton
>>>
>>> On Wed, Jan 23, 2019 at 11:13 AM Alex Amato  wrote:
>>>
 Okay, makes sense. Perhaps we can somehow make it fail when it fails to
 generate the dep, rather than when compiling the Java code later on

 On Wed, Jan 23, 2019 at 11:12 AM Anton Kedin  wrote:

> ParserImpl is autogenerated by Calcite at build time. It seems that
> there's a race condition there and it sometimes fails. Rerunning the build
> works for me.
>
> Regards,
> Anton
>
> On Wed, Jan 23, 2019, 11:06 AM Alex Amato  wrote:
>
>> https://jira.apache.org/jira/browse/BEAM-6495?filter=-2
>>
>> Any ideas, how this got through the precommit?
>>
>> > Task :beam-sdks-java-extensions-sql:compileJava FAILED
>>
>> /usr/local/google/home/ajamato/go/src/
>> github.com/apache/beam/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcFactory.java:29:
>> error: cannot find symbol
>>
>> import
>> org.apache.beam.sdk.extensions.sql.impl.parser.impl.BeamSqlParserImpl;
>>
>>   ^
>>
>>   symbol:   class BeamSqlParserImpl
>>
>>   location: package
>> org.apache.beam.sdk.extensions.sql.impl.parser.impl
>>
>> 1 error
>>
>>


Re: Jenkins seed job fails

2019-01-24 Thread Thomas Weise
Thanks Scott.

I removed beam_PreCommit_Python_PVR_Flink_XXX and hopefully that will
unblock the seed job.


On Thu, Jan 24, 2019 at 1:05 PM Scott Wegner  wrote:

> job_PreCommit_Python_ValidatesRunner_Flink.groovy has [1]:
>
>previousNames('beam_PostCommit_Python_VR_Flink')
>
> This is used by Jenkins to rename an existing job to the new name in order
> to retain history. In this case, both the old and new job names already exist, so
> it's failing:
>
> * https://builds.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/
> * https://builds.apache.org/job/beam_PostCommit_Python_VR_Flink/
>
> The jobs got into a similar mess before [2], and I believe it's due to
> having two different SeedJobs producing the same jobs.
>
> To unblock this case, I suggest manually
> deleting beam_PostCommit_Python_VR_Flink, as it appears the job history has
> already been migrated to _Cron. We should also consolidate the Seed Job
> functionality to a single job to prevent this from happening again.
>
> Note: I don't think it's correct to use the 'previousNames' feature with
> PrecommitJobBuilder as it creates three variants of the same job (normal,
> '_PR', '_Cron'); I'm not sure how Jenkins would handle the renaming;
> perhaps just use the first one.
>
> [1]
> https://github.com/apache/beam/blob/98952d20b5ad5e5cb74d53c08423429b20eb6523/.test-infra/jenkins/job_PreCommit_Python_ValidatesRunner_Flink.groovy#L35
> [2] https://issues.apache.org/jira/browse/BEAM-6250
>
> On Thu, Jan 24, 2019 at 12:09 PM Kenneth Knowles  wrote:
>
>> Jenkins has no understanding of this. Those are just different job names
>> that our scripts create. Is it a bug in PreCommitJobBuilder?
>>
>> Kenn
>>
>> On Thu, Jan 24, 2019 at 11:22 AM Ankur Goenka  wrote:
>>
>>> It seems that the cron is not deleted if the job is deleted.
>>> I ran the seed job manually to test the new job which would have created
>>> the cron.
>>>
>>> I am not aware of Jenkins internals, but it would be great if we could clean
>>> up the cron on job deletion.
>>>
>>> On Thu, Jan 24, 2019 at 7:40 AM Thomas Weise  wrote:
>>>
 Processing DSL script job_PreCommit_Python.groovy
 Processing DSL script job_PreCommit_Python_ValidatesRunner_Flink.groovy
 java.lang.IllegalArgumentException: beam_PreCommit_Python_PVR_Flink_Cron 
 already exists
at hudson.model.Items.verifyItemDoesNotAlreadyExist(Items.java:641)
at hudson.model.AbstractItem.renameTo(AbstractItem.java:372)
at hudson.model.Job.renameTo(Job.java:653)


 Can someone with perms please cleanup old jobs?

 Thanks!


>
> --
>
>
>
>
> Got feedback? tinyurl.com/swegner-feedback
>


Re: Cross-language pipelines

2019-01-24 Thread Robert Bradshaw
On Fri, Jan 25, 2019 at 12:18 AM Reuven Lax  wrote:
>
> On Thu, Jan 24, 2019 at 2:38 PM Robert Bradshaw  wrote:
>>
>> On Thu, Jan 24, 2019 at 6:43 PM Reuven Lax  wrote:
>> >
>> > Keep in mind that these user-supplied lambdas are commonly used in our 
>> > IOs. One common usage is in Sink IOs, to allow dynamic destinations. e.g. 
>> > in BigQueryIO.Write, a user-supplied lambda determines what table a record 
>> > should be written to.
>>
>> This can probably be pre-computed upstream (as part of the wrapping
>> composite that does take a language-native lambda) and placed in a
>> standard format (e.g. a tuple or other schema) to be extracted by the
>> "core" sink.
>
> I'm not quite sure what you mean. How will you express a lambda as a tuple? 
> Or are you suggesting that we preapply all the lambdas and pass the result 
> down?

Exactly.

> That might work, but would be _far_ more expensive.

Calling back to the SDK on each application would likely be (a
different kind of) expensive.

> The result of the lambda is sometimes much larger than the input (e.g. the
> result could be a fully-qualified output location string), so these IOs try
> and delay application as much as possible; as a result, the actual 
> application is often deep inside the graph.

Batching such RPCs gets messy (though perhaps we'll have to go there).
Some hybrid approach where we compute the truly dynamic part eagerly
and do some "boring" (known URN) application like prefixing with a
prefix delayed may sometimes be possible. Some applications may lend
themselves to interleaving (e.g. so the large lambda output is never
shuffled, but still crosses the data plane).

Worst case there are features that simply wouldn't be available, or at
least not cheaply, until an SDK-native source is written, but it could
still be a huge win for a lot of use cases.

As I said, we just don't have any good answers for this bit yet :).

>>
>> > Given that IOs are one of the big selling points of cross-language 
>> > support, we should think about how we can support this functionality.
>>
>> Yes. There are user-supplied lambdas that can't be as easily pre- or
>> post-applied, and though we had some brainstorming sessions (~ a year
>> ago) we're far from a (good) answer to that.
>>
>> > On Thu, Jan 24, 2019 at 8:34 AM Robert Bradshaw  
>> > wrote:
>> >>
>> >> On Thu, Jan 24, 2019 at 5:08 PM Thomas Weise  wrote:
>> >> >
>> >> > Exciting to see the cross-language train gathering steam :)
>> >> >
>> >> > It may be useful to flesh out the user facing aspects a bit more before 
>> >> > going too deep on the service / expansion side or maybe that was done 
>> >> > elsewhere?
>> >>
>> >> It's been discussed, but no resolution yet.
>> >>
>> >> > A few examples (of varying complexity) of how the shim/proxy transforms 
>> >> > would look like in the other SDKs. Perhaps Java KafkaIO in Python and 
>> >> > Go would be a good candidate?
>> >>
>> >> The core implementation would, almost by definition, be
>> >>
>> >> input.apply(ExternalTransform(URN, payload, service_address).
>> >>
>> >> Nicer shims would just be composite transforms that call this, filling
>> >> in the URNs, payloads, and possibly service details from more
>> >> user-friendly parameters.
>> >>
>> >> > One problem we discovered with custom Flink native transforms for 
>> >> > Python was handling of lambdas / functions. An example could be a user 
>> >> > defined watermark timestamp extractor that the user should be able to 
>> >> > supply in Python and the JVM cannot handle.
>> >>
>> >> Yes, this has never been resolved satisfactorily. For now, if UDFs can
>> >> be reified in terms of a commonly-understood URN + payload, it'll
>> >> work. A transform could provide a wide range of "useful" URNs for its
>> >> internal callbacks, more than that would require significant design if
>> >> it can't be pre- or post-fixed.
>> >>
>> >> > On Wed, Jan 23, 2019 at 7:04 PM Chamikara Jayalath 
>> >> >  wrote:
>> >> >>
>> >> >>
>> >> >>
>> >> >> On Wed, Jan 23, 2019 at 1:03 PM Robert Bradshaw  
>> >> >> wrote:
>> >> >>>
>> >> >>> On Wed, Jan 23, 2019 at 6:38 PM Maximilian Michels  
>> >> >>> wrote:
>> >> >>> >
>> >> >>> > Thank you for starting on the cross-language feature Robert!
>> >> >>> >
>> >> >>> > Just to recap: Each SDK runs an ExpansionService which can be 
>> >> >>> > contacted during
>> >> >>> > pipeline translation to expand transforms that are unknown to the 
>> >> >>> > SDK. The
>> >> >>> > service returns the Proto definitions to the querying process.
>> >> >>>
>> >> >>> Yep. Technically it doesn't have to be the SDK, or even if it is there
>> >> >>> may be a variety of services (e.g. one offering SQL, one offering
>> >> >>> different IOs).
>> >> >>>
>> >> >>> > There will be multiple environments such that during execution 
>> >> >>> > cross-language
>> >> >>> > pipelines select the appropriate environment for a transform.
>> >> >>>
>> >> >>> Exactly. And fuses only those steps with compatible 

Re: compileJava broken on master see: BEAM-6495

2019-01-24 Thread Ryan Williams
I'm seeing every ≈third `./gradlew compileJava` fail locally due to this;
re-running the commit has always succeeded, so far.

Sounds like there is not an immediate fix in the works / no one assigned on
the JIRA?

On Wed, Jan 23, 2019 at 3:17 PM Kenneth Knowles  wrote:

> This might connect to vendoring Calcite. It will be easiest, and have the
> best incremental build, if we separate the generated code into its own
> module that has relocation to match the vendored Calcite.
>
> Kenn
>
> On Wed, Jan 23, 2019 at 11:29 AM Anton Kedin  wrote:
>
>> We don't pre-generate the code as a separate step. Code gen from the SQL
>> parser syntax spec and its compilation both happen during the Beam SQL
>> build task. Splitting the code generation and compilation might not be
>> trivial. We definitely should look into fixing this though.
>>
>> Regards,
>> Anton
>>
>> On Wed, Jan 23, 2019 at 11:13 AM Alex Amato  wrote:
>>
>>> Okay, makes sense. Perhaps we can somehow make it fail when it fails to
>>> generate the dep, rather than when compiling the Java code later on
>>>
>>> On Wed, Jan 23, 2019 at 11:12 AM Anton Kedin  wrote:
>>>
 ParserImpl is autogenerated by Calcite at build time. It seems that
 there's a race condition there and it sometimes fails. Rerunning the build
 works for me.

 Regards,
 Anton

 On Wed, Jan 23, 2019, 11:06 AM Alex Amato  wrote:

> https://jira.apache.org/jira/browse/BEAM-6495?filter=-2
>
> Any ideas, how this got through the precommit?
>
> > Task :beam-sdks-java-extensions-sql:compileJava FAILED
>
> /usr/local/google/home/ajamato/go/src/
> github.com/apache/beam/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/JdbcFactory.java:29:
> error: cannot find symbol
>
> import
> org.apache.beam.sdk.extensions.sql.impl.parser.impl.BeamSqlParserImpl;
>
>   ^
>
>   symbol:   class BeamSqlParserImpl
>
>   location: package org.apache.beam.sdk.extensions.sql.impl.parser.impl
>
> 1 error
>
>


Re: Cross-language pipelines

2019-01-24 Thread Reuven Lax
On Thu, Jan 24, 2019 at 2:38 PM Robert Bradshaw  wrote:

> On Thu, Jan 24, 2019 at 6:43 PM Reuven Lax  wrote:
> >
> > Keep in mind that these user-supplied lambdas are commonly used in our
> IOs. One common usage is in Sink IOs, to allow dynamic destinations. e.g.
> in BigQueryIO.Write, a user-supplied lambda determines what table a record
> should be written to.
>
> This can probably be pre-computed upstream (as part of the wrapping
> composite that does take a language-native lambda) and placed in a
> standard format (e.g. a tuple or other schema) to be extracted by the
> "core" sink.
>

I'm not quite sure what you mean. How will you express a lambda as a tuple?
Or are you suggesting that we preapply all the lambdas and pass the result
down? That might work, but would be _far_ more expensive. The result of the
lambda is sometimes much larger than the input (e.g. the result could be a
fully-qualified output location string), so these IOs try and delay
application as much as possible; as a result, the actual application is
often deep inside the graph.


> > Given that IOs are one of the big selling points of cross-language
> support, we should think about how we can support this functionality.
>
> Yes. There are user-supplied lambdas that can't be as easily pre- or
> post-applied, and though we had some brainstorming sessions (~ a year
> ago) we're far from a (good) answer to that.
>
> > On Thu, Jan 24, 2019 at 8:34 AM Robert Bradshaw 
> wrote:
> >>
> >> On Thu, Jan 24, 2019 at 5:08 PM Thomas Weise  wrote:
> >> >
> >> > Exciting to see the cross-language train gathering steam :)
> >> >
> >> > It may be useful to flesh out the user facing aspects a bit more
> before going too deep on the service / expansion side or maybe that was
> done elsewhere?
> >>
> >> It's been discussed, but no resolution yet.
> >>
> >> > A few examples (of varying complexity) of how the shim/proxy
> transforms would look like in the other SDKs. Perhaps Java KafkaIO in
> Python and Go would be a good candidate?
> >>
> >> The core implementation would, almost by definition, be
> >>
> >> input.apply(ExternalTransform(URN, payload, service_address).
> >>
> >> Nicer shims would just be composite transforms that call this, filling
> >> in the URNs, payloads, and possibly service details from more
> >> user-friendly parameters.
> >>
> >> > One problem we discovered with custom Flink native transforms for
> Python was handling of lambdas / functions. An example could be a user
> defined watermark timestamp extractor that the user should be able to
> supply in Python and the JVM cannot handle.
> >>
> >> Yes, this has never been resolved satisfactorily. For now, if UDFs can
> >> be reified in terms of a commonly-understood URN + payload, it'll
> >> work. A transform could provide a wide range of "useful" URNs for its
> >> internal callbacks, more than that would require significant design if
> >> it can't be pre- or post-fixed.
> >>
> >> > On Wed, Jan 23, 2019 at 7:04 PM Chamikara Jayalath <
> chamik...@google.com> wrote:
> >> >>
> >> >>
> >> >>
> >> >> On Wed, Jan 23, 2019 at 1:03 PM Robert Bradshaw 
> wrote:
> >> >>>
> >> >>> On Wed, Jan 23, 2019 at 6:38 PM Maximilian Michels 
> wrote:
> >> >>> >
> >> >>> > Thank you for starting on the cross-language feature Robert!
> >> >>> >
> >> >>> > Just to recap: Each SDK runs an ExpansionService which can be
> contacted during
> >> >>> > pipeline translation to expand transforms that are unknown to the
> SDK. The
> >> >>> > service returns the Proto definitions to the querying process.
> >> >>>
> >> >>> Yep. Technically it doesn't have to be the SDK, or even if it is
> there
> >> >>> may be a variety of services (e.g. one offering SQL, one offering
> >> >>> different IOs).
> >> >>>
> >> >>> > There will be multiple environments such that during execution
> cross-language
> >> >>> > pipelines select the appropriate environment for a transform.
> >> >>>
> >> >>> Exactly. And fuses only those steps with compatible environments
> together.
> >> >>>
> >> >>> > It's not clear to me, should the expansion happen during pipeline
> construction
> >> >>> > or during translation by the Runner?
> >> >>>
> >> >>> I think it needs to happen as part of construction because the set of
> >> >>> outputs (and their properties) can be dynamic based on the
> expansion.
> >> >>
> >> >>
> >> >> Also, without expansion at pipeline construction, we'll have to
> define all composite cross-language transforms as runner-native transforms
> which won't be practical ?
> >> >>
> >> >>>
> >> >>>
> >> >>> > Thanks,
> >> >>> > Max
> >> >>> >
> >> >>> > On 23.01.19 04:12, Robert Bradshaw wrote:
> >> >>> > > No, this PR simply takes an endpoint address as a parameter,
> expecting
> >> >>> > > it to already be up and available. More convenient APIs, e.g.
> ones
> >> >>> > > that spin up and endpoint and tear it down, or catalog and
> locate code
> >> >>> > > and services offering these endpoints, could be provided as
> wrappers
> 

Replacing visteg plugin with task-tree for Gradle task visualization

2019-01-24 Thread Scott Wegner
I wanted to give a heads-up that PR#7615 [1] replaces the 'visteg' Gradle
plugin used in our build to produce .dot-file representation of the task
build graph. The plugin [2] appears to be abandoned and is not compatible
with Gradle 5.0 [3]

The PR replaces it with a different plugin, 'com.dorongold.task-tree' [4],
which provides similar functionality. Unfortunately, it doesn't have the
ability to produce a .dot-file, but instead outputs an ASCII tree of build
tasks on the command-line.

This functionality is useful for debugging build issues. Once the change is
merged I will update the wiki [5] with usage instructions.

[1] https://github.com/apache/beam/pull/7615
[2] https://github.com/mmalohlava/gradle-visteg
[3] https://github.com/mmalohlava/gradle-visteg/issues/12
[4] https://github.com/dorongold/gradle-task-tree
[5]
https://cwiki.apache.org/confluence/display/BEAM/Gradle+Tips#GradleTips-VisualizingTaskDependencies

-- 


Got feedback? tinyurl.com/swegner-feedback


Re: [DISCUSSION] ParDo Async Java API

2019-01-24 Thread Robert Bradshaw
That's a good point that this "IO" time should be tracked differently.

For a single level, a wrapper/utility that correctly and completely
(and transparently) implements the "naive" bit I sketched above under
the hood may be sufficient and implementable purely in user-space, and
quite useful.
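
To make that concrete, here is a minimal user-space sketch of such a wrapper, under some
loud assumptions: AsyncDoFn and asyncFn are made-up names rather than Beam API, errors and
backpressure are ignored, and it sidesteps the processing-time, fusion, and watermark
caveats Robert raises elsewhere in the thread. It buffers each element's CompletionStage
and forces completion in @FinishBundle, re-emitting with the original timestamp and window:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Queue;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CompletionStage;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.SerializableFunction;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.joda.time.Instant;

    // Hypothetical user-space wrapper: each element kicks off an async call and
    // the bundle is not allowed to finish until every call has completed.
    class AsyncDoFn<InputT, OutputT> extends DoFn<InputT, OutputT> {
      private final SerializableFunction<InputT, CompletionStage<OutputT>> asyncFn;
      private transient List<CompletableFuture<Void>> pending;
      private transient Queue<Result<OutputT>> completed;

      // A buffered result plus the window/timestamp it must be emitted with.
      private static class Result<T> {
        final T value;
        final Instant timestamp;
        final BoundedWindow window;
        Result(T value, Instant timestamp, BoundedWindow window) {
          this.value = value;
          this.timestamp = timestamp;
          this.window = window;
        }
      }

      AsyncDoFn(SerializableFunction<InputT, CompletionStage<OutputT>> asyncFn) {
        this.asyncFn = asyncFn;
      }

      @StartBundle
      public void startBundle() {
        pending = new ArrayList<>();
        completed = new ConcurrentLinkedQueue<>();
      }

      @ProcessElement
      public void processElement(ProcessContext c, BoundedWindow window) {
        Instant timestamp = c.timestamp();
        // Buffer results rather than emitting from the callback thread.
        pending.add(
            asyncFn.apply(c.element())
                .thenAccept(out -> completed.add(new Result<>(out, timestamp, window)))
                .toCompletableFuture());
      }

      @FinishBundle
      public void finishBundle(FinishBundleContext ctx) {
        // Force all outstanding completions before the bundle commits.
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        for (Result<OutputT> r : completed) {
          ctx.output(r.value, r.timestamp, r.window);
        }
      }
    }

A real utility would also need to bound the number of outstanding calls and surface
failures, which is where it stops being trivial.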

On Thu, Jan 24, 2019 at 7:38 PM Scott Wegner  wrote:
>
> Makes sense to me. We should make it easier to write DoFn's in this pattern 
> that has emerged as common among I/O connectors.
>
> Enabling asynchronous task chaining across a fusion tree is more complicated 
> but not necessary for this scenario.
>
> On Thu, Jan 24, 2019 at 10:13 AM Steve Niemitz  wrote:
>>
>> It's also important to note that in many (most?) IO frameworks (gRPC, 
>> finagle, etc), asynchronous IO is typically completely non-blocking, so 
>> there generally won't be a large number of threads waiting for IO to 
>> complete.  (netty uses a small pool of threads for the Event Loop Group for 
>> example).
>>
>> But in general I agree with Reuven, runners should not count threads in use 
>> in other thread pools for IO for the purpose of autoscaling (or most kinds 
>> of accounting).
>>
>> On Thu, Jan 24, 2019 at 12:54 PM Reuven Lax  wrote:
>>>
>>> As Steve said, the main rationale for this is so that asynchronous IOs (or 
>>> in general, asynchronous remote calls) can be made. To some degree this
>>> addresses Scott's concern: the asynchronous threads should be, for the most 
>>> part, simply waiting for IOs to complete; the reason to do the waiting 
>>> asynchronously is so that the main threadpool does not become blocked, 
>>> causing the pipeline to become IO bound. A runner like Dataflow should not 
>>> be tracking these threads for the purpose of autoscaling, as adding more 
>>> workers will (usually) not cause these calls to complete any faster.
>>>
>>> Reuven
>>>
>>> On Thu, Jan 24, 2019 at 7:28 AM Steve Niemitz  wrote:

 I think I agree with a lot of what you said here, I'm just going to 
 restate my initial use-case to try to make it more clear as well.

 From my usage of beam, I feel like the big benefit of async DoFns would be 
 to allow batched IO to be implemented more simply inside a DoFn.  Even in 
 the Beam SDK itself, there are a lot of IOs that batch up IO operations in 
 ProcessElement and wait for them to complete in FinishBundle ([1][2], 
 etc).  From my experience, things like error handling, emitting outputs as 
 the result of an asynchronous operation completing (in the correct window, 
 with the correct timestamp, etc) get pretty tricky, and it would be great 
 for the SDK to provide support natively for it.

 It's also probably good to point out that really only DoFns that do IO 
 should be asynchronous, normal CPU bound DoFns have no reason to be 
 asynchronous.

 A really good example of this is an IO I had written recently for 
 Bigtable, it takes an input PCollection of ByteStrings representing row 
 keys, and returns a PCollection of the row data from bigtable.  Naively 
 this could be implemented by simply blocking on the Bigtable read inside 
 the ParDo, however this would limit throughput substantially (even 
 assuming an avg read latency of 1ms, that's still only 1000 QPS / instance
 of the ParDo).  My implementation batches many reads together (as they 
 arrive at the DoFn), executes them once the batch is big enough (or some 
 time passes), and then emits them once the batch read completes.  Emitting 
 them in the correct window and handling errors gets tricky, so this is 
 certainly something I'd love the framework itself to handle.

 I also don't see a big benefit of making a DoFn receive a future, if all a 
 user is ever supposed to do is attach a continuation to it, that could 
 just as easily be done by the runner itself, basically just invoking the 
 entire ParDo as a continuation on the future (which then assumes the 
 runner is even representing these tasks as futures internally).

 Making the DoFn itself actually return a future could be an option, even 
 if the language itself doesn't support something like `await`, you could 
 still implement it yourself in the DoFn, however, it seems like it'd be a 
 strange contrast to the non-async version, which returns void.

 [1] 
 https://github.com/apache/beam/blob/428dbaf2465292e9e53affd094e4b38cc1d754e5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L720
 [2] 
 https://github.com/apache/beam/blob/428dbaf2465292e9e53affd094e4b38cc1d754e5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L1080


 On Thu, Jan 24, 2019 at 8:43 AM Robert Bradshaw  
 wrote:
>
> If I understand correctly, the end goal is to process input elements
> of a DoFn asynchronously. 

Re: Cross-language pipelines

2019-01-24 Thread Robert Bradshaw
On Thu, Jan 24, 2019 at 6:43 PM Reuven Lax  wrote:
>
> Keep in mind that these user-supplied lambdas are commonly used in our IOs. 
> One common usage is in Sink IOs, to allow dynamic destinations. e.g. in 
> BigQueryIO.Write, a user-supplied lambda determines what table a record 
> should be written to.

This can probably be pre-computed upstream (as part of the wrapping
composite that does take a language-native lambda) and placed in a
standard format (e.g. a tuple or other schema) to be extracted by the
"core" sink.

> Given that IOs are one of the big selling points of cross-language support, 
> we should think about how we can support this functionality.

Yes. There are user-supplied lambdas that can't be as easily pre- or
post-applied, and though we had some brainstorming sessions (~ a year
ago) we're far from a (good) answer to that.

> On Thu, Jan 24, 2019 at 8:34 AM Robert Bradshaw  wrote:
>>
>> On Thu, Jan 24, 2019 at 5:08 PM Thomas Weise  wrote:
>> >
>> > Exciting to see the cross-language train gathering steam :)
>> >
>> > It may be useful to flesh out the user facing aspects a bit more before 
>> > going too deep on the service / expansion side or maybe that was done 
>> > elsewhere?
>>
>> It's been discussed, but no resolution yet.
>>
>> > A few examples (of varying complexity) of how the shim/proxy transforms 
>> > would look like in the other SDKs. Perhaps Java KafkaIO in Python and Go 
>> > would be a good candidate?
>>
>> The core implementation would, almost by definition, be
>>
>> input.apply(ExternalTransform(URN, payload, service_address).
>>
>> Nicer shims would just be composite transforms that call this, filling
>> in the URNs, payloads, and possibly service details from more
>> user-friendly parameters.
>>
>> > One problem we discovered with custom Flink native transforms for Python 
>> > was handling of lambdas / functions. An example could be a user defined 
>> > watermark timestamp extractor that the user should be able to supply in 
>> > Python and the JVM cannot handle.
>>
>> Yes, this has never been resolved satisfactorily. For now, if UDFs can
>> be reified in terms of a commonly-understood URN + payload, it'll
>> work. A transform could provide a wide range of "useful" URNs for its
>> internal callbacks, more than that would require significant design if
>> it can't be pre- or post-fixed.
>>
>> > On Wed, Jan 23, 2019 at 7:04 PM Chamikara Jayalath  
>> > wrote:
>> >>
>> >>
>> >>
>> >> On Wed, Jan 23, 2019 at 1:03 PM Robert Bradshaw  
>> >> wrote:
>> >>>
>> >>> On Wed, Jan 23, 2019 at 6:38 PM Maximilian Michels  
>> >>> wrote:
>> >>> >
>> >>> > Thank you for starting on the cross-language feature Robert!
>> >>> >
>> >>> > Just to recap: Each SDK runs an ExpansionService which can be 
>> >>> > contacted during
>> >>> > pipeline translation to expand transforms that are unknown to the SDK. 
>> >>> > The
>> >>> > service returns the Proto definitions to the querying process.
>> >>>
>> >>> Yep. Technically it doesn't have to be the SDK, or even if it is there
>> >>> may be a variety of services (e.g. one offering SQL, one offering
>> >>> different IOs).
>> >>>
>> >>> > There will be multiple environments such that during execution 
>> >>> > cross-language
>> >>> > pipelines select the appropriate environment for a transform.
>> >>>
>> >>> Exactly. And fuses only those steps with compatible environments 
>> >>> together.
>> >>>
>> >>> > It's not clear to me, should the expansion happen during pipeline 
>> >>> > construction
>> >>> > or during translation by the Runner?
>> >>>
>> >>> I think it needs to happen as part of construction because the set of
>> >>> outputs (and their properties) can be dynamic based on the expansion.
>> >>
>> >>
>> >> Also, without expansion at pipeline construction, we'll have to define 
>> >> all composite cross-language transforms as runner-native transforms which 
>> >> won't be practical ?
>> >>
>> >>>
>> >>>
>> >>> > Thanks,
>> >>> > Max
>> >>> >
>> >>> > On 23.01.19 04:12, Robert Bradshaw wrote:
>> >>> > > No, this PR simply takes an endpoint address as a parameter, 
>> >>> > > expecting
>> >>> > > it to already be up and available. More convenient APIs, e.g. ones
>> >>> > > that spin up and endpoint and tear it down, or catalog and locate 
>> >>> > > code
>> >>> > > and services offering these endpoints, could be provided as wrappers
>> >>> > > on top of or extensions of this.
>> >>> > >
>> >>> > > On Wed, Jan 23, 2019 at 12:19 AM Kenneth Knowles  
>> >>> > > wrote:
>> >>> > >>
>> >>> > >> Nice! If I recall correctly, there was mostly concern about how to 
>> >>> > >> launch and manage the expansion service (Docker? Vendor-specific? 
>> >>> > >> Etc). Does this PR a position on that question?
>> >>> > >>
>> >>> > >> Kenn
>> >>> > >>
>> >>> > >> On Tue, Jan 22, 2019 at 1:44 PM Chamikara Jayalath 
>> >>> > >>  wrote:
>> >>> > >>>
>> >>> > >>>
>> >>> > >>>
>> >>> > >>> On Tue, Jan 22, 2019 at 11:35 AM Udi Meiri  
>> >>> > >>> wrote:
>> >>> > 

Re: contributor permission for Beam Jira tickets

2019-01-24 Thread Ismaël Mejía
  You have the permissions, please self assign the issue and welcome!

On Thu, Jan 24, 2019 at 5:22 PM Robert Collins
 wrote:
>
> Hi,
>
> This is Robert from Fluidly. We are setting up a streaming pipeline which 
> outputs to Redis. Can someone add me as a contributor for Beam's Jira issue 
> tracker? I'd like submit a pull request for an issue we had with flushing 
> bundles.
>
> My Jira username is rob...@fluidly.com
>
> Thanks
>
> Robert


Re: Jenkins seed job fails

2019-01-24 Thread Scott Wegner
job_PreCommit_Python_ValidatesRunner_Flink.groovy has [1]:

   previousNames('beam_PostCommit_Python_VR_Flink')

This is used by Jenkins to rename an existing job to the new name in order
to retain history. In this case, both the old and new job names already exist, so
it's failing:

* https://builds.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/
* https://builds.apache.org/job/beam_PostCommit_Python_VR_Flink/

The jobs got into a similar mess before [2], and I believe it's due to
having two different SeedJobs producing the same jobs.

To unblock this case, I suggest manually
deleting beam_PostCommit_Python_VR_Flink, as it appears the job history has
already been migrated to _Cron. We should also consolidate the Seed Job
functionality to a single job to prevent this from happening again.

Note: I don't think it's correct to use the 'previousNames' feature with
PrecommitJobBuilder as it creates three variants of the same job (normal,
'_PR', '_Cron'); I'm not sure how Jenkins would handle the renaming;
perhaps just use the first one.

[1]
https://github.com/apache/beam/blob/98952d20b5ad5e5cb74d53c08423429b20eb6523/.test-infra/jenkins/job_PreCommit_Python_ValidatesRunner_Flink.groovy#L35
[2] https://issues.apache.org/jira/browse/BEAM-6250

On Thu, Jan 24, 2019 at 12:09 PM Kenneth Knowles  wrote:

> Jenkins has no understanding of this. Those are just different job names
> that our scripts create. Is it a bug in PreCommitJobBuilder?
>
> Kenn
>
> On Thu, Jan 24, 2019 at 11:22 AM Ankur Goenka  wrote:
>
>> It seems that the cron is not deleted if the job is deleted.
>> I ran the seed job manually to test the new job which would have created
>> the cron.
>>
>> I am not aware of Jenkins internals, but it would be great if we could clean
>> up the cron on job deletion.
>>
>> On Thu, Jan 24, 2019 at 7:40 AM Thomas Weise  wrote:
>>
>>> Processing DSL script job_PreCommit_Python.groovy
>>> Processing DSL script job_PreCommit_Python_ValidatesRunner_Flink.groovy
>>> java.lang.IllegalArgumentException: beam_PreCommit_Python_PVR_Flink_Cron 
>>> already exists
>>> at hudson.model.Items.verifyItemDoesNotAlreadyExist(Items.java:641)
>>> at hudson.model.AbstractItem.renameTo(AbstractItem.java:372)
>>> at hudson.model.Job.renameTo(Job.java:653)
>>>
>>>
>>> Can someone with perms please cleanup old jobs?
>>>
>>> Thanks!
>>>
>>>

-- 




Got feedback? tinyurl.com/swegner-feedback


Re: [PROPOSAL] Prepare Beam 2.10.0 release

2019-01-24 Thread Kenneth Knowles
My mistake - the problem was that the Dataflow container image was not yet
built, but I did not dig deep enough into StackDriver to find that message.

All issues are resolved and all PRs against the release branch are now
merged. I am now in the process of building those containers and RC1.

Kenn

On Wed, Jan 23, 2019 at 5:25 PM Ahmet Altay  wrote:

> Python precommit cron job (
> https://builds.apache.org/view/A-D/view/Beam/view/PostCommit/job/beam_PreCommit_Python_Cron/)
> seems to be healthy. Could you share one of the PRs where you noticed the
> flakiness? I would be interested in debugging.
>
> On Wed, Jan 23, 2019 at 12:25 PM Kenneth Knowles  wrote:
>
>> I've seen many PRs that I am reviewing with flakiness problems in the
>> python precommit IT. Anyone have any insights?
>>
>> Kenn
>>
>> On Wed, Jan 23, 2019 at 12:23 PM Kenneth Knowles  wrote:
>>
>>> Thanks for the reminder. I was reading from
>>> https://issues.apache.org/jira/projects/BEAM/versions/12344540. RC1
>>> also gated on clearing
>>> https://github.com/apache/beam/pulls?utf8=%E2%9C%93=is%3Apr+is%3Aopen+base%3Arelease-2.10.0
>>> .
>>>
>>> Kenn
>>>
>>> On Wed, Jan 23, 2019 at 9:44 AM Maximilian Michels 
>>> wrote:
>>>
 What about the revert of "Parse SDK-unknown pipeline options"?
 https://github.com/apache/beam/pull/7564

 Should we merge this for the release?

 On 23.01.19 11:56, Scott Wegner wrote:
 > Cherry-pick PR for the last-remaining issue:
 > https://github.com/apache/beam/pull/7603
 >
 > On Wed, Jan 23, 2019 at 7:15 AM Kenneth Knowles >>> > > wrote:
 >
 > The last remaining issue is split into a non-blocker and a revert
 that is
 > confirmed to fix the issue.
 >
 > Once https://github.com/apache/beam/pull/7600 is merged and
 cherry-picked, I
 > will cut RC1 today.
 >
 > Kenn
 >
 > On Tue, Jan 22, 2019 at 6:03 PM Kenneth Knowles >>> > > wrote:
 >
 > OK. There is just one release blocker remaining;
 > https://issues.apache.org/jira/browse/BEAM-6354
 >
 > I have no insights yet, but I am bisecting. It was healthy in
 2.9.0.
 >
 > Kenn
 >
 > On Tue, Jan 22, 2019 at 9:38 AM Scott Wegner <
 sweg...@google.com
 > > wrote:
 >
 > The rollback for BEAM-6352 is now in and cherry-picked
 into the
 > release branch.
 >
 > On Fri, Jan 18, 2019 at 9:04 AM Scott Wegner <
 sc...@apache.org
 > > wrote:
 >
 > For BEAM-6352, I have a rollback ready for review:
 > https://github.com/apache/beam/pull/7540
 > Conversation about the decision to rollback vs.
 roll-forward for
 > this change is on the JIRA issue.
 >
 > On Fri, Jan 18, 2019 at 8:22 AM Maximilian Michels
 > mailto:m...@apache.org>> wrote:
 >
 > I've created the revert for the pipeline options
 parsing
 > which we agreed on:
 > https://github.com/apache/beam/pull/7564
 >
 > On 17.01.19 15:16, Maximilian Michels wrote:
 >  > An issue with the Flink Runner when restarting
 streaming
 > pipelines:
 >  > https://jira.apache.org/jira/browse/BEAM-6460
 >  >
 >  > Looks like it will be easy to fix by
 invalidating the
 > Jackson cache.
 >  >
 >  > -Max
 >  >
 >  > On 16.01.19 23:00, Kenneth Knowles wrote:
 >  >> Quick update on this. There are three
 remaining issues:
 >  >>
 >  >>   -
 https://issues.apache.org/jira/browse/BEAM-6407: A
 > DirectRunner self-check
 >  >> was broken from 2.8.0 to 2.9.0 - PR looks
 good modulo
 > our infra flakes
 >  >>   -
 https://issues.apache.org/jira/browse/BEAM-6354:
 > PAssert + DirectRunner +
 >  >> Unbounded data busted? Investigation not
 started
 >  >>   -
 https://issues.apache.org/jira/browse/BEAM-6352:
 > Watch was broken from
 >  >> 2.8.0 to 2.9.0 - will rollback if no forward
 fix by the
 > time everything else
 >  >> is resolved
 >  >>
 >  >> Kenn
 >  >>
 > 

Re: Jenkins seed job fails

2019-01-24 Thread Kenneth Knowles
Jenkins has no understanding of this. Those are just different job names
that our scripts create. Is it a bug in PreCommitJobBuilder?

Kenn

On Thu, Jan 24, 2019 at 11:22 AM Ankur Goenka  wrote:

> It seems that the cron is not deleted if the job is deleted.
> I ran the seed job manually to test the new job which would have created
> the cron.
>
> I am not aware of Jenkins internals, but it would be great if we could clean up
> the cron on job deletion.
>
> On Thu, Jan 24, 2019 at 7:40 AM Thomas Weise  wrote:
>
>> Processing DSL script job_PreCommit_Python.groovy
>> Processing DSL script job_PreCommit_Python_ValidatesRunner_Flink.groovy
>> java.lang.IllegalArgumentException: beam_PreCommit_Python_PVR_Flink_Cron 
>> already exists
>>  at hudson.model.Items.verifyItemDoesNotAlreadyExist(Items.java:641)
>>  at hudson.model.AbstractItem.renameTo(AbstractItem.java:372)
>>  at hudson.model.Job.renameTo(Job.java:653)
>>
>>
>> Can someone with perms please cleanup old jobs?
>>
>> Thanks!
>>
>>


Re: Jenkins seed job fails

2019-01-24 Thread Ankur Goenka
It seems that the cron is not deleted if the job is deleted.
I ran the seed job manually to test the new job which would have created
the cron.

I am not aware of Jenkins internals, but it would be great if we could clean up
the cron on job deletion.

On Thu, Jan 24, 2019 at 7:40 AM Thomas Weise  wrote:

> Processing DSL script job_PreCommit_Python.groovy
> Processing DSL script job_PreCommit_Python_ValidatesRunner_Flink.groovy
> java.lang.IllegalArgumentException: beam_PreCommit_Python_PVR_Flink_Cron 
> already exists
>   at hudson.model.Items.verifyItemDoesNotAlreadyExist(Items.java:641)
>   at hudson.model.AbstractItem.renameTo(AbstractItem.java:372)
>   at hudson.model.Job.renameTo(Job.java:653)
>
>
> Can someone with perms please cleanup old jobs?
>
> Thanks!
>
>


Re: [DISCUSSION] ParDo Async Java API

2019-01-24 Thread Scott Wegner
Makes sense to me. We should make it easier to write DoFn's in this pattern
that has emerged as common among I/O connectors.

Enabling asynchronous task chaining across a fusion tree is more
complicated but not necessary for this scenario.
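
For reference, a simplified sketch of that pattern as it shows up in the write connectors
cited below, with a placeholder FakeClient standing in for the real service client and
all error handling and retries omitted:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import org.apache.beam.sdk.transforms.DoFn;

    // Batch up writes in processElement and wait for them all in finishBundle.
    class BatchingWriteFn extends DoFn<String, Void> {
      interface FakeClient {
        CompletableFuture<Void> writeAll(List<String> records);
      }

      private static final int MAX_BATCH_SIZE = 100;
      private transient FakeClient client;
      private transient List<String> batch;
      private transient List<CompletableFuture<Void>> outstanding;

      @StartBundle
      public void startBundle() {
        client = records -> CompletableFuture.completedFuture(null); // placeholder
        batch = new ArrayList<>();
        outstanding = new ArrayList<>();
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
        batch.add(c.element());
        if (batch.size() >= MAX_BATCH_SIZE) {
          flush();
        }
      }

      @FinishBundle
      public void finishBundle() {
        flush();
        // The bundle may not commit until every outstanding write has landed.
        CompletableFuture.allOf(outstanding.toArray(new CompletableFuture[0])).join();
      }

      private void flush() {
        if (!batch.isEmpty()) {
          outstanding.add(client.writeAll(batch));
          batch = new ArrayList<>();
        }
      }
    }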

On Thu, Jan 24, 2019 at 10:13 AM Steve Niemitz  wrote:

> It's also important to note that in many (most?) IO frameworks (gRPC,
> finagle, etc), asynchronous IO is typically completely non-blocking, so
> there generally won't be a large number of threads waiting for IO to
> complete.  (netty uses a small pool of threads for the Event Loop Group for
> example).
>
> But in general I agree with Reuven, runners should not count threads in
> use in other thread pools for IO for the purpose of autoscaling (or most
> kinds of accounting).
>
> On Thu, Jan 24, 2019 at 12:54 PM Reuven Lax  wrote:
>
>> As Steve said, the main rationale for this is so that asynchronous IOs
>> (or in general, asynchronous remote calls) can be made. To some degree
>> this addresses Scott's concern: the asynchronous threads should be, for the
>> most part, simply waiting for IOs to complete; the reason to do the waiting
>> asynchronously is so that the main threadpool does not become blocked,
>> causing the pipeline to become IO bound. A runner like Dataflow should not
>> be tracking these threads for the purpose of autoscaling, as adding more
>> workers will (usually) not cause these calls to complete any faster.
>>
>> Reuven
>>
>> On Thu, Jan 24, 2019 at 7:28 AM Steve Niemitz 
>> wrote:
>>
>>> I think I agree with a lot of what you said here, I'm just going to
>>> restate my initial use-case to try to make it more clear as well.
>>>
>>> From my usage of beam, I feel like the big benefit of async DoFns would
>>> be to allow batched IO to be implemented more simply inside a DoFn.  Even
>>> in the Beam SDK itself, there are a lot of IOs that batch up IO operations
>>> in ProcessElement and wait for them to complete in FinishBundle ([1][2],
>>> etc).  From my experience, things like error handling, emitting outputs as
>>> the result of an asynchronous operation completing (in the correct window,
>>> with the correct timestamp, etc) get pretty tricky, and it would be great
>>> for the SDK to provide support natively for it.
>>>
>>> It's also probably good to point out that really only DoFns that do IO
>>> should be asynchronous, normal CPU bound DoFns have no reason to be
>>> asynchronous.
>>>
>>> A really good example of this is an IO I had written recently for
>>> Bigtable, it takes an input PCollection of ByteStrings representing row
>>> keys, and returns a PCollection of the row data from bigtable.  Naively
>>> this could be implemented by simply blocking on the Bigtable read inside
>>> the ParDo, however this would limit throughput substantially (even assuming
>>> an avg read latency of 1ms, that's still only 1000 QPS / instance of the
>>> ParDo).  My implementation batches many reads together (as they arrive at
>>> the DoFn), executes them once the batch is big enough (or some time
>>> passes), and then emits them once the batch read completes.  Emitting them
>>> in the correct window and handling errors gets tricky, so this is certainly
>>> something I'd love the framework itself to handle.
>>>
>>> I also don't see a big benefit of making a DoFn receive a future, if all
>>> a user is ever supposed to do is attach a continuation to it, that could
>>> just as easily be done by the runner itself, basically just invoking the
>>> entire ParDo as a continuation on the future (which then assumes the runner
>>> is even representing these tasks as futures internally).
>>>
>>> Making the DoFn itself actually return a future could be an option, even
>>> if the language itself doesn't support something like `await`, you could
>>> still implement it yourself in the DoFn, however, it seems like it'd be a
>>> strange contrast to the non-async version, which returns void.
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/428dbaf2465292e9e53affd094e4b38cc1d754e5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L720
>>> [2]
>>> https://github.com/apache/beam/blob/428dbaf2465292e9e53affd094e4b38cc1d754e5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L1080
>>>
>>>
>>> On Thu, Jan 24, 2019 at 8:43 AM Robert Bradshaw 
>>> wrote:
>>>
 If I understand correctly, the end goal is to process input elements
 of a DoFn asynchronously. Were I to do this naively, I would implement
 DoFns that simply take and receive [Serializable?]CompletionStages as
 element types, followed by a DoFn that adds a callback to emit on
 completion (possibly via a queue to avoid being-on-the-wrong-thread
 issues) and whose finalize forces all completions. This would, of
 course, interact poorly with processing time tracking, fusion breaks,
 watermark tracking, counter attribution, window propagation, etc. so
 

Re: [DISCUSSION] ParDo Async Java API

2019-01-24 Thread Steve Niemitz
It's also important to note that in many (most?) IO frameworks (gRPC,
finagle, etc), asynchronous IO is typically completely non-blocking, so
there generally won't be a large number of threads waiting for IO to
complete.  (netty uses a small pool of threads for the Event Loop Group for
example).

But in general I agree with Reuven, runners should not count threads in use
in other thread pools for IO for the purpose of autoscaling (or most kinds
of accounting).

On Thu, Jan 24, 2019 at 12:54 PM Reuven Lax  wrote:

> As Steve said, the main rationale for this is so that asynchronous IOs (or
> in general, asynchronous remote calls) can be made. To some degree this
> addresses Scott's concern: the asynchronous threads should be, for the most
> part, simply waiting for IOs to complete; the reason to do the waiting
> asynchronously is so that the main threadpool does not become blocked,
> causing the pipeline to become IO bound. A runner like Dataflow should not
> be tracking these threads for the purpose of autoscaling, as adding more
> workers will (usually) not cause these calls to complete any faster.
>
> Reuven
>
> On Thu, Jan 24, 2019 at 7:28 AM Steve Niemitz  wrote:
>
>> I think I agree with a lot of what you said here, I'm just going to
>> restate my initial use-case to try to make it more clear as well.
>>
>> From my usage of beam, I feel like the big benefit of async DoFns would
>> be to allow batched IO to be implemented more simply inside a DoFn.  Even
>> in the Beam SDK itself, there are a lot of IOs that batch up IO operations
>> in ProcessElement and wait for them to complete in FinishBundle ([1][2],
>> etc).  From my experience, things like error handling, emitting outputs as
>> the result of an asynchronous operation completing (in the correct window,
>> with the correct timestamp, etc) get pretty tricky, and it would be great
>> for the SDK to provide support natively for it.
>>
>> It's also probably good to point out that really only DoFns that do IO
>> should be asynchronous, normal CPU bound DoFns have no reason to be
>> asynchronous.
>>
>> A really good example of this is an IO I had written recently for
>> Bigtable, it takes an input PCollection of ByteStrings representing row
>> keys, and returns a PCollection of the row data from bigtable.  Naively
>> this could be implemented by simply blocking on the Bigtable read inside
>> the ParDo, however this would limit throughput substantially (even assuming
>> an avg read latency of 1ms, that's still only 1000 QPS / instance of the
>> ParDo).  My implementation batches many reads together (as they arrive at
>> the DoFn), executes them once the batch is big enough (or some time
>> passes), and then emits them once the batch read completes.  Emitting them
>> in the correct window and handling errors gets tricky, so this is certainly
>> something I'd love the framework itself to handle.
>>
>> I also don't see a big benefit of making a DoFn receive a future, if all
>> a user is ever supposed to do is attach a continuation to it, that could
>> just as easily be done by the runner itself, basically just invoking the
>> entire ParDo as a continuation on the future (which then assumes the runner
>> is even representing these tasks as futures internally).
>>
>> Making the DoFn itself actually return a future could be an option, even
>> if the language itself doesn't support something like `await`, you could
>> still implement it yourself in the DoFn, however, it seems like it'd be a
>> strange contrast to the non-async version, which returns void.
>>
>> [1]
>> https://github.com/apache/beam/blob/428dbaf2465292e9e53affd094e4b38cc1d754e5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L720
>> [2]
>> https://github.com/apache/beam/blob/428dbaf2465292e9e53affd094e4b38cc1d754e5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L1080
>>
>>
>> On Thu, Jan 24, 2019 at 8:43 AM Robert Bradshaw 
>> wrote:
>>
>>> If I understand correctly, the end goal is to process input elements
>>> of a DoFn asynchronously. Were I to do this naively, I would implement
>>> DoFns that simply take and receive [Serializable?]CompletionStages as
>>> element types, followed by a DoFn that adds a callback to emit on
>>> completion (possibly via a queue to avoid being-on-the-wrong-thread
>>> issues) and whose finalize forces all completions. This would, of
>>> course, interact poorly with processing time tracking, fusion breaks,
>>> watermark tracking, counter attribution, window propagation, etc. so
>>> it is desirable to make it part of the system itself.
>>>
>>> Taking a OutputReceiver> seems like a decent
>>> API. The invoking of the downstream process could be chained onto
>>> this, with all the implicit tracking and tracing set up correctly.
>>> Taking a CompletionStage as input means a DoFn would not have to
>>> create its output CompletionStage ex nihilo and possibly allow for

Re: Precomit broken due to style violation. Why are failures getting past precommit?

2019-01-24 Thread Kenneth Knowles
I strongly support making flakes more painful to get the incentives right.
We can control the "blast radius" of precommit flakes with more focused test
suites. Post-commit, flakes still need triage and deflaking.

Kenn

On Thu, Jan 24, 2019 at 8:56 AM Scott Wegner  wrote:

> Yes, you're correct that PR#7571 [1] had a checkstyle violation when
> merged. I didn't notice the checkstyle failure and I hit the merge button.
> Sorry about that.
>
> Here's where I went wrong:
> * The precommits showed one failing: Java. GitHub shows the status as a
> green check or red X on the head commit for a PR [2].
> * Opening the Jenkins link [3], it shows "Test Result (1 failure/ +-0)",
> and clicking on that shows the failing test was testMatchWatchForNewFiles
> [4]
> * I recognized this as BEAM-6491 [5] and decided not to block the PR since
> it should be unrelated. So I hit merge.
>
> What I didn't do was click on the Gradle Build Scan link [6], which shows
> that :beam-runners-direct-java:checkstyleMain failed as well.
>
> I take the blame for letting this in, and I'll follow up to make sure it
> gets fixed. I also think that this is an easy mistake to make. Some ideas
> to decrease the chances:
>
> a) Split out checkstyle and other static analysis as a separate pre-commit
> from running tests. Because Jenkins reports the test report separately and
> more prominently, it's easy to miss other failures. We already split out
> Spotless and Rat, which also provides the value of giving a faster signal
> on those checks.
>
> b) Have a stronger policy about not merging when tests are red. I just
> checked and this is actually the policy already [7], but exceptions are
> regularly made for flaky or unrelated test failures (evidence: each red X
> on the master commit history [8]). Right now it's up to a human to make the
> call, and we humans are prone to make mistakes. We could consider
> enforcing the policy and configure GitHub to require all checks passing
> before merge. This will make flaky tests more painful, though it will also
> provide a stronger incentive to fix the flaky tests.
>
> [1] https://github.com/apache/beam/pull/7571
> [2] https://github.com/apache/beam/pull/7571/commits
> [3] https://builds.apache.org/job/beam_PreCommit_Java_Phrase/577/
> [4]
> https://builds.apache.org/job/beam_PreCommit_Java_Phrase/577/testReport/
> [5] https://jira.apache.org/jira/browse/BEAM-6491
> [6] https://scans.gradle.com/s/s3wdusaicauee
> [7] https://beam.apache.org/contribute/precommit-policies/#pull-requests
> [8] https://github.com/apache/beam/commits/master
>
> On Wed, Jan 23, 2019 at 5:39 PM Alex Amato  wrote:
>
>> See: https://issues.apache.org/jira/browse/BEAM-6500
>>
>> I think that this PR introduced the issue. Though I am not sure how to
>> read the test status. It looks like it's marked with an X for the postcommit
>> status, but presumably the precommit was okay even though java precommit
>> appears to be broken now? Is there any explanation as to how this PR got
>> through?
>> https://github.com/apache/beam/pull/7571
>> 
>>
>> Today there have been numerous issues with presubmit. I would just like
>> to understand if there is some underlying issue going on here causing some
>> checks to be missed when we merge. Any ideas why this keeps occurring?
>>
>> Thanks
>> Alex
>>
>
>
> --
>
>
>
>
> Got feedback? tinyurl.com/swegner-feedback
>


Re: [DISCUSSION] ParDo Async Java API

2019-01-24 Thread Reuven Lax
As Steve said, the main rationale for this is so that asynchronous IOs (or
in general, asynchronous remote calls) can be made. To some degree this
addresses Scott's concern: the asynchronous threads should be, for the most
part, simply waiting for IOs to complete; the reason to do the waiting
asynchronously is so that the main threadpool does not become blocked,
causing the pipeline to become IO bound. A runner like Dataflow should not
be tracking these threads for the purpose of autoscaling, as adding more
workers will (usually) not cause these calls to complete any faster.

Reuven

On Thu, Jan 24, 2019 at 7:28 AM Steve Niemitz  wrote:

> I think I agree with a lot of what you said here, I'm just going to
> restate my initial use-case to try to make it more clear as well.
>
> From my usage of beam, I feel like the big benefit of async DoFns would be
> to allow batched IO to be implemented more simply inside a DoFn.  Even in
> the Beam SDK itself, there are a lot of IOs that batch up IO operations in
> ProcessElement and wait for them to complete in FinishBundle ([1][2],
> etc).  From my experience, things like error handling, emitting outputs as
> the result of an asynchronous operation completing (in the correct window,
> with the correct timestamp, etc) get pretty tricky, and it would be great
> for the SDK to provide support natively for it.
>
> It's also probably good to point out that really only DoFns that do IO
> should be asynchronous, normal CPU bound DoFns have no reason to be
> asynchronous.
>
> A really good example of this is an IO I had written recently for
> Bigtable, it takes an input PCollection of ByteStrings representing row
> keys, and returns a PCollection of the row data from bigtable.  Naively
> this could be implemented by simply blocking on the Bigtable read inside
> the ParDo, however this would limit throughput substantially (even assuming
> an avg read latency of 1ms, that's still only 1000 QPS / instance of the
> ParDo).  My implementation batches many reads together (as they arrive at
> the DoFn), executes them once the batch is big enough (or some time
> passes), and then emits them once the batch read completes.  Emitting them
> in the correct window and handling errors gets tricky, so this is certainly
> something I'd love the framework itself to handle.
>
> I also don't see a big benefit of making a DoFn receive a future, if all a
> user is ever supposed to do is attach a continuation to it, that could just
> as easily be done by the runner itself, basically just invoking the entire
> ParDo as a continuation on the future (which then assumes the runner is
> even representing these tasks as futures internally).
>
> Making the DoFn itself actually return a future could be an option, even
> if the language itself doesn't support something like `await`, you could
> still implement it yourself in the DoFn, however, it seems like it'd be a
> strange contrast to the non-async version, which returns void.
>
> [1]
> https://github.com/apache/beam/blob/428dbaf2465292e9e53affd094e4b38cc1d754e5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L720
> [2]
> https://github.com/apache/beam/blob/428dbaf2465292e9e53affd094e4b38cc1d754e5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L1080
>
>
> On Thu, Jan 24, 2019 at 8:43 AM Robert Bradshaw 
> wrote:
>
>> If I understand correctly, the end goal is to process input elements
>> of a DoFn asynchronously. Were I to do this naively, I would implement
>> DoFns that simply take and receive [Serializable?]CompletionStages as
>> element types, followed by a DoFn that adds a callback to emit on
>> completion (possibly via a queue to avoid being-on-the-wrong-thread
>> issues) and whose finalize forces all completions. This would, of
>> course, interact poorly with processing time tracking, fusion breaks,
>> watermark tracking, counter attribution, window propagation, etc. so
>> it is desirable to make it part of the system itself.
>>
>> Taking a OutputReceiver> seems like a decent
>> API. The invoking of the downstream process could be chained onto
>> this, with all the implicit tracking and tracing set up correctly.
>> Taking a CompletionStage as input means a DoFn would not have to
>> create its output CompletionStage ex nihilo and possibly allow for
>> better chaining (depending on the asynchronous APIs used).
>>
>> Even better might be to simply let the invocation of all
>> DoFn.process() methods be asynchronous, but as Java doesn't offer an
>> await primitive to relinquish control in the middle of a function body
>> this might be hard.
>>
>> I think for correctness, completion would have to be forced at the end
>> of each bundle. If your bundles are large enough, this may not be that
>> big of a deal. In this case you could also start executing subsequent
>> bundles while waiting for prior ones to complete.
>>
>>
>>
>>
>> On Wed, Jan 23, 2019 

Re: Cross-language pipelines

2019-01-24 Thread Reuven Lax
Keep in mind that these user-supplied lambdas are commonly used in our IOs.
One common usage is in Sink IOs, to allow dynamic destinations. e.g. in
BigQueryIO.Write, a user-supplied lambda determines what table a record
should be written to.

Given that IOs are one of the big selling points of cross-language support,
we should think about how we can support this functionality.

Reuven

On Thu, Jan 24, 2019 at 8:34 AM Robert Bradshaw  wrote:

> On Thu, Jan 24, 2019 at 5:08 PM Thomas Weise  wrote:
> >
> > Exciting to see the cross-language train gathering steam :)
> >
> > It may be useful to flesh out the user facing aspects a bit more before
> going too deep on the service / expansion side or maybe that was done
> elsewhere?
>
> It's been discussed, but no resolution yet.
>
> > A few examples (of varying complexity) of how the shim/proxy transforms
> would look like in the other SDKs. Perhaps Java KafkaIO in Python and Go
> would be a good candidate?
>
> The core implementation would, almost by definition, be
>
> input.apply(ExternalTransform(URN, payload, service_address)).
>
> Nicer shims would just be composite transforms that call this, filling
> in the URNs, payloads, and possibly service details from more
> user-friendly parameters.
>
> > One problem we discovered with custom Flink native transforms for Python
> was handling of lambdas / functions. An example could be a user defined
> watermark timestamp extractor that the user should be able to supply in
> Python and the JVM cannot handle.
>
> Yes, this has never been resolved satisfactorily. For now, if UDFs can
> be reified in terms of a commonly-understood URN + payload, it'll
> work. A transform could provide a wide range of "useful" URNs for its
> internal callbacks, more than that would require significant design if
> it can't be pre- or post-fixed.
>
> > On Wed, Jan 23, 2019 at 7:04 PM Chamikara Jayalath 
> wrote:
> >>
> >>
> >>
> >> On Wed, Jan 23, 2019 at 1:03 PM Robert Bradshaw 
> wrote:
> >>>
> >>> On Wed, Jan 23, 2019 at 6:38 PM Maximilian Michels 
> wrote:
> >>> >
> >>> > Thank you for starting on the cross-language feature Robert!
> >>> >
> >>> > Just to recap: Each SDK runs an ExpansionService which can be
> contacted during
> >>> > pipeline translation to expand transforms that are unknown to the
> SDK. The
> >>> > service returns the Proto definitions to the querying process.
> >>>
> >>> Yep. Technically it doesn't have to be the SDK, or even if it is there
> >>> may be a variety of services (e.g. one offering SQL, one offering
> >>> different IOs).
> >>>
> >>> > There will be multiple environments such that during execution
> cross-language
> >>> > pipelines select the appropriate environment for a transform.
> >>>
> >>> Exactly. And fuses only those steps with compatible environments
> together.
> >>>
> >>> > It's not clear to me, should the expansion happen during pipeline
> construction
> >>> > or during translation by the Runner?
> >>>
>>> I think it needs to happen as part of construction because the set of
> >>> outputs (and their properties) can be dynamic based on the expansion.
> >>
> >>
> >> Also, without expansion at pipeline construction, we'll have to define
> all composite cross-language transforms as runner-native transforms which
> won't be practical ?
> >>
> >>>
> >>>
> >>> > Thanks,
> >>> > Max
> >>> >
> >>> > On 23.01.19 04:12, Robert Bradshaw wrote:
> >>> > > No, this PR simply takes an endpoint address as a parameter,
> expecting
> >>> > > it to already be up and available. More convenient APIs, e.g. ones
> >>> > > that spin up and endpoint and tear it down, or catalog and locate
> code
> >>> > > and services offering these endpoints, could be provided as
> wrappers
> >>> > > on top of or extensions of this.
> >>> > >
> >>> > > On Wed, Jan 23, 2019 at 12:19 AM Kenneth Knowles 
> wrote:
> >>> > >>
> >>> > >> Nice! If I recall correctly, there was mostly concern about how
> to launch and manage the expansion service (Docker? Vendor-specific? Etc).
> Does this PR take a position on that question?
> >>> > >>
> >>> > >> Kenn
> >>> > >>
> >>> > >> On Tue, Jan 22, 2019 at 1:44 PM Chamikara Jayalath <
> chamik...@google.com> wrote:
> >>> > >>>
> >>> > >>>
> >>> > >>>
> >>> > >>> On Tue, Jan 22, 2019 at 11:35 AM Udi Meiri 
> wrote:
> >>> > 
> >>> >  Also debugability: collecting logs from each of these systems.
> >>> > >>>
> >>> > >>>
> >>> > >>> Agree.
> >>> > >>>
> >>> > 
> >>> > 
> >>> >  On Tue, Jan 22, 2019 at 10:53 AM Chamikara Jayalath <
> chamik...@google.com> wrote:
> >>> > >
> >>> > > Thanks Robert.
> >>> > >
> >>> > > On Tue, Jan 22, 2019 at 4:39 AM Robert Bradshaw <
> rober...@google.com> wrote:
> >>> > >>
> >>> > >> Now that we have the FnAPI, I started playing around with
> support for
> >>> > >> cross-language pipelines. This will allow things like IOs to
> be shared
> >>> > >> across all languages, SQL to be invoked from non-Java, TFX
> tensorflow
> >>> 

Re: ContainerLaunchException in precommit [BEAM-6497]

2019-01-24 Thread Alex Amato
I have just seen it randomly occur on presubmits. So I don't have a
reliable repro, unfortunately.
It may be an environmental issue specific to the beam1 machine the tests ran
on?
https://builds.apache.org/job/beam_PreCommit_Java_Commit/3722/


On Thu, Jan 24, 2019 at 8:16 AM Gleb Kanterov  wrote:

> I'm wondering if anybody can reproduce this issue. The build has failed
> once because testcontainers didn't pull docker image. If we use caching
> proxy for docker, it could be a reason for that. I didn't find any similar
> known issue in testcontainers fixed recently, but just in case, I bumped
> testcontainers to use a newer docker-java.
>
> https://github.com/apache/beam/pull/7610
>
> On Thu, Jan 24, 2019 at 12:27 AM Alex Amato  wrote:
>
>> Thank you Gleb, appreciate it.
>>
>> On Wed, Jan 23, 2019 at 2:40 PM Gleb Kanterov  wrote:
>>
>>> I'm looking into it. This image exists in docker hub [1], but for some
>>> reason, it wasn't picked up.
>>>
>>> [1] https://hub.docker.com/r/yandex/clickhouse-server/tags
>>>
>>> On Wed, Jan 23, 2019 at 10:01 PM Alex Amato  wrote:
>>>

 1. See: BEAM-6497
    1. This is also causing issues blocking precommits.
 2. Seems to be caused by this failure to locate the image. Are these stored
    somewhere or built by the build process? Any idea why these are failing?

    Caused by: com.github.dockerjava.api.exception.NotFoundException:
    {"message":"No such image: yandex/clickhouse-server:18.10.3"}




>>>
>>> --
>>> Cheers,
>>> Gleb
>>>
>>
>
> --
> Cheers,
> Gleb
>


Re: Precommit broken due to style violation. Why are failures getting past precommit?

2019-01-24 Thread Scott Wegner
Yes, you're correct that PR#7571 [1] had a checkstyle violation when
merged. I didn't notice the checkstyle failure and I hit the merge button.
Sorry about that.

Here's where I went wrong:
* The precommits showed one failing: Java. GitHub shows the status as a
green check or red X on the head commit for a PR [2].
* Opening the Jenkins link [3], it shows "Test Result (1 failure/ +-0)",
and clicking on that shows the failing test was testMatchWatchForNewFiles
[4]
* I recognized this as BEAM-6491 [5] and decided not to block the PR since
it should be unrelated. So I hit merge.

What I didn't do was click on the Gradle Build Scan link [6], which shows
that :beam-runners-direct-java:checkstyleMain failed as well.

I take the blame for letting this in, and I'll follow up to make sure it
gets fixed. I also think that this is an easy mistake to make. Some ideas
to decrease the chances:

a) Split out checkstyle and other static analysis as a separate pre-commit
from running tests. Because Jenkins surfaces the test report separately and
more prominently, it's easy to miss other failures. We already split out
Spotless and Rat, which also provides the value of giving a faster signal
on those checks.

b) Have a stronger policy about not merging when tests are red. I just
checked and this is actually the policy already [7], but exceptions are
regularly made for flaky or unrelated test failures (evidence: each red X
on the master commit history [8]). Right now it's up to a human to make the
call, and we humans are prone to making mistakes. We could consider
enforcing the policy and configure GitHub to require all checks passing
before merge. This will make flaky tests more painful, though it will also
provide a stronger incentive to fix the flaky tests.

[1] https://github.com/apache/beam/pull/7571
[2] https://github.com/apache/beam/pull/7571/commits
[3] https://builds.apache.org/job/beam_PreCommit_Java_Phrase/577/
[4] https://builds.apache.org/job/beam_PreCommit_Java_Phrase/577/testReport/
[5] https://jira.apache.org/jira/browse/BEAM-6491
[6] https://scans.gradle.com/s/s3wdusaicauee
[7] https://beam.apache.org/contribute/precommit-policies/#pull-requests
[8] https://github.com/apache/beam/commits/master

On Wed, Jan 23, 2019 at 5:39 PM Alex Amato  wrote:

> See: https://issues.apache.org/jira/browse/BEAM-6500
>
> I think that this PR introduced the issue. Though I am not sure how to
> read the test status. It looks like it's marked with an X for the postcommit
> status, but presumably the precommit was okay even though java precommit
> appears to be broken now? Is there any explanation as to how this PR got
> through?
> https://github.com/apache/beam/pull/7571
> 
>
> Today there have been numerous issues with presubmit. I would just like to
> understand if there is some underlying issue going on here missing some
> checks when we merge. Any ideas why this keeps occurring?
>
> Thanks
> Alex
>


-- 




Got feedback? tinyurl.com/swegner-feedback


Re: Cross-language pipelines

2019-01-24 Thread Robert Bradshaw
On Thu, Jan 24, 2019 at 5:08 PM Thomas Weise  wrote:
>
> Exciting to see the cross-language train gathering steam :)
>
> It may be useful to flesh out the user facing aspects a bit more before going 
> too deep on the service / expansion side or maybe that was done elsewhere?

It's been discussed, but no resolution yet.

> A few examples (of varying complexity) of how the shim/proxy transforms would 
> look like in the other SDKs. Perhaps Java KafkaIO in Python and Go would be a 
> good candidate?

The core implementation would, almost by definition, be

input.apply(ExternalTransform(URN, payload, service_address)).

Nicer shims would just be composite transforms that call this, filling
in the URNs, payloads, and possibly service details from more
user-friendly parameters.

> One problem we discovered with custom Flink native transforms for Python was 
> handling of lambdas / functions. An example could be a user defined watermark 
> timestamp extractor that the user should be able to supply in Python and the 
> JVM cannot handle.

Yes, this has never been resolved satisfactorily. For now, if UDFs can
be reified in terms of a commonly-understood URN + payload, it'll
work. A transform could provide a wide range of "useful" URNs for its
internal callbacks; more than that would require significant design if
it can't be pre- or post-fixed.

> On Wed, Jan 23, 2019 at 7:04 PM Chamikara Jayalath  
> wrote:
>>
>>
>>
>> On Wed, Jan 23, 2019 at 1:03 PM Robert Bradshaw  wrote:
>>>
>>> On Wed, Jan 23, 2019 at 6:38 PM Maximilian Michels  wrote:
>>> >
>>> > Thank you for starting on the cross-language feature Robert!
>>> >
>>> > Just to recap: Each SDK runs an ExpansionService which can be contacted 
>>> > during
>>> > pipeline translation to expand transforms that are unknown to the SDK. The
>>> > service returns the Proto definitions to the querying process.
>>>
>>> Yep. Technically it doesn't have to be the SDK, or even if it is there
>>> may be a variety of services (e.g. one offering SQL, one offering
>>> different IOs).
>>>
>>> > There will be multiple environments such that during execution 
>>> > cross-language
>>> > pipelines select the appropriate environment for a transform.
>>>
>>> Exactly. And fuses only those steps with compatible environments together.
>>>
>>> > It's not clear to me, should the expansion happen during pipeline 
>>> > construction
>>> > or during translation by the Runner?
>>>
>>> I think it need to happen as part of construction because the set of
>>> outputs (and their properties) can be dynamic based on the expansion.
>>
>>
>> Also, without expansion at pipeline construction, we'll have to define all 
>> composite cross-language transforms as runner-native transforms which won't 
>> be practical ?
>>
>>>
>>>
>>> > Thanks,
>>> > Max
>>> >
>>> > On 23.01.19 04:12, Robert Bradshaw wrote:
>>> > > No, this PR simply takes an endpoint address as a parameter, expecting
>>> > > it to already be up and available. More convenient APIs, e.g. ones
>>> > > that spin up and endpoint and tear it down, or catalog and locate code
>>> > > and services offering these endpoints, could be provided as wrappers
>>> > > on top of or extensions of this.
>>> > >
>>> > > On Wed, Jan 23, 2019 at 12:19 AM Kenneth Knowles  
>>> > > wrote:
>>> > >>
>>> > >> Nice! If I recall correctly, there was mostly concern about how to 
>>> > >> launch and manage the expansion service (Docker? Vendor-specific? 
>>> > >> Etc). Does this PR take a position on that question?
>>> > >>
>>> > >> Kenn
>>> > >>
>>> > >> On Tue, Jan 22, 2019 at 1:44 PM Chamikara Jayalath 
>>> > >>  wrote:
>>> > >>>
>>> > >>>
>>> > >>>
>>> > >>> On Tue, Jan 22, 2019 at 11:35 AM Udi Meiri  wrote:
>>> > 
>>> >  Also debugability: collecting logs from each of these systems.
>>> > >>>
>>> > >>>
>>> > >>> Agree.
>>> > >>>
>>> > 
>>> > 
>>> >  On Tue, Jan 22, 2019 at 10:53 AM Chamikara Jayalath 
>>> >   wrote:
>>> > >
>>> > > Thanks Robert.
>>> > >
>>> > > On Tue, Jan 22, 2019 at 4:39 AM Robert Bradshaw 
>>> > >  wrote:
>>> > >>
>>> > >> Now that we have the FnAPI, I started playing around with support 
>>> > >> for
>>> > >> cross-language pipelines. This will allow things like IOs to be 
>>> > >> shared
>>> > >> across all languages, SQL to be invoked from non-Java, TFX 
>>> > >> tensorflow
>>> > >> transforms to be invoked from non-Python, etc. and I think is the 
>>> > >> next
>>> > >> step in extending (and taking advantage of) the portability layer
>>> > >> we've developed. These are often composite transforms whose inner
>>> > >> structure depends in non-trivial ways on their configuration.
>>> > >
>>> > >
>>> > > Some additional benefits of cross-language transforms are given 
>>> > > below.
>>> > >
>>> > > (1) Current large collection of Java IO connectors will become
>>> > > available to other languages.
>>> > > (2) Current Java 

contributor permission for Beam Jira tickets

2019-01-24 Thread Robert Collins
Hi,

This is Robert from Fluidly. We are setting up a streaming pipeline which
outputs to Redis. Can someone add me as a contributor for Beam's Jira issue
tracker? I'd like to submit a pull request for an issue we had with flushing
bundles.

My Jira username is rob...@fluidly.com

Thanks

Robert


Re: Cross-language pipelines

2019-01-24 Thread Thomas Weise
Exciting to see the cross-language train gathering steam :)

It may be useful to flesh out the user facing aspects a bit more before
going too deep on the service / expansion side or maybe that was done
elsewhere?

A few examples (of varying complexity) of what the shim/proxy transforms
would look like in the other SDKs would help. Perhaps Java KafkaIO in Python
and Go would be a good candidate?

One problem we discovered with custom Flink native transforms for Python
was the handling of lambdas / functions. An example could be a user-defined
watermark timestamp extractor that the user should be able to supply in
Python but that the JVM cannot handle.

Thanks,
Thomas


On Wed, Jan 23, 2019 at 7:04 PM Chamikara Jayalath 
wrote:

>
>
> On Wed, Jan 23, 2019 at 1:03 PM Robert Bradshaw 
> wrote:
>
>> On Wed, Jan 23, 2019 at 6:38 PM Maximilian Michels 
>> wrote:
>> >
>> > Thank you for starting on the cross-language feature Robert!
>> >
>> > Just to recap: Each SDK runs an ExpansionService which can be contacted
>> during
>> > pipeline translation to expand transforms that are unknown to the SDK.
>> The
>> > service returns the Proto definitions to the querying process.
>>
>> Yep. Technically it doesn't have to be the SDK, or even if it is there
>> may be a variety of services (e.g. one offering SQL, one offering
>> different IOs).
>>
>> > There will be multiple environments such that during execution
>> cross-language
>> > pipelines select the appropriate environment for a transform.
>>
>> Exactly. And fuses only those steps with compatible environments together.
>>
>> > It's not clear to me, should the expansion happen during pipeline
>> construction
>> > or during translation by the Runner?
>>
>> I think it needs to happen as part of construction because the set of
>> outputs (and their properties) can be dynamic based on the expansion.
>>
>
> Also, without expansion at pipeline construction, we'll have to define all
> composite cross-language transforms as runner-native transforms which won't
> be practical ?
>
>
>>
>> > Thanks,
>> > Max
>> >
>> > On 23.01.19 04:12, Robert Bradshaw wrote:
>> > > No, this PR simply takes an endpoint address as a parameter, expecting
>> > > it to already be up and available. More convenient APIs, e.g. ones
>> > > that spin up and endpoint and tear it down, or catalog and locate code
>> > > and services offering these endpoints, could be provided as wrappers
>> > > on top of or extensions of this.
>> > >
>> > > On Wed, Jan 23, 2019 at 12:19 AM Kenneth Knowles 
>> wrote:
>> > >>
>> > >> Nice! If I recall correctly, there was mostly concern about how to
>> launch and manage the expansion service (Docker? Vendor-specific? Etc).
>> Does this PR take a position on that question?
>> > >>
>> > >> Kenn
>> > >>
>> > >> On Tue, Jan 22, 2019 at 1:44 PM Chamikara Jayalath <
>> chamik...@google.com> wrote:
>> > >>>
>> > >>>
>> > >>>
>> > >>> On Tue, Jan 22, 2019 at 11:35 AM Udi Meiri 
>> wrote:
>> > 
>> >  Also debugability: collecting logs from each of these systems.
>> > >>>
>> > >>>
>> > >>> Agree.
>> > >>>
>> > 
>> > 
>> >  On Tue, Jan 22, 2019 at 10:53 AM Chamikara Jayalath <
>> chamik...@google.com> wrote:
>> > >
>> > > Thanks Robert.
>> > >
>> > > On Tue, Jan 22, 2019 at 4:39 AM Robert Bradshaw <
>> rober...@google.com> wrote:
>> > >>
>> > >> Now that we have the FnAPI, I started playing around with
>> support for
>> > >> cross-language pipelines. This will allow things like IOs to be
>> shared
>> > >> across all languages, SQL to be invoked from non-Java, TFX
>> tensorflow
>> > >> transforms to be invoked from non-Python, etc. and I think is
>> the next
>> > >> step in extending (and taking advantage of) the portability layer
>> > >> we've developed. These are often composite transforms whose inner
>> > >> structure depends in non-trivial ways on their configuration.
>> > >
>> > >
>> > > Some additional benefits of cross-language transforms are given
>> below.
>> > >
>> > > (1) Current large collection of Java IO connectors will become
>> available to other languages.
>> > > (2) Current Java and Python transforms will be available for Go
>> and any other future SDKs.
>> > > (3) New transform authors will be able to pick their language of
>> choice and make their transform available to all Beam SDKs. For example,
>> this can be the language the transform author is most familiar with or the
>> only language for which a client library is available for connecting to an
>> external data store.
>> > >
>> > >>
>> > >> I created a PR [1] that basically follows the "expand via an
>> external
>> > >> process" over RPC alternative from the proposals we came up with
>> when
>> > >> we were discussing this last time [2]. There are still some
>> unknowns,
>> > >> e.g. how to handle artifacts supplied by an alternative SDK (they
>> > >> currently must be provided by the environment), but I think 

Re: ContainerLaunchException in precommit [BEAM-6497]

2019-01-24 Thread Gleb Kanterov
I'm wondering if anybody can reproduce this issue. The build has failed
once because testcontainers didn't pull the docker image. If we use a caching
proxy for docker, it could be a reason for that. I didn't find any similar
known issue in testcontainers fixed recently, but just in case, I bumped
testcontainers to use a newer docker-java.

https://github.com/apache/beam/pull/7610

On Thu, Jan 24, 2019 at 12:27 AM Alex Amato  wrote:

> Thank you Gleb, appreciate it.
>
> On Wed, Jan 23, 2019 at 2:40 PM Gleb Kanterov  wrote:
>
>> I'm looking into it. This image exists in docker hub [1], but for some
>> reason, it wasn't picked up.
>>
>> [1] https://hub.docker.com/r/yandex/clickhouse-server/tags
>>
>> On Wed, Jan 23, 2019 at 10:01 PM Alex Amato  wrote:
>>
>>>
>>> 1. See: BEAM-6497
>>>    1. This is also causing issues blocking precommits.
>>> 2. Seems to be caused by this failure to locate the image. Are these stored
>>>    somewhere or built by the build process? Any idea why these are failing?
>>>
>>>    Caused by: com.github.dockerjava.api.exception.NotFoundException:
>>>    {"message":"No such image: yandex/clickhouse-server:18.10.3"}
>>>
>>>
>>>
>>>
>>
>> --
>> Cheers,
>> Gleb
>>
>

-- 
Cheers,
Gleb


Jenkins seed job fails

2019-01-24 Thread Thomas Weise
Processing DSL script job_PreCommit_Python.groovy
Processing DSL script job_PreCommit_Python_ValidatesRunner_Flink.groovy
java.lang.IllegalArgumentException:
beam_PreCommit_Python_PVR_Flink_Cron already exists
at hudson.model.Items.verifyItemDoesNotAlreadyExist(Items.java:641)
at hudson.model.AbstractItem.renameTo(AbstractItem.java:372)
at hudson.model.Job.renameTo(Job.java:653)


Can someone with perms please clean up the old jobs?

Thanks!


Re: [DISCUSSION] ParDo Async Java API

2019-01-24 Thread Steve Niemitz
I think I agree with a lot of what you said here; I'm just going to restate
my initial use case to try to make it clearer as well.

From my usage of Beam, I feel like the big benefit of async DoFns would be
to allow batched IO to be implemented more simply inside a DoFn.  Even in
the Beam SDK itself, there are a lot of IOs that batch up IO operations in
ProcessElement and wait for them to complete in FinishBundle ([1][2],
etc).  From my experience, things like error handling, emitting outputs as
the result of an asynchronous operation completing (in the correct window,
with the correct timestamp, etc) get pretty tricky, and it would be great
for the SDK to provide support natively for it.

It's also probably good to point out that really only DoFns that do IO
should be asynchronous; normal CPU-bound DoFns have no reason to be
asynchronous.

A really good example of this is an IO I had written recently for Bigtable,
it takes an input PCollection of ByteStrings representing row keys, and
returns a PCollection of the row data from bigtable.  Naively this could be
implemented by simply blocking on the Bigtable read inside the ParDo,
however this would limit throughput substantially (even assuming an avg
read latency of 1ms, that's still only 1000 QPS per instance of the ParDo).
My implementation batches many reads together (as they arrive at the DoFn),
executes them once the batch is big enough (or some time passes), and then
emits them once the batch read completes.  Emitting them in the correct
window and handling errors gets tricky, so this is certainly something I'd
love the framework itself to handle.
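
To sketch what that looks like in practice (heavily simplified, not the
actual implementation; the async client interface is hypothetical, and the
mid-bundle flush and error handling that make the real thing tricky are left
out):

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

class BatchingReadFn extends DoFn<String, String> {

  // Hypothetical stand-in for an async Bigtable-style client.
  interface AsyncClient extends Serializable {
    CompletableFuture<String> readAsync(String key);
  }

  private static class Pending {
    final CompletableFuture<String> result;
    final Instant timestamp;
    final BoundedWindow window;

    Pending(CompletableFuture<String> result, Instant timestamp, BoundedWindow window) {
      this.result = result;
      this.timestamp = timestamp;
      this.window = window;
    }
  }

  private final AsyncClient client;
  private transient List<Pending> pending;

  BatchingReadFn(AsyncClient client) {
    this.client = client;
  }

  @StartBundle
  public void startBundle() {
    pending = new ArrayList<>();
  }

  @ProcessElement
  public void processElement(
      @Element String key, @Timestamp Instant ts, BoundedWindow window) {
    // Kick off the read immediately, remembering the element's timestamp and
    // window so the result can later be emitted as if it came from that element.
    pending.add(new Pending(client.readAsync(key), ts, window));
  }

  @FinishBundle
  public void finishBundle(FinishBundleContext ctx) throws Exception {
    // Wait for the outstanding reads and emit each result into the window and
    // with the timestamp of the element that requested it.
    for (Pending p : pending) {
      ctx.output(p.result.get(), p.timestamp, p.window);
    }
    pending.clear();
  }
}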

I also don't see a big benefit of making a DoFn receive a future, if all a
user is ever supposed to do is attach a continuation to it, that could just
as easily be done by the runner itself, basically just invoking the entire
ParDo as a continuation on the future (which then assumes the runner is
even representing these tasks as futures internally).

Making the DoFn itself actually return a future could be an option; even if
the language itself doesn't support something like `await`, you could still
implement it yourself in the DoFn. However, it seems like it'd be a strange
contrast to the non-async version, which returns void.

[1]
https://github.com/apache/beam/blob/428dbaf2465292e9e53affd094e4b38cc1d754e5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L720
[2]
https://github.com/apache/beam/blob/428dbaf2465292e9e53affd094e4b38cc1d754e5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L1080


On Thu, Jan 24, 2019 at 8:43 AM Robert Bradshaw  wrote:

> If I understand correctly, the end goal is to process input elements
> of a DoFn asynchronously. Were I to do this naively, I would implement
> DoFns that simply take and receive [Serializable?]CompletionStages as
> element types, followed by a DoFn that adds a callback to emit on
> completion (possibly via a queue to avoid being-on-the-wrong-thread
> issues) and whose finalize forces all completions. This would, of
> course, interact poorly with processing time tracking, fusion breaks,
> watermark tracking, counter attribution, window propagation, etc. so
> it is desirable to make it part of the system itself.
>
> Taking an OutputReceiver<CompletionStage<OutputT>> seems like a decent
> API. The invoking of the downstream process could be chained onto
> this, with all the implicit tracking and tracing set up correctly.
> Taking a CompletionStage as input means a DoFn would not have to
> create its output CompletionStage ex nihilo and possibly allow for
> better chaining (depending on the asynchronous APIs used).
>
> Even better might be to simply let the invocation of all
> DoFn.process() methods be asynchronous, but as Java doesn't offer an
> await primitive to relinquish control in the middle of a function body
> this might be hard.
>
> I think for correctness, completion would have to be forced at the end
> of each bundle. If your bundles are large enough, this may not be that
> big of a deal. In this case you could also start executing subsequent
> bundles while waiting for prior ones to complete.
>
>
>
>
> On Wed, Jan 23, 2019 at 11:58 PM Bharath Kumara Subramanian
>  wrote:
> >>
> >> I'd love to see something like this as well.  Also +1 to
> process(@Element InputT element, @Output
> OutputReceiver>). I don't know if there's much
> benefit to passing a future in, since the framework itself could hook up
> the process function to complete when the future completes.
> >
> >
> > One benefit we get by wrapping the input with CompletionStage is to
> mandate[1] users to chain their processing logic to the input future;
> thereby, ensuring asynchrony for the most part. However, it is still
> possible for users to go out of their way and write blocking code.
> >
> > Although, I am not sure how counter intuitive it is for the runners to
> wrap the input element into a future before 

Re: Beam JIRA contributor permissions

2019-01-24 Thread Ismaël Mejía
Done, welcome and enjoy!

On Thu, Jan 24, 2019 at 1:24 PM Michał Walenia
 wrote:
>
> Hi all,
> I’d like to take on BEAM-6207 issue, but I lack JIRA permissions to assign 
> tickets to myself
> Can anyone add me as a contributor?
> My username is mwalenia.
> Thanks and have a great day
>
> Michal


Re: [DISCUSSION] ParDo Async Java API

2019-01-24 Thread Robert Bradshaw
If I understand correctly, the end goal is to process input elements
of a DoFn asynchronously. Were I to do this naively, I would implement
DoFns that simply take and receive [Serializable?]CompletionStages as
element types, followed by a DoFn that adds a callback to emit on
completion (possibly via a queue to avoid being-on-the-wrong-thread
issues) and whose finalize forces all completions. This would, of
course, interact poorly with processing time tracking, fusion breaks,
watermark tracking, counter attribution, window propagation, etc. so
it is desirable to make it part of the system itself.

Taking an OutputReceiver<CompletionStage<OutputT>> seems like a decent
API. The invoking of the downstream process could be chained onto
this, with all the implicit tracking and tracing set up correctly.
Taking a CompletionStage as input means a DoFn would not have to
create its output CompletionStage ex nihilo and possibly allow for
better chaining (depending on the asynchronous APIs used).
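
Purely as an illustration of that proposed shape (this is not an existing
Beam API, and the generic types and client are assumptions), a user's DoFn
might then read roughly like:

import java.io.Serializable;
import java.util.concurrent.CompletionStage;
import org.apache.beam.sdk.transforms.DoFn;

class AsyncLookupFn extends DoFn<String, String> {

  // Hypothetical non-blocking client.
  interface AsyncClient extends Serializable {
    CompletionStage<String> lookup(String key);
  }

  private final AsyncClient client;

  AsyncLookupFn(AsyncClient client) {
    this.client = client;
  }

  @ProcessElement
  public void process(
      @Element CompletionStage<String> element,
      OutputReceiver<CompletionStage<String>> out) {
    // Chain the asynchronous work onto the incoming stage and hand the result
    // back to the runner; nothing blocks in the process call itself.
    out.output(element.thenCompose(client::lookup));
  }
}

(Whether the DoFn's type parameters would stay as the unwrapped element types,
as written here, is itself part of the open design question.)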

Even better might be to simply let the invocation of all
DoFn.process() methods be asynchronous, but as Java doesn't offer an
await primitive to relinquish control in the middle of a function body
this might be hard.

I think for correctness, completion would have to be forced at the end
of each bundle. If your bundles are large enough, this may not be that
big of a deal. In this case you could also start executing subsequent
bundles while waiting for prior ones to complete.




On Wed, Jan 23, 2019 at 11:58 PM Bharath Kumara Subramanian
 wrote:
>>
>> I'd love to see something like this as well.  Also +1 to process(@Element 
>> InputT element, @Output OutputReceiver>). I don't 
>> know if there's much benefit to passing a future in, since the framework 
>> itself could hook up the process function to complete when the future 
>> completes.
>
>
> One benefit we get by wrapping the input with CompletionStage is to 
> mandate[1] users to chain their processing logic to the input future; 
> thereby, ensuring asynchrony for the most part. However, it is still possible 
> for users to go out of their way and write blocking code.
>
> Although, I am not sure how counter intuitive it is for the runners to wrap 
> the input element into a future before passing it to the user code.
>
> Bharath
>
> [1] CompletionStage interface does not define methods for initially creating, 
> forcibly completing normally or exceptionally, probing completion status or 
> results, or awaiting completion of a stage. Implementations of 
> CompletionStage may provide means of achieving such effects, as appropriate
>
>
> On Wed, Jan 23, 2019 at 11:31 AM Kenneth Knowles  wrote:
>>
>> I think your concerns are valid but i want to clarify about "first class 
>> async APIs". Does "first class" mean that it is a well-encapsulated 
>> abstraction? or does it mean that the user can more or less do whatever they 
>> want? These are opposite but both valid meanings for "first class", to me.
>>
>> I would not want to encourage users to do explicit multi-threaded 
>> programming or control parallelism. Part of the point of Beam is to gain big 
>> data parallelism without explicit multithreading. I see asynchronous 
>> chaining of futures (or their best-approximation in your language of choice) 
>> as a highly disciplined way of doing asynchronous dependency-driven 
>> computation that is nonetheless conceptually, and readably, straight-line 
>> code. Threads are not required nor the only way to execute this code. In 
>> fact you might often want to execute without threading for a reference 
>> implementation to provide canonically correct results. APIs that leak 
>> lower-level details of threads are asking for trouble.
>>
>> One of our other ideas was to provide a dynamic parameter of type 
>> ExecutorService. The SDK harness (pre-portability: the runner) would control 
>> and observe parallelism while the user could simply register tasks. 
>> Providing a future/promise API is even more disciplined.
>>
>> Kenn
>>
>> On Wed, Jan 23, 2019 at 10:35 AM Scott Wegner  wrote:
>>>
>>> A related question is how to make execution observable such that a runner 
>>> can make proper scaling decisions. Runners decide how to schedule bundles 
>>> within and across multiple worker instances, and can use information about 
>>> execution to make dynamic scaling decisions. First-class async APIs seem 
>>> like they would encourage DoFn authors to implement their own 
>>> parallelization, rather than deferring to the runner that should be more 
>>> capable of providing the right level of parallelism.
>>>
>>> In the Dataflow worker harness, we estimate execution time to PTransform 
>>> steps by sampling execution time on the execution thread and attributing it 
>>> to the currently invoked method. This approach is fairly simple and 
>>> possible because we assume that execution happens within the thread 
>>> controlled by the runner. Some DoFn's already implement their own async 
>>> logic and break this 

Re: [spark runner based on dataset POC] your opinion

2019-01-24 Thread Etienne Chauchot
@Gleb, I'll also take a look at ExpressionEncoder, thanks for the pointer to
typelevel/frameless.
Etienne
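
A minimal sketch of the bytes-based option discussed below (the helper name,
timestamp function, and global-window assumption are illustrative, not the
POC code): wrap each element in a timestamped WindowedValue, encode it with a
Beam coder, and carry it through the Dataset as an opaque byte[] column so
Spark never tries to infer a schema for it.

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.joda.time.Instant;

class WindowedValueBytes {

  static <T> Dataset<byte[]> asWindowedBytes(
      Dataset<T> rawValues,
      Coder<T> valueCoder,
      SerializableFunction<T, Instant> timestampFn) {
    Coder<WindowedValue<T>> wvCoder =
        WindowedValue.FullWindowedValueCoder.of(valueCoder, GlobalWindow.Coder.INSTANCE);
    return rawValues.map(
        (MapFunction<T, byte[]>)
            value ->
                CoderUtils.encodeToByteArray(
                    wvCoder,
                    WindowedValue.timestampedValueInGlobalWindow(
                        value, timestampFn.apply(value))),
        Encoders.BINARY());
  }
}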
On Wednesday, 23 January 2019 at 17:06 +0100, Etienne Chauchot wrote:
> Hi all, Thanks for your feedback! I was indeed thinking about Reuven's work
> around Schema PCollections, hence my email to the community. I don't see how
> it fits considering that, as I'm wrapping a source, I need to store both the
> timestamp and the value, hence the use of WindowedValue (as the other runners
> do). Yet, WindowedValue might be overkill because I obviously have no
> windowing as I'm at the input of a pipeline. Hence the idea to create a Beam
> schema that has the fields of T + a long for the timestamp and make a
> converter between the Beam schema and the Spark schema. But I'm not sure it
> will be more performant than simply serializing the WindowedValue object to
> bytes. I guess Schema PCollection is a tool more useful for the SDK part than
> for the runner part of Beam. Am I missing something @Reuven?
>
> For the same source wrapping problem, both the current Spark and Flink
> runners also store WindowedValue but do not enforce any schema in their
> Dataset-equivalent structures, so they don't have this problem.
>
> @Manu, regarding your concerns about the serialization/deserialization
> roundtrip, artificial roundtrips (not triggered by the Spark framework) only
> happen once, at source execution time. Downstream we have
> Dataset<WindowedValue<T>>. But, indeed, if we apply a ParDo it gets wrapped
> into a mapPartition for which Spark will require an encoder (similar to a
> Beam coder) for serialization. And indeed we provide a bytes encoder. But
> once again, if we had a schema, Spark would still serde and I'm not sure a
> bean/schema Encoder would be more performant than a binary one.
>
> Side note: @Gleb, yes, using a schema would allow us to use the pushdown
> predicates that are included in the Spark DataSourceV2 API. But such
> predicates would depend on the backend IO technology that we don't know in
> advance (e.g. a filter on a column which is not a primary/clustering column
> in Cassandra could not be pushed down). We would have to translate
> differently depending on the IO, instead of translating only BoundedSource
> and UnboundedSource.
>
> Best,
> Etienne
> On Friday, 18 January 2019 at 18:33 +0100, Gleb Kanterov wrote:
> > Agree with Kenn. It should be possible, Spark has a similar concept called 
> > ExpressionEncoder, I was doing similar
> > derivation using Scala macro in typelevel/frameless.
> > 
> > Most of the code in Beam is a blackbox function in a ParDo, and the only way
> > to translate it is using `mapPartition`. However, we could override behavior
> > for known transforms from beam-java-core, for instance Group and Select, and
> > use FieldAccessDescriptor to push down projections. There is a bigger
> > opportunity for Beam SQL, which translates into transforms that fit the
> > Spark DataFrame model better.
> > 
> > Gleb
> > 
> > 
> > 
> > On Fri, Jan 18, 2019 at 3:25 PM Kenneth Knowles  wrote:
> > > I wonder if this could tie in with Reuven's recent work. He's basically 
> > > making it so every type with an "obvious"
> > > schema automatically converts to/from Row whenever needed. Sounds like a 
> > > similar need, superficially.
> > > Kenn
> > > On Fri, Jan 18, 2019, 02:36 Manu Zhang wrote:
> > > > Hi Etienne,
> > > > I see your point. I'm a bit worried that every ParDo has to be wrapped 
> > > > in a `mapPartition` which introduces cost
> > > > of serde and forgoes the benefits of Dataset API. 
> > > > Maybe Dataset is not the best idea to integrate Beam with Spark. Just 
> > > > my $0.02. 
> > > > 
> > > > Manu
> > > >  
> > > > On Thu, Jan 17, 2019 at 10:44 PM Etienne Chauchot 
> > > >  wrote:
> > > > > Hi Manu, Yes a json schema can make its way to the spark source with
> > > > > no difficulty, but still we need to store the windowedValue, not only
> > > > > the elements that would comply with this schema. The problem is that
> > > > > spark will try to match the element (windowedValue) to the schema of the
> > > > > source at any element-wise processing (and downstream it will auto-guess
> > > > > the schema from the content of the dataset; for example, if I extract the
> > > > > timestamp in a pardo I get a Long schema in the output dataset). The
> > > > > problem is that windowedValue is complex and has many subclasses.
> > > > > Maybe bytes serialization is still the best way to go, but we don't
> > > > > leverage schema PCollections.
> > > > > Best,
> > > > > Etienne
> > > > > On Thursday, 17 January 2019 at 21:52 +0800, Manu Zhang wrote:
> > > > > > Nice try, Etienne! Is it possible to pass in the schema through
> > > > > > pipeline options?
> > > > > > Manu 
> > > > > > On Thu, Jan 17, 2019 at 5:25 PM Etienne Chauchot 
> > > > > >  wrote:
> > > > > > > Hi Kenn,
> > > > > > > Sure, in spark DataSourceV2 providing a schema is mandatory:
> > > > > > > - if I set it to null, I obviously get a NPE
> > > > > > > - if I set it empty: I 

Beam JIRA contributor permissions

2019-01-24 Thread Michał Walenia
Hi all,
I’d like to take on the BEAM-6207 issue, but I lack the JIRA permissions to
assign tickets to myself.
Can anyone add me as a contributor?
My username is mwalenia.
Thanks and have a great day

Michal