Re: Magic number explanation in ParDoTest.java

2019-01-22 Thread Kenneth Knowles
OK, I dug into the code.

1. This test should be using TestStream to control the watermark if it is
to be a reliable test, since it sets a relative timer. Sorry I missed that
in review. It is surprising that this test works on any runner, much less
multiple. I wonder if it is blacklisted for most.
2. The application of offset and alignment should be explicitly sequenced,
since applying them in different orders yields different results.
3. The result is what it is because Java's % does not implement modular
arithmetic:
https://stackoverflow.com/questions/4412179/best-way-to-make-javas-modulus-behave-like-it-should-with-negative-numbers
but the result is a bug independent of that:

The min timestamp is -9223372036854775, so ignoring most of the digits and
inlining the code in SimpleDoFnRunner, you might think:

millisSinceStart = (now + offset) % period = (-4775 + 1) % 1000 = 226

So that would be accurately named. But since Java copies C/C++ instead of
mathematics, the result is -774.

target = now + period - millisSinceStart = -4775 + 1000 - (-774) = -3001

So that's 1774 later, but that doesn't make the answer useful. What I would
have expected given the implicit sequencing would be -4000, the result of
"set a timer for 1 milli from now, but align it to 1000 milli intervals
relative to the epoch". Or in the sequence expressed in the test (not
reflected in the data structures) it could mean "align the current
(watermark) time to the next 1000 milli since epoch boundary then set a
timer for 1 milli later", which would be -3999. FWIW the result of this
code using proper modular arithmetic would be -4001 and that number is not
a good result either.
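A quick standalone sketch of the arithmetic above (plain Java, no Beam
dependencies; offset = 1 and period = 1000 are the values the test uses):

public class TimerAlignmentDemo {
  public static void main(String[] args) {
    // BoundedWindow.TIMESTAMP_MIN_VALUE in millis, inlined as a literal.
    long now = -9223372036854775L;
    long offset = 1L;
    long period = 1000L;

    // Java's % is a remainder that takes the sign of the dividend, so
    // this is -774 rather than the mathematical result, 226.
    long millisSinceStart = (now + offset) % period;

    // Math.floorMod gives the mathematical modulus for comparison.
    long floorMod = Math.floorMod(now + offset, period);

    long target = now + period - millisSinceStart;

    System.out.println(millisSinceStart); // -774
    System.out.println(floorMod);         // 226
    System.out.println(target - now);     // 1774, the magic number
  }
}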

Kenn

On Tue, Jan 22, 2019 at 6:11 PM Sam Rohde  wrote:

> Thanks for digging up the PR. I'm still confused as to why that magic
> number is still there though. Why is there an expectation that the
> timestamp from the timer is *exactly* 1774ms past
> BoundedWindow.TIMESTAMP_MIN_VALUE?
>
> On Tue, Jan 22, 2019 at 10:43 AM Kenneth Knowles  wrote:
>
>> The commit comes from this PR: https://github.com/apache/beam/pull/2273
>>
>> Kenn
>>
>> On Tue, Jan 22, 2019 at 10:21 AM Sam Rohde  wrote:
>>
>>> Hi all,
>>>
>>> Does anyone have context on why there is a magic number of "1774"
>>> milliseconds in ParDoTest.java on line 2618? This is in
>>> the testEventTimeTimerAlignBounded method.
>>>
>>> File at master:
>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java#L2618
>>>
>>> First added commit:
>>> https://github.com/apache/beam/commit/4f934923d28798dfe7cd18c86ff4bcf8eebc27e5
>>>
>>> Regards,
>>> Sam
>>>
>>


Re: [DISCUSSION] ParDo Async Java API

2019-01-22 Thread Kenneth Knowles
When executed over the portable APIs, it will be primarily the Java SDK
harness that makes all of these decisions. If we wanted runners to have
some insight into it we would have to add it to the Beam model protos. I
don't have any suggestions there, so I would leave it out of this
discussion until there are good ideas. We could learn a lot by trying it out
just in the SDK harness.

Kenn

On Tue, Jan 22, 2019 at 6:12 PM Xinyu Liu  wrote:

> I don't have a strong opinion on the resolution of the futures relative to
> the @FinishBundle invocation. Leaving it unspecified does give runners
> more room to implement it with their own support.
>
> Optimization is another great point. Fuse seems pretty complex to me
> too if we need to find a way to chain the resulting future into the next
> transform; should we leave the async transform as a standalone stage initially?
>
> Btw, I was counting the number of replies before we hit portability.
> Seems that after 4 replies, fuse finally showed up :).
>
> Thanks,
> Xinyu
>
>
> On Tue, Jan 22, 2019 at 5:42 PM Kenneth Knowles  wrote:
>
>>
>>
>> On Tue, Jan 22, 2019, 17:23 Reuven Lax  wrote:
>>>
>>>
>>> On Tue, Jan 22, 2019 at 5:08 PM Xinyu Liu  wrote:
>>>
 @Steve: it's good to see that this is going to be useful in your use
 cases as well. Thanks for sharing the code from Scio! I can see in your
 implementation that waiting for the future completion is part of the
 @FinishBundle. We are thinking of taking advantage of the underlying runner
 async support so the user-level code won't need to implement this logic,
 e.g. Samza has an AsyncStreamTask API that provides a callback to invoke
 after future completion [1], and Flink also has an AsyncFunction API [2] which
 provides a ResultFuture similar to the API we discussed.

>>>
>>> Can this be done correctly? What I mean is that if the process dies, can
>>> you guarantee that no data is lost? Beam currently guarantees this for
>>> FinishBundle, but if you use an arbitrary async framework this might not be
>>> true.
>>>
>>
>> What a Beam runner guarantees is that *if* the bundle is committed,
>> *then* finishbundle has run. So it seems just as easy to say *if* a bundle
>> is committed, *then* every async result has been resolved.
>>
>> If the process dies the two cases should be naturally analogous.
>>
>> But it raises the question of whether they should be resolved prior to
>> finishbundle, after, or unspecified. I lean toward unspecified.
>>
>> That's for a single ParDo. Where this could get complex is optimizing
>> fused stages for greater asynchrony.
>>
>> Kenn
>>
>>
>>>
 A simple use case for this is to execute a Runnable asynchronously in
 user's own executor. The following code illustrates Kenn's option #2, with
 a very simple single-thread pool being the executor:

 new DoFn<InputT, CompletionStage<OutputT>>() {
   @ProcessElement
   public void process(@Element InputT element,
       @Output OutputReceiver<CompletionStage<OutputT>> outputReceiver) {
     CompletableFuture<OutputT> future = CompletableFuture.supplyAsync(
         () -> someOutput,
         Executors.newSingleThreadExecutor());
     outputReceiver.output(future);
   }
 }

 The neat thing about this API is that the user can choose their own async 
 framework and we only expect the output to be a CompletionStage.


 For the implementation of bundling, can we compose a CompletableFuture 
 from each element in the bundle, e.g. CompletableFuture.allOf(...), and 
 then invoke @FinishBundle when this future is complete? Seems this might 
 work.

 Thanks,
 Xinyu


 [1]
 https://samza.apache.org/learn/documentation/1.0.0/api/javadocs/org/apache/samza/task/AsyncStreamTask.html
 [2]
 https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html

 On Tue, Jan 22, 2019 at 2:45 PM Steve Niemitz 
 wrote:

> I'd love to see something like this as well.  Also +1 to process(@Element
> InputT element, @Output OutputReceiver<CompletionStage<OutputT>>).  I
> don't know if there's much benefit to passing a future in, since the
> framework itself could hook up the process function to complete when the
> future completes.
>
> I feel like I've spent a bunch of time writing very similar "kick off
> a future in ProcessElement, join it in FinishBundle" code, and looking
> around beam itself a lot of built-in transforms do it as well.  Scio
> provides a few AsyncDoFn implementations [1] but it'd be great to see this
> as a first-class concept in beam itself.  Doing error handling,
> concurrency, etc correctly can be tricky.
>
> [1]
> https://github.com/spotify/scio/blob/master/scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java
>
> On Tue, Jan 22, 2019 at 5:39 PM Kenneth Knowles 
> wrote:
>
>> If the input is a CompletionStage<InputT> then the output should also
>> be a CompletionStage<OutputT>, 

Re: [DISCUSSION] ParDo Async Java API

2019-01-22 Thread Xinyu Liu
I don't have a strong opinion on the resolution of the futures relative to
the @FinishBundle invocation. Leaving it unspecified does give runners
more room to implement it with their own support.

Optimization is another great point. Fuse seems pretty complex to me
too if we need to find a way to chain the resulting future into the next
transform; should we leave the async transform as a standalone stage initially?

Btw, I was counting the number of replies before we hit portability.
Seems that after 4 replies, fuse finally showed up :).

Thanks,
Xinyu


On Tue, Jan 22, 2019 at 5:42 PM Kenneth Knowles  wrote:

>
>
> On Tue, Jan 22, 2019, 17:23 Reuven Lax  wrote:
>>
>>
>> On Tue, Jan 22, 2019 at 5:08 PM Xinyu Liu  wrote:
>>
>>> @Steve: it's good to see that this is going to be useful in your use
>>> cases as well. Thanks for sharing the code from Scio! I can see in your
>>> implementation that waiting for the future completion is part of the
>>> @FinishBundle. We are thinking of taking advantage of the underlying runner
>>> async support so the user-level code won't need to implement this logic,
>>> e.g. Samza has an AsyncStreamTask API that provides a callback to invoke
>>> after future completion [1], and Flink also has an AsyncFunction API [2] which
>>> provides a ResultFuture similar to the API we discussed.
>>>
>>
>> Can this be done correctly? What I mean is that if the process dies, can
>> you guarantee that no data is lost? Beam currently guarantees this for
>> FinishBundle, but if you use an arbitrary async framework this might not be
>> true.
>>
>
> What a Beam runner guarantees is that *if* the bundle is committed, *then*
> finishbundle has run. So it seems just as easy to say *if* a bundle is
> committed, *then* every async result has been resolved.
>
> If the process dies the two cases should be naturally analogous.
>
> But it raises the question of whether they should be resolved prior to
> finishbundle, after, or unspecified. I lean toward unspecified.
>
> That's for a single ParDo. Where this could get complex is optimizing
> fused stages for greater asynchrony.
>
> Kenn
>
>
>>
>>> A simple use case for this is to execute a Runnable asynchronously in
>>> user's own executor. The following code illustrates Kenn's option #2, with
>>> a very simple single-thread pool being the executor:
>>>
>>> new DoFn<InputT, CompletionStage<OutputT>>() {
>>>   @ProcessElement
>>>   public void process(@Element InputT element,
>>>       @Output OutputReceiver<CompletionStage<OutputT>> outputReceiver) {
>>>     CompletableFuture<OutputT> future = CompletableFuture.supplyAsync(
>>>         () -> someOutput,
>>>         Executors.newSingleThreadExecutor());
>>>     outputReceiver.output(future);
>>>   }
>>> }
>>>
>>> The neat thing about this API is that the user can choose their own async 
>>> framework and we only expect the output to be a CompletionStage.
>>>
>>>
>>> For the implementation of bundling, can we compose a CompletableFuture from 
>>> each element in the bundle, e.g. CompletableFuture.allOf(...), and then 
>>> invoke @FinishBundle when this future is complete? Seems this might work.
>>>
>>> Thanks,
>>> Xinyu
>>>
>>>
>>> [1]
>>> https://samza.apache.org/learn/documentation/1.0.0/api/javadocs/org/apache/samza/task/AsyncStreamTask.html
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html
>>>
>>> On Tue, Jan 22, 2019 at 2:45 PM Steve Niemitz 
>>> wrote:
>>>
 I'd love to see something like this as well.  Also +1 to process(@Element
 InputT element, @Output OutputReceiver<CompletionStage<OutputT>>).  I
 don't know if there's much benefit to passing a future in, since the
 framework itself could hook up the process function to complete when the
 future completes.

 I feel like I've spent a bunch of time writing very similar "kick off a
 future in ProcessElement, join it in FinishBundle" code, and looking around
 beam itself a lot of built-in transforms do it as well.  Scio provides a
 few AsyncDoFn implementations [1] but it'd be great to see this as a
 first-class concept in beam itself.  Doing error handling, concurrency, etc
 correctly can be tricky.

 [1]
 https://github.com/spotify/scio/blob/master/scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java

 On Tue, Jan 22, 2019 at 5:39 PM Kenneth Knowles  wrote:

> If the input is a CompletionStage<InputT> then the output should also
> be a CompletionStage<OutputT>, since all you should do is async chaining.
> We could enforce this by giving the DoFn an
> OutputReceiver(CompletionStage<OutputT>).
>
> Another possibility that might be even more robust against poor future
> use could be process(@Element InputT element, @Output
> OutputReceiver<CompletionStage<OutputT>>). In this way, the process method
> itself will be async chained, rather than counting on the user to do the
> right thing.
>
> We should see how these look in real use cases. The way that
> processing is split between @ProcessElement and @FinishBundle might

Re: Magic number explanation in ParDoTest.java

2019-01-22 Thread Sam Rohde
Thanks for digging up the PR. I'm still confused as to why that magic
number is still there though. Why is there an expectation that the
timestamp from the timer is *exactly* 1774ms past
BoundedWindow.TIMESTAMP_MIN_VALUE?

On Tue, Jan 22, 2019 at 10:43 AM Kenneth Knowles  wrote:

> The commit comes from this PR: https://github.com/apache/beam/pull/2273
>
> Kenn
>
> On Tue, Jan 22, 2019 at 10:21 AM Sam Rohde  wrote:
>
>> Hi all,
>>
>> Does anyone have context on why there is a magic number of "1774"
>> milliseconds in ParDoTest.java on line 2618? This is in
>> the testEventTimeTimerAlignBounded method.
>>
>> File at master:
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java#L2618
>>
>> First added commit:
>> https://github.com/apache/beam/commit/4f934923d28798dfe7cd18c86ff4bcf8eebc27e5
>>
>> Regards,
>> Sam
>>
>


Re: [PROPOSAL] Prepare Beam 2.10.0 release

2019-01-22 Thread Kenneth Knowles
OK. There is just one release blocker remaining:
https://issues.apache.org/jira/browse/BEAM-6354

I have no insights yet, but I am bisecting. It was healthy in 2.9.0.

Kenn

On Tue, Jan 22, 2019 at 9:38 AM Scott Wegner  wrote:

> The rollback for BEAM-6352 is now in and cherry-picked into the release
> branch.
>
> On Fri, Jan 18, 2019 at 9:04 AM Scott Wegner  wrote:
>
>> For BEAM-6352, I have a rollback ready for review:
>> https://github.com/apache/beam/pull/7540
>> Conversation about the decision to rollback vs. roll-forward for this
>> change is on the JIRA issue.
>>
>> On Fri, Jan 18, 2019 at 8:22 AM Maximilian Michels 
>> wrote:
>>
>>> I've created the revert for the pipeline options parsing which we agreed
>>> on:
>>> https://github.com/apache/beam/pull/7564
>>>
>>> On 17.01.19 15:16, Maximilian Michels wrote:
>>> > An issue with the Flink Runner when restarting streaming pipelines:
>>> > https://jira.apache.org/jira/browse/BEAM-6460
>>> >
>>> > Looks like it will be easy to fix by invalidating the Jackson cache.
>>> >
>>> > -Max
>>> >
>>> > On 16.01.19 23:00, Kenneth Knowles wrote:
>>> >> Quick update on this. There are three remaining issues:
>>> >>
>>> >>   - https://issues.apache.org/jira/browse/BEAM-6407: A DirectRunner
>>> >>     self-check was broken from 2.8.0 to 2.9.0 - PR looks good modulo
>>> >>     our infra flakes
>>> >>   - https://issues.apache.org/jira/browse/BEAM-6354: PAssert +
>>> >>     DirectRunner + Unbounded data busted? Investigation not started
>>> >>   - https://issues.apache.org/jira/browse/BEAM-6352: Watch was broken
>>> >>     from 2.8.0 to 2.9.0 - will rollback if no forward fix by the time
>>> >>     everything else is resolved
>>> >>
>>> >> Kenn
>>> >>
>>> >> On Wed, Jan 16, 2019 at 6:00 AM Kenneth Knowles  wrote:
>>> >>
>>> >> Thanks, Ismaël!
>>> >>
>>> >> On Wed, Jan 16, 2019 at 2:13 AM Ismaël Mejía  wrote:
>>> >>
>>> >> Ok since there were not many issues I did the 'update' for the
>>> >> misplaced issues to version 2.10. We are good to go. New resolved
>>> >> issues in master must go now into 2.11.0
>>> >>
>>> >> On Wed, Jan 16, 2019 at 10:38 AM Ismaël Mejía <ieme...@gmail.com> wrote:
>>> >>  >
>>> >>  > This means that the tickets resolved and marked for 2.11 since
>>> >>  > January 2 should be reviewed and retargeted to version 2.10.
>>> >>  > So this is a call to action for committers who have merged fixes
>>> >>  > after the cut to update the tickets if required.
>>> >>  >
>>> >>  > Ismaël
>>> >>  >
>>> >>  > On Tue, Jan 15, 2019 at 9:22 PM Kenneth Knowles <k...@apache.org> wrote:
>>> >>  > >
>>> >>  > > As a heads up, I did not realize that the release guide specified
>>> >>  > > a custom process for starting a release branch. It makes sense;
>>> >>  > > cut_release_branch.sh consolidates knowledge about all the places
>>> >>  > > the version is hardcoded in the codebase. To keep the history
>>> >>  > > simple, I will re-cut the release branch at the point where master
>>> >>  > > moved from 2.10.0-SNAPSHOT to 2.11.0-SNAPSHOT. All PRs to the
>>> >>  > > branch have been cherry-picked from master, so they will all be
>>> >>  > > incorporated without any action by their authors.
>>> >>  > >
>>> >>  > > Kenn
>>> >>  > >
>>> >>  > > On Tue, Jan 15, 2019 at 10:31 AM Kenneth Knowles <k...@google.com> wrote:
>>> >>  > >>
>>> >>  > >> I'm on it.
>>> >>  > >>
>>> >>  > >> On Tue, Jan 15, 2019 at 8:10 AM Ismaël Mejía <ieme...@gmail.com> wrote:
>>> >>  > >>>
>>> >>  > >>> There is also another issue: after the 2.10.0 branch cut, some
>>> >>  > >>> identifier in the build was not changed, and the Apache Beam
>>> >>  > >>> Snapshots keep generating SNAPSHOTS for 2.10.0 instead of the
>>> >>  > >>> now-current 2.11.0-SNAPSHOT. Can somebody PTAL?
>>> >>  > >>>
>>> >>  > >>> On Thu, Jan 3, 2019 at 6:17 PM Maximilian Michels <m...@apache.org> wrote:
>>> >>  > >>> >
>>> >>  > >>> > Thanks for driving this Kenn! I'm in favor of a strict
>>> >>  > >>> > cutoff, but I'd like to propose a week for cherry-picking
>>> >>  > >>> > relevant changes to the release branch. It looks like many
>>> >>  > >>> > people are returning from holidays or are still off.
>>> >>  > >>> >
>>> >>  > >>> > Cheers,
>>> >>  > >>> > Max
>>> >>  > >>> >
>>> >>  > >>> > On 02.01.19 17:20, Kenneth 

Re: [DISCUSSION] ParDo Async Java API

2019-01-22 Thread Kenneth Knowles
On Tue, Jan 22, 2019, 17:23 Reuven Lax  wrote:
>
> On Tue, Jan 22, 2019 at 5:08 PM Xinyu Liu  wrote:
>
>> @Steve: it's good to see that this is going to be useful in your use
>> cases as well. Thanks for sharing the code from Scio! I can see in your
>> implementation that waiting for the future completion is part of the
>> @FinishBundle. We are thinking of taking advantage of the underlying runner
>> async support so the user-level code won't need to implement this logic,
>> e.g. Samza has an AsyncStreamTask API that provides a callback to invoke
>> after future completion [1], and Flink also has an AsyncFunction API [2] which
>> provides a ResultFuture similar to the API we discussed.
>>
>
> Can this be done correctly? What I mean is that if the process dies, can
> you guarantee that no data is lost? Beam currently guarantees this for
> FinishBundle, but if you use an arbitrary async framework this might not be
> true.
>

What a Beam runner guarantees is that *if* the bundle is committed, *then*
finishbundle has run. So it seems just as easy to say *if* a bundle is
committed, *then* every async result has been resolved.

If the process dies the two cases should be naturally analogous.

But it raises the question of whether they should be resolved prior to
finishbundle, after, or unspecified. I lean toward unspecified.

That's for a single ParDo. Where this could get complex is optimizing fused
stages for greater asynchrony.

Kenn


>
>> A simple use case for this is to execute a Runnable asynchronously in
>> user's own executor. The following code illustrates Kenn's option #2, with
>> a very simple single-thread pool being the executor:
>>
>> new DoFn<InputT, CompletionStage<OutputT>>() {
>>   @ProcessElement
>>   public void process(@Element InputT element,
>>       @Output OutputReceiver<CompletionStage<OutputT>> outputReceiver) {
>>     CompletableFuture<OutputT> future = CompletableFuture.supplyAsync(
>>         () -> someOutput,
>>         Executors.newSingleThreadExecutor());
>>     outputReceiver.output(future);
>>   }
>> }
>>
>> The neat thing about this API is that the user can choose their own async 
>> framework and we only expect the output to be a CompletionStage.
>>
>>
>> For the implementation of bundling, can we compose a CompletableFuture from 
>> each element in the bundle, e.g. CompletableFuture.allOf(...), and then 
>> invoke @FinishBundle when this future is complete? Seems this might work.
>>
>> Thanks,
>> Xinyu
>>
>>
>> [1]
>> https://samza.apache.org/learn/documentation/1.0.0/api/javadocs/org/apache/samza/task/AsyncStreamTask.html
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html
>>
>> On Tue, Jan 22, 2019 at 2:45 PM Steve Niemitz 
>> wrote:
>>
>>> I'd love to see something like this as well.  Also +1 to process(@Element
>>> InputT element, @Output OutputReceiver<CompletionStage<OutputT>>).  I
>>> don't know if there's much benefit to passing a future in, since the
>>> framework itself could hook up the process function to complete when the
>>> future completes.
>>>
>>> I feel like I've spent a bunch of time writing very similar "kick off a
>>> future in ProcessElement, join it in FinishBundle" code, and looking around
>>> beam itself a lot of built-in transforms do it as well.  Scio provides a
>>> few AsyncDoFn implementations [1] but it'd be great to see this as a
>>> first-class concept in beam itself.  Doing error handling, concurrency, etc
>>> correctly can be tricky.
>>>
>>> [1]
>>> https://github.com/spotify/scio/blob/master/scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java
>>>
>>> On Tue, Jan 22, 2019 at 5:39 PM Kenneth Knowles  wrote:
>>>
 If the input is a CompletionStage<InputT> then the output should also
 be a CompletionStage<OutputT>, since all you should do is async chaining.
 We could enforce this by giving the DoFn an
 OutputReceiver(CompletionStage<OutputT>).

 Another possibility that might be even more robust against poor future
 use could be process(@Element InputT element, @Output
 OutputReceiver<CompletionStage<OutputT>>). In this way, the process method
 itself will be async chained, rather than counting on the user to do the
 right thing.

 We should see how these look in real use cases. The way that processing
 is split between @ProcessElement and @FinishBundle might complicate things.

 Kenn

 On Tue, Jan 22, 2019 at 12:44 PM Xinyu Liu 
 wrote:

> Hi, guys,
>
> As more users try out Beam running on the SamzaRunner, we got a lot of
> asks for an asynchronous processing API. There are a few reasons for these
> asks:
>
>    - The users here are experienced in asynchronous programming. With
>    async frameworks such as Netty and ParSeq and libs like async jersey
>    client, they are able to make remote calls efficiently and the
>    libraries help manage the execution threads underneath. Async remote
>    calls are very common in most of our streaming applications today.
>    - Many jobs are running on a 

Re: [DISCUSSION] ParDo Async Java API

2019-01-22 Thread Xinyu Liu
I can speak from Samza's perspective: Samza only commits the messages once
the async callbacks have completed. So if there are any failures, it
will recover from the last checkpoint and reprocess the messages for which
we haven't received completions, so no data is lost. The "Guaranteed
Semantics" section in [1] has a bit more detail. Judging by the "Fault
Tolerance Guarantees" section in [2], I believe Flink honors the same semantics.

Thanks,
Xinyu

[1]:
https://samza.apache.org/learn/tutorials/0.11/samza-async-user-guide.html
[2]:
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html

On Tue, Jan 22, 2019 at 5:23 PM Reuven Lax  wrote:

>
>
> On Tue, Jan 22, 2019 at 5:08 PM Xinyu Liu  wrote:
>
>> @Steve: it's good to see that this is going to be useful in your use
>> cases as well. Thanks for sharing the code from Scio! I can see in your
>> implementation that waiting for the future completion is part of the
>> @FinishBundle. We are thinking of taking advantage of the underlying runner
>> async support so the user-level code won't need to implement this logic,
>> e.g. Samza has an AsyncStreamTask API that provides a callback to invoke
>> after future completion [1], and Flink also has an AsyncFunction API [2] which
>> provides a ResultFuture similar to the API we discussed.
>>
>
> Can this be done correctly? What I mean is that if the process dies, can
> you guarantee that no data is lost? Beam currently guarantees this for
> FinishBundle, but if you use an arbitrary async framework this might not be
> true.
>
>
>> A simple use case for this is to execute a Runnable asynchronously in
>> user's own executor. The following code illustrates Kenn's option #2, with
>> a very simple single-thread pool being the executor:
>>
>> new DoFn<InputT, CompletionStage<OutputT>>() {
>>   @ProcessElement
>>   public void process(@Element InputT element,
>>       @Output OutputReceiver<CompletionStage<OutputT>> outputReceiver) {
>>     CompletableFuture<OutputT> future = CompletableFuture.supplyAsync(
>>         () -> someOutput,
>>         Executors.newSingleThreadExecutor());
>>     outputReceiver.output(future);
>>   }
>> }
>>
>> The neat thing about this API is that the user can choose their own async 
>> framework and we only expect the output to be a CompletionStage.
>>
>>
>> For the implementation of bundling, can we compose a CompletableFuture from 
>> each element in the bundle, e.g. CompletableFuture.allOf(...), and then 
>> invoke @FinishBundle when this future is complete? Seems this might work.
>>
>> Thanks,
>> Xinyu
>>
>>
>> [1]
>> https://samza.apache.org/learn/documentation/1.0.0/api/javadocs/org/apache/samza/task/AsyncStreamTask.html
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html
>>
>> On Tue, Jan 22, 2019 at 2:45 PM Steve Niemitz 
>> wrote:
>>
>>> I'd love to see something like this as well.  Also +1 to process(@Element
>>> InputT element, @Output OutputReceiver<CompletionStage<OutputT>>).  I
>>> don't know if there's much benefit to passing a future in, since the
>>> framework itself could hook up the process function to complete when the
>>> future completes.
>>>
>>> I feel like I've spent a bunch of time writing very similar "kick off a
>>> future in ProcessElement, join it in FinishBundle" code, and looking around
>>> beam itself a lot of built-in transforms do it as well.  Scio provides a
>>> few AsyncDoFn implementations [1] but it'd be great to see this as a
>>> first-class concept in beam itself.  Doing error handling, concurrency, etc
>>> correctly can be tricky.
>>>
>>> [1]
>>> https://github.com/spotify/scio/blob/master/scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java
>>>
>>> On Tue, Jan 22, 2019 at 5:39 PM Kenneth Knowles  wrote:
>>>
 If the input is a CompletionStage<InputT> then the output should also
 be a CompletionStage<OutputT>, since all you should do is async chaining.
 We could enforce this by giving the DoFn an
 OutputReceiver(CompletionStage<OutputT>).

 Another possibility that might be even more robust against poor future
 use could be process(@Element InputT element, @Output
 OutputReceiver<CompletionStage<OutputT>>). In this way, the process method
 itself will be async chained, rather than counting on the user to do the
 right thing.

 We should see how these look in real use cases. The way that processing
 is split between @ProcessElement and @FinishBundle might complicate things.

 Kenn

 On Tue, Jan 22, 2019 at 12:44 PM Xinyu Liu 
 wrote:

> Hi, guys,
>
> As more users try out Beam running on the SamzaRunner, we got a lot of
> asks for an asynchronous processing API. There are a few reasons for these
> asks:
>
>    - The users here are experienced in asynchronous programming. With
>    async frameworks such as Netty and ParSeq and libs like async jersey
>    client, they are able to make remote calls efficiently and the
>    libraries help manage the execution threads underneath. Async 

Re: FileIOTest.testMatchWatchForNewFiles flakey in java presubmit

2019-01-22 Thread Udi Meiri
Alex, the only way to implement my suggestion #1 (that I know of) would be
to write to a file and read it back.
I don't have a good example for #2.

Eugene's suggestion no. 1 seems like a good idea. There are some examples
in the codebase.

On Tue, Jan 22, 2019 at 5:16 PM Eugene Kirpichov 
wrote:

> Yeah, the "List<MatchResult.Metadata> expected" is constructed
> from Files.getLastModifiedTime() calls before the files are actually
> modified, so the code is basically unconditionally broken rather than merely
> flaky.
>
> There are several easy options:
> 1) Use PAssert.that().satisfies() instead of .contains(), and use
> assertThat().contains() inside that, with the list constructed at the time
> the assertion is applied rather than declared.
> 2) Implement a Matcher that ignores the last-modified time and use
> that.
>
> Jeff - your option #3 is unfortunately also race-prone, because the code
> may match the files after they have been written but before
> setLastModifiedTime was called.
>
> On Tue, Jan 22, 2019 at 5:08 PM Jeff Klukas  wrote:
>
>> Another option:
>>
>> #3 Have the writer thread call Files.setLastModifiedTime explicitly after
>> each File.write. Then the lastModifiedMillis can be a stable value for each
>> file and we can use those same static values in our expected result. I
>> think that would also eliminate the race condition.
>>
>> On Tue, Jan 22, 2019 at 7:48 PM Alex Amato  wrote:
>>
>>> Thanks Udi, is there a good example for either of these?
>>> #1 - seems like you have to rewrite your assertion logic without the
>>> PAssert? Is there some way to capture the pipeline output and iterate over
>>> it? The pattern I have seen for this in the past also has thread safety
>>> issues (using a DoFn at the end of the pipeline to add the output to a
>>> collection is not safe, since the DoFn can be executed concurrently).
>>> #2 - Would BigqueryMatcher be a good example for this? It is used in
>>> BigQueryTornadoesIT.java. Or is there another example you would suggest
>>> looking at for reference?
>>>
>>>- I guess to this you need to implement the SerializableMatcher
>>>interface and use the matcher as an option in the pipeline options.
>>>
>>>
>>> On Tue, Jan 22, 2019 at 4:28 PM Udi Meiri  wrote:
>>>
 Some options:
 - You could wait to assert until after p.waitForFinish().
 - You could PAssert using SerializableMatcher and allow any
 lastModifiedTime.

 On Tue, Jan 22, 2019 at 3:56 PM Alex Amato  wrote:

> +Jeff, Eugene,
>
> Hi Jeff and Eugene,
>
> I've noticed that Jeff's PR introduced
> a race condition in this test, but it's not clear exactly how to add Jeff's
> test check in a thread-safe way. I believe this to be the source of the
> flakiness. Do you have any suggestions Eugene (since you authored this
> test)?
>
> I added some details to this JIRA issue explaining in full
> https://jira.apache.org/jira/browse/BEAM-6491?filter=-2
>
>
> On Tue, Jan 22, 2019 at 3:34 PM Alex Amato  wrote:
>
>> I've seen this fail in a few different PRs for different
>> contributors, and it's causing some issues during the presubmit process.
>> This is a multithreaded test with a lot of sleeps, so it looks a bit
>> suspicious as the source of the problem.
>>
>> https://builds.apache.org/job/beam_PreCommit_Java_Commit/3688/testReport/org.apache.beam.sdk.io/FileIOTest/testMatchWatchForNewFiles/
>>
>> I filed a JIRA for this issue:
>> https://jira.apache.org/jira/browse/BEAM-6491?filter=-2
>>
>>
>>




Re: [DISCUSSION] ParDo Async Java API

2019-01-22 Thread Reuven Lax
On Tue, Jan 22, 2019 at 5:08 PM Xinyu Liu  wrote:

> @Steve: it's good to see that this is going to be useful in your use cases
> as well. Thanks for sharing the code from Scio! I can see in your
> implementation that waiting for the future completion is part of the
> @FinishBundle. We are thinking of taking advantage of the underlying runner
> async support so the user-level code won't need to implement this logic,
> e.g. Samza has an AsyncStreamTask API that provides a callback to invoke
> after future completion [1], and Flink also has an AsyncFunction API [2] which
> provides a ResultFuture similar to the API we discussed.
>

Can this be done correctly? What I mean is that if the process dies, can
you guarantee that no data is lost? Beam currently guarantees this for
FinishBundle, but if you use an arbitrary async framework this might not be
true.


> A simple use case for this is to execute a Runnable asynchronously in
> user's own executor. The following code illustrates Kenn's option #2, with
> a very simple single-thread pool being the executor:
>
> new DoFn<InputT, CompletionStage<OutputT>>() {
>   @ProcessElement
>   public void process(@Element InputT element,
>       @Output OutputReceiver<CompletionStage<OutputT>> outputReceiver) {
>     CompletableFuture<OutputT> future = CompletableFuture.supplyAsync(
>         () -> someOutput,
>         Executors.newSingleThreadExecutor());
>     outputReceiver.output(future);
>   }
> }
>
> The neat thing about this API is that the user can choose their own async 
> framework and we only expect the output to be a CompletionStage.
>
>
> For the implementation of bundling, can we compose a CompletableFuture from 
> each element in the bundle, e.g. CompletableFuture.allOf(...), and then 
> invoke @FinishBundle when this future is complete? Seems this might work.
>
> Thanks,
> Xinyu
>
>
> [1]
> https://samza.apache.org/learn/documentation/1.0.0/api/javadocs/org/apache/samza/task/AsyncStreamTask.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html
>
> On Tue, Jan 22, 2019 at 2:45 PM Steve Niemitz  wrote:
>
>> I'd love to see something like this as well.  Also +1 to process(@Element
>> InputT element, @Output OutputReceiver<CompletionStage<OutputT>>).  I
>> don't know if there's much benefit to passing a future in, since the
>> framework itself could hook up the process function to complete when the
>> future completes.
>>
>> I feel like I've spent a bunch of time writing very similar "kick off a
>> future in ProcessElement, join it in FinishBundle" code, and looking around
>> beam itself a lot of built-in transforms do it as well.  Scio provides a
>> few AsyncDoFn implementations [1] but it'd be great to see this as a
>> first-class concept in beam itself.  Doing error handling, concurrency, etc
>> correctly can be tricky.
>>
>> [1]
>> https://github.com/spotify/scio/blob/master/scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java
>>
>> On Tue, Jan 22, 2019 at 5:39 PM Kenneth Knowles  wrote:
>>
>>> If the input is a CompletionStage<InputT> then the output should also be
>>> a CompletionStage<OutputT>, since all you should do is async chaining. We
>>> could enforce this by giving the DoFn an
>>> OutputReceiver(CompletionStage<OutputT>).
>>>
>>> Another possibility that might be even more robust against poor future
>>> use could be process(@Element InputT element, @Output
>>> OutputReceiver<CompletionStage<OutputT>>). In this way, the process method
>>> itself will be async chained, rather than counting on the user to do the
>>> right thing.
>>>
>>> We should see how these look in real use cases. The way that processing
>>> is split between @ProcessElement and @FinishBundle might complicate things.
>>>
>>> Kenn
>>>
>>> On Tue, Jan 22, 2019 at 12:44 PM Xinyu Liu 
>>> wrote:
>>>
 Hi, guys,

 As more users try out Beam running on the SamzaRunner, we got a lot of
 asks for an asynchronous processing API. There are a few reasons for these
 asks:

    - The users here are experienced in asynchronous programming. With
    async frameworks such as Netty and ParSeq and libs like async jersey
    client, they are able to make remote calls efficiently and the libraries
    help manage the execution threads underneath. Async remote calls are
    very common in most of our streaming applications today.
    - Many jobs are running on a multi-tenancy cluster. Async processing
    helps for less resource usage and fast computation (less context switch).

 I asked about the async support in a previous email thread. The
 following API was mentioned in the reply:

   new DoFn<InputT, OutputT>() {
     @ProcessElement
     public void process(@Element CompletionStage<InputT> element, ...) {
       element.thenApply(...)
     }
   }

 We are wondering whether there are any discussions on this API and
 related docs. It is awesome that you guys already considered having DoFn to
 process asynchronously. Out of curiosity, this API seems to create a
 

Re: FileIOTest.testMatchWatchForNewFiles flakey in java presubmit

2019-01-22 Thread Eugene Kirpichov
Yeah, the "List<MatchResult.Metadata> expected" is constructed
from Files.getLastModifiedTime() calls before the files are actually
modified, so the code is basically unconditionally broken rather than merely
flaky.

There are several easy options:
1) Use PAssert.that().satisfies() instead of .contains(), and use
assertThat().contains() inside that, with the list constructed at the time
the assertion is applied rather than declared.
2) Implement a Matcher that ignores the last-modified time and use
that.

Jeff - your option #3 is unfortunately also race-prone, because the code
may match the files after they have been written but before
setLastModifiedTime was called.
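A minimal sketch of option 1 (the names here are hypothetical stand-ins, not
the test's actual code), so the expected values are computed when the
assertion runs rather than when it is declared:

PCollection<MatchResult.Metadata> matches = ...; // the watched match results
PAssert.that(matches)
    .satisfies(
        actual -> {
          // buildExpectedNow() is a hypothetical helper that calls
          // Files.getLastModifiedTime() at assertion time, after the
          // writer thread has finished.
          MatchResult.Metadata[] expected = buildExpectedNow();
          assertThat(actual, containsInAnyOrder(expected));
          return null;
        });

(assuming the usual PAssert and Hamcrest static imports)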

On Tue, Jan 22, 2019 at 5:08 PM Jeff Klukas  wrote:

> Another option:
>
> #3 Have the writer thread call Files.setLastModifiedTime explicitly after
> each File.write. Then the lastModifiedMillis can be a stable value for each
> file and we can use those same static values in our expected result. I
> think that would also eliminate the race condition.
>
> On Tue, Jan 22, 2019 at 7:48 PM Alex Amato  wrote:
>
>> Thanks Udi, is there a good example for either of these?
>> #1 - seems like you have to rewrite your assertion logic without the
>> PAssert? Is there some way to capture the pipeline output and iterate over
>> it? The pattern I have seen for this in the past also has thread safety
>> issues (using a DoFn at the end of the pipeline to add the output to a
>> collection is not safe, since the DoFn can be executed concurrently).
>> #2 - Would BigqueryMatcher be a good example for this? It is used in
>> BigQueryTornadoesIT.java. Or is there another example you would suggest
>> looking at for reference?
>>
>>- I guess to this you need to implement the SerializableMatcher
>>interface and use the matcher as an option in the pipeline options.
>>
>>
>> On Tue, Jan 22, 2019 at 4:28 PM Udi Meiri  wrote:
>>
>>> Some options:
>>> - You could wait to assert until after p.waitForFinish().
>>> - You could PAssert using SerializableMatcher and allow any
>>> lastModifiedTime.
>>>
>>> On Tue, Jan 22, 2019 at 3:56 PM Alex Amato  wrote:
>>>
 +Jeff, Eugene,

 Hi Jeff and Eugene,

 I've noticed that Jeff's PR introduced
 a race condition in this test, but it's not clear exactly how to add Jeff's
 test check in a thread-safe way. I believe this to be the source of the
 flakiness. Do you have any suggestions Eugene (since you authored this
 test)?

 I added some details to this JIRA issue explaining in full
 https://jira.apache.org/jira/browse/BEAM-6491?filter=-2


 On Tue, Jan 22, 2019 at 3:34 PM Alex Amato  wrote:

> I've seen this fail in a few different PRs for different contributors,
> and it's causing some issues during the presubmit process. This is a
> multithreaded test with a lot of sleeps, so it looks a bit suspicious as
> the source of the problem.
>
> https://builds.apache.org/job/beam_PreCommit_Java_Commit/3688/testReport/org.apache.beam.sdk.io/FileIOTest/testMatchWatchForNewFiles/
>
> I filed a JIRA for this issue:
> https://jira.apache.org/jira/browse/BEAM-6491?filter=-2
>
>
>


Re: [DISCUSSION] ParDo Async Java API

2019-01-22 Thread Xinyu Liu
@Steve: it's good to see that this is going to be useful in your use cases
as well. Thanks for sharing the code from Scio! I can see in your
implementation that waiting for the future completion is part of the
@FinishBundle. We are thinking of taking advantage of the underlying runner
async support so the user-level code won't need to implement this logic,
e.g. Samza has an AsyncStreamTask API that provides a callback to invoke
after future completion [1], and Flink also has an AsyncFunction API [2] which
provides a ResultFuture similar to the API we discussed.

A simple use case for this is to execute a Runnable asynchronously in
user's own executor. The following code illustrates Kenn's option #2, with
a very simple single-thread pool being the executor:

new DoFn<InputT, CompletionStage<OutputT>>() {
  @ProcessElement
  public void process(@Element InputT element,
      @Output OutputReceiver<CompletionStage<OutputT>> outputReceiver) {
    CompletableFuture<OutputT> future = CompletableFuture.supplyAsync(
        () -> someOutput,
        Executors.newSingleThreadExecutor());
    outputReceiver.output(future);
  }
}

The neat thing about this API is that the user can choose their own
async framework and we only expect the output to be a CompletionStage.


For the implementation of bundling, can we compose a CompletableFuture
from each element in the bundle, e.g. CompletableFuture.allOf(...),
and then invoke @FinishBundle when this future is complete? Seems this
might work.
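A rough sketch of that bundling idea (runner-side pseudologic, not an
existing Beam API; invokeFinishBundle is a hypothetical hook):

import java.util.List;
import java.util.concurrent.CompletableFuture;

class BundleCompletion {
  // Gate the finish-bundle hook on every element's future resolving.
  static CompletableFuture<Void> awaitBundle(
      List<CompletableFuture<?>> elementFutures, Runnable invokeFinishBundle) {
    return CompletableFuture
        .allOf(elementFutures.toArray(new CompletableFuture<?>[0]))
        .thenRun(invokeFinishBundle); // runs once all element futures resolve
  }
}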

Thanks,
Xinyu


[1]
https://samza.apache.org/learn/documentation/1.0.0/api/javadocs/org/apache/samza/task/AsyncStreamTask.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html

On Tue, Jan 22, 2019 at 2:45 PM Steve Niemitz  wrote:

> I'd love to see something like this as well.  Also +1 to process(@Element
> InputT element, @Output OutputReceiver<CompletionStage<OutputT>>).  I
> don't know if there's much benefit to passing a future in, since the
> framework itself could hook up the process function to complete when the
> future completes.
>
> I feel like I've spent a bunch of time writing very similar "kick off a
> future in ProcessElement, join it in FinishBundle" code, and looking around
> beam itself a lot of built-in transforms do it as well.  Scio provides a
> few AsyncDoFn implementations [1] but it'd be great to see this as a
> first-class concept in beam itself.  Doing error handling, concurrency, etc
> correctly can be tricky.
>
> [1]
> https://github.com/spotify/scio/blob/master/scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java
>
> On Tue, Jan 22, 2019 at 5:39 PM Kenneth Knowles  wrote:
>
>> If the input is a CompletionStage<InputT> then the output should also be
>> a CompletionStage<OutputT>, since all you should do is async chaining. We
>> could enforce this by giving the DoFn an
>> OutputReceiver(CompletionStage<OutputT>).
>>
>> Another possibility that might be even more robust against poor future
>> use could be process(@Element InputT element, @Output
>> OutputReceiver<CompletionStage<OutputT>>). In this way, the process method
>> itself will be async chained, rather than counting on the user to do the
>> right thing.
>>
>> We should see how these look in real use cases. The way that processing
>> is split between @ProcessElement and @FinishBundle might complicate things.
>>
>> Kenn
>>
>> On Tue, Jan 22, 2019 at 12:44 PM Xinyu Liu  wrote:
>>
>>> Hi, guys,
>>>
>>> As more users try out Beam running on the SamzaRunner, we got a lot of
>>> asks for an asynchronous processing API. There are a few reasons for these
>>> asks:
>>>
>>>- The users here are experienced in asynchronous programming. With
>>>async frameworks such as Netty and ParSeq and libs like async jersey
>>>client, they are able to make remote calls efficiently and the libraries
>>>help manage the execution threads underneath. Async remote calls are very
>>>common in most of our streaming applications today.
>>>- Many jobs are running on a multi-tenancy cluster. Async processing
>>>helps for less resource usage and fast computation (less context switch).
>>>
>>> I asked about the async support in a previous email thread. The
>>> following API was mentioned in the reply:
>>>
>>>   new DoFn<InputT, OutputT>() {
>>>     @ProcessElement
>>>     public void process(@Element CompletionStage<InputT> element, ...) {
>>>       element.thenApply(...)
>>>     }
>>>   }
>>>
>>> We are wondering whether there are any discussions on this API and
>>> related docs. It is awesome that you guys already considered having DoFn to
>>> process asynchronously. Out of curiosity, this API seems to create a
>>> CompletionStage out of the input element (probably using the framework's
>>> executor) and then allow the user to chain on it. To us, it seems more
>>> convenient if the DoFn outputs a CompletionStage or passes in a
>>> CompletionStage to invoke upon completion.
>>>
>>> We would like to discuss further on the async API and hopefully we will
>>> have a great support in Beam. Really appreciate the feedback!
>>>
>>> Thanks,
>>> Xinyu
>>>
>>


Re: FileIOTest.testMatchWatchForNewFiles flakey in java presubmit

2019-01-22 Thread Alex Amato
Thanks Udi, is there a good example for either of these?
#1 - seems like you have to rewrite your assertion logic without the
PAssert? Is there some way to capture the pipeline output and iterate over
it? The pattern I have seen for this in the past also has thread safety
issues (using a DoFn at the end of the pipeline to add the output to a
collection is not safe, since the DoFn can be executed concurrently).
#2 - Would BigqueryMatcher be a good example for this? It is used in
BigQueryTornadoesIT.java. Or is there another example you would suggest
looking at for reference?

   - I guess to this you need to implement the SerializableMatcher
   interface and use the matcher as an option in the pipeline options.


On Tue, Jan 22, 2019 at 4:28 PM Udi Meiri  wrote:

> Some options:
> - You could wait to assert until after p.waitForFinish().
> - You could PAssert using SerializableMatcher and allow any
> lastModifiedTime.
>
> On Tue, Jan 22, 2019 at 3:56 PM Alex Amato  wrote:
>
>> +Jeff, Eugene,
>>
>> Hi Jeff and Eugene,
>>
>> I've noticed that Jeff's PR introduced
>> a race condition in this test, but it's not clear exactly how to add Jeff's
>> test check in a thread-safe way. I believe this to be the source of the
>> flakiness. Do you have any suggestions Eugene (since you authored this
>> test)?
>>
>> I added some details to this JIRA issue explaining in full
>> https://jira.apache.org/jira/browse/BEAM-6491?filter=-2
>>
>>
>> On Tue, Jan 22, 2019 at 3:34 PM Alex Amato  wrote:
>>
>>> I've seen this fail in a few different PRs for different contributors,
>>> and it's causing some issues during the presubmit process. This is a
>>> multithreaded test with a lot of sleeps, so it looks a bit suspicious as
>>> the source of the problem.
>>>
>>> https://builds.apache.org/job/beam_PreCommit_Java_Commit/3688/testReport/org.apache.beam.sdk.io/FileIOTest/testMatchWatchForNewFiles/
>>>
>>> I filed a JIRA for this issue:
>>> https://jira.apache.org/jira/browse/BEAM-6491?filter=-2
>>>
>>>
>>>


Re: FileIOTest.testMatchWatchForNewFiles flakey in java presubmit

2019-01-22 Thread Udi Meiri
Some options:
- You could wait to assert until after p.waitForFinish().
- You could PAssert using SerializableMatcher and allow any
lastModifiedTime.
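A rough sketch of the matcher option (illustrative only; the class name and
the fields compared are assumptions, not the test's actual code):

import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.testing.SerializableMatcher;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;

class MetadataIgnoringMtime extends BaseMatcher<MatchResult.Metadata>
    implements SerializableMatcher<MatchResult.Metadata> {
  private final String resourceId;
  private final long sizeBytes;

  MetadataIgnoringMtime(String resourceId, long sizeBytes) {
    this.resourceId = resourceId;
    this.sizeBytes = sizeBytes;
  }

  @Override
  public boolean matches(Object item) {
    if (!(item instanceof MatchResult.Metadata)) {
      return false;
    }
    MatchResult.Metadata metadata = (MatchResult.Metadata) item;
    // Compare everything except the last-modified time.
    return metadata.resourceId().toString().equals(resourceId)
        && metadata.sizeBytes() == sizeBytes;
  }

  @Override
  public void describeTo(Description description) {
    description.appendText(
        "Metadata(" + resourceId + ", " + sizeBytes + " bytes, any mtime)");
  }
}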

On Tue, Jan 22, 2019 at 3:56 PM Alex Amato  wrote:

> +Jeff, Eugene,
>
> Hi Jeff and Eugene,
>
> I've noticed that Jeff's PR introduced
> a race condition in this test, but it's not clear exactly how to add Jeff's
> test check in a thread-safe way. I believe this to be the source of the
> flakiness. Do you have any suggestions Eugene (since you authored this
> test)?
>
> I added some details to this JIRA issue explaining in full
> https://jira.apache.org/jira/browse/BEAM-6491?filter=-2
>
>
> On Tue, Jan 22, 2019 at 3:34 PM Alex Amato  wrote:
>
>> I've seen this fail in a few different PRs for different contributors,
>> and it's causing some issues during the presubmit process. This is a
>> multithreaded test with a lot of sleeps, so it looks a bit suspicious as
>> the source of the problem.
>>
>> https://builds.apache.org/job/beam_PreCommit_Java_Commit/3688/testReport/org.apache.beam.sdk.io/FileIOTest/testMatchWatchForNewFiles/
>>
>> I filed a JIRA for this issue:
>> https://jira.apache.org/jira/browse/BEAM-6491?filter=-2
>>
>>
>>




Re: FileIOTest.testMatchWatchForNewFiles flakey in java presubmit

2019-01-22 Thread Ruoyun Huang
+1, getting the same issue as well.

I saw there were @Ignore annotations on those tests before. If this is not
critical and just caused by the way we test, does it make sense to put those
@Ignores back until it's resolved?


On Tue, Jan 22, 2019 at 3:35 PM Alex Amato  wrote:

> I've seen this fail in a few different PRs for different contributors, and
> it's causing some issues during the presubmit process. This is a
> multithreaded test with a lot of sleeps, so it looks a bit suspicious as
> the source of the problem.
>
> https://builds.apache.org/job/beam_PreCommit_Java_Commit/3688/testReport/org.apache.beam.sdk.io/FileIOTest/testMatchWatchForNewFiles/
>
> I filed a JIRA for this issue:
> https://jira.apache.org/jira/browse/BEAM-6491?filter=-2
>
>
>

-- 

Ruoyun  Huang


Re: FileIOTest.testMatchWatchForNewFiles flakey in java presubmit

2019-01-22 Thread Alex Amato
+Jeff, Eugene,

Hi Jeff and Eugene,

I've noticed that Jeff's PR introduced
a race condition in this test, but it's not clear exactly how to add Jeff's
test check in a thread-safe way. I believe this to be the source of the
flakiness. Do you have any suggestions Eugene (since you authored this
test)?

I added some details to this JIRA issue explaining in full
https://jira.apache.org/jira/browse/BEAM-6491?filter=-2


On Tue, Jan 22, 2019 at 3:34 PM Alex Amato  wrote:

> I've seen this fail in a few different PRs for different contributors, and
> it's causing some issues during the presubmit process. This is a
> multithreaded test with a lot of sleeps, so it looks a bit suspicious as
> the source of the problem.
>
> https://builds.apache.org/job/beam_PreCommit_Java_Commit/3688/testReport/org.apache.beam.sdk.io/FileIOTest/testMatchWatchForNewFiles/
>
> I filed a JIRA for this issue:
> https://jira.apache.org/jira/browse/BEAM-6491?filter=-2
>
>
>


FileIOTest.testMatchWatchForNewFiles flakey in java presubmit

2019-01-22 Thread Alex Amato
I've seen this fail in a few different PRs for different contributors, and
it's causing some issues during the presubmit process. This is a
multithreaded test with a lot of sleeps, so it looks a bit suspicious as
the source of the problem.
https://builds.apache.org/job/beam_PreCommit_Java_Commit/3688/testReport/org.apache.beam.sdk.io/FileIOTest/testMatchWatchForNewFiles/

I filed a JIRA for this issue:
https://jira.apache.org/jira/browse/BEAM-6491?filter=-2


Re: Cross-language pipelines

2019-01-22 Thread Kenneth Knowles
Nice! If I recall correctly, there was mostly concern about how to launch
and manage the expansion service (Docker? Vendor-specific? etc.). Does this
PR take a position on that question?

Kenn

On Tue, Jan 22, 2019 at 1:44 PM Chamikara Jayalath 
wrote:

>
>
> On Tue, Jan 22, 2019 at 11:35 AM Udi Meiri  wrote:
>
>> Also debugability: collecting logs from each of these systems.
>>
>
> Agree.
>
>
>>
>> On Tue, Jan 22, 2019 at 10:53 AM Chamikara Jayalath 
>> wrote:
>>
>>> Thanks Robert.
>>>
>>> On Tue, Jan 22, 2019 at 4:39 AM Robert Bradshaw 
>>> wrote:
>>>
 Now that we have the FnAPI, I started playing around with support for
 cross-language pipelines. This will allow things like IOs to be shared
 across all languages, SQL to be invoked from non-Java, TFX tensorflow
 transforms to be invoked from non-Python, etc. and I think is the next
 step in extending (and taking advantage of) the portability layer
 we've developed. These are often composite transforms whose inner
 structure depends in non-trivial ways on their configuration.

>>>
>>> Some additional benefits of cross-language transforms are given below.
>>>
>>> (1) The current large collection of Java IO connectors will become
>>> available to other languages.
>>> (2) Current Java and Python transforms will be available for Go and any
>>> other future SDKs.
>>> (3) New transform authors will be able to pick their language of choice
>>> and make their transform available to all Beam SDKs. For example, this can
>>> be the language the transform author is most familiar with or the only
>>> language for which a client library is available for connecting to an
>>> external data store.
>>>
>>>
 I created a PR [1] that basically follows the "expand via an external
 process" over RPC alternative from the proposals we came up with when
 we were discussing this last time [2]. There are still some unknowns,
 e.g. how to handle artifacts supplied by an alternative SDK (they
 currently must be provided by the environment), but I think this is a
 good incremental step forward that will already be useful in a large
 number of cases. It would be good to validate the general direction
 and I would be interested in any feedback others may have on it.

>>>
>>> I think there are multiple semi-dependent problems we have to tackle to
>>> reach the final goal of supporting fully-fledged cross-language transforms
>>> in Beam. I agree with taking an incremental approach here with overall
>>> vision in mind. Some other problems we have to tackle involve following.
>>>
>>> * Defining a user API that will allow pipelines defined in an SDK X to
>>> use transforms defined in SDK Y.
>>> * Updating various runners to use the URN/payload-based environment
>>> definition [1]
>>> * Updating various runners to support starting containers for multiple
>>> environments/languages for the same pipeline and supporting executing
>>> pipeline steps in containers started for multiple environments.
>>>
>>
> I've been working with +Heejong Lee  to add some of
> the missing pieces mentioned above.
>
> We created the following doc that captures some of the ongoing work related
> to cross-language transforms and which will hopefully serve as a knowledge
> base for anybody who wishes to quickly learn the context related to this.
> Feel free to refer to this and/or add to it.
>
>
> https://docs.google.com/document/d/1H3yCyVFI9xYs1jsiF1GfrDtARgWGnLDEMwG5aQIx2AU/edit?usp=sharing
>
>
>
>>
>>> Thanks,
>>> Cham
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L952
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>

 - Robert

 [1] https://github.com/apache/beam/pull/7316
 [2] https://s.apache.org/beam-mixed-language-pipelines

>>>


Re: :beam-sdks-python:docs fails with docs invocation failure

2019-01-22 Thread Valentyn Tymofieiev
Hi, I just opened https://issues.apache.org/jira/browse/BEAM-6489, and plan
to look into this.

On Tue, Jan 22, 2019 at 3:13 PM Mikhail Gryzykhin  wrote:

> Hi everyone,
>
> I see python precommit tests fail with
>
> no such option: --process-dependency-links
>
>
> Supposedly when invoking pip install.
>
>
> Examples:
> https://builds.apache.org/job/beam_PreCommit_Python_Commit/3752/console
> https://builds.apache.org/job/beam_PreCommit_Python_Phrase/152/console
> More can be found on jenkins.
>
> I'm new to the Python part of Beam, so it is hard for me to figure out the
> root cause.
>
> Does anyone have ideas what might cause this? Cron jobs seem to pass fine.
>
> Regards,
> --Mikhail
>
>


:beam-sdks-python:docs fails with docs invocation failure

2019-01-22 Thread Mikhail Gryzykhin
Hi everyone,

I see python precommit tests fail with

no such option: --process-dependency-links


Supposedly when invoking pip install.


Examples:
https://builds.apache.org/job/beam_PreCommit_Python_Commit/3752/console
https://builds.apache.org/job/beam_PreCommit_Python_Phrase/152/console
More can be found on jenkins.

I'm new to the Python part of Beam, so it is hard for me to figure out the
root cause.

Does anyone have ideas what might cause this? Cron jobs seem to pass fine.

Regards,
--Mikhail



Re: [DISCUSSION] ParDo Async Java API

2019-01-22 Thread Steve Niemitz
I'd love to see something like this as well.  Also +1 to process(@Element
InputT element, @Output OutputReceiver<CompletionStage<OutputT>>).  I don't
know if there's much benefit to passing a future in, since the framework
itself could hook up the process function to complete when the future
completes.

I feel like I've spent a bunch of time writing very similar "kick off a
future in ProcessElement, join it in FinishBundle" code, and looking around
beam itself a lot of built-in transforms do it as well.  Scio provides a
few AsyncDoFn implementations [1] but it'd be great to see this as a
first-class concept in beam itself.  Doing error handling, concurrency, etc
correctly can be tricky.

[1]
https://github.com/spotify/scio/blob/master/scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java
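For reference, a bare-bones version of that hand-rolled pattern (a sketch
only: it assumes a globally windowed input, and a real implementation would
track each element's timestamp and window):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;

class AsyncLookupFn extends DoFn<String, String> {
  private transient ExecutorService executor;
  private transient List<CompletableFuture<String>> pending;

  @StartBundle
  public void startBundle() {
    executor = Executors.newFixedThreadPool(4); // per-bundle for brevity only
    pending = new ArrayList<>();
  }

  @ProcessElement
  public void process(@Element String element) {
    // Kick off the future without blocking the processing thread.
    pending.add(CompletableFuture.supplyAsync(() -> lookup(element), executor));
  }

  @FinishBundle
  public void finishBundle(FinishBundleContext context) {
    // Join everything so the bundle only completes once all futures have.
    for (CompletableFuture<String> future : pending) {
      context.output(
          future.join(), BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE);
    }
    executor.shutdown();
  }

  private String lookup(String input) {
    return input; // stand-in for an async remote call
  }
}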

On Tue, Jan 22, 2019 at 5:39 PM Kenneth Knowles  wrote:

> If the input is a CompletionStage<InputT> then the output should also be a
> CompletionStage<OutputT>, since all you should do is async chaining. We
> could enforce this by giving the DoFn an
> OutputReceiver(CompletionStage<OutputT>).
>
> Another possibility that might be even more robust against poor future use
> could be process(@Element InputT element, @Output
> OutputReceiver<CompletionStage<OutputT>>). In this way, the process method
> itself will be async chained, rather than counting on the user to do the
> right thing.
>
> We should see how these look in real use cases. The way that processing is
> split between @ProcessElement and @FinishBundle might complicate things.
>
> Kenn
>
> On Tue, Jan 22, 2019 at 12:44 PM Xinyu Liu  wrote:
>
>> Hi, guys,
>>
>> As more users try out Beam running on the SamzaRunner, we got a lot of
>> asks for an asynchronous processing API. There are a few reasons for these
>> asks:
>>
>>    - The users here are experienced in asynchronous programming. With
>>    async frameworks such as Netty and ParSeq and libs like async jersey
>>    client, they are able to make remote calls efficiently and the libraries
>>    help manage the execution threads underneath. Async remote calls are very
>>    common in most of our streaming applications today.
>>    - Many jobs are running on a multi-tenancy cluster. Async processing
>>    helps reduce resource usage and speeds up computation (fewer context
>>    switches).
>>
>> I asked about the async support in a previous email thread. The following
>> API was mentioned in the reply:
>>
>>   new DoFn<InputT, OutputT>() {
>>     @ProcessElement
>>     public void process(@Element CompletionStage<InputT> element, ...) {
>>       element.thenApply(...)
>>     }
>>   }
>>
>> We are wondering whether there are any discussions on this API and
>> related docs. It is awesome that you guys already considered having DoFn
>> process asynchronously. Out of curiosity, this API seems to create a
>> CompletionStage out of the input element (probably using the framework's
>> executor) and then allow the user to chain on it. To us, it seems more
>> convenient if the DoFn outputs a CompletionStage or passes in a
>> CompletionStage to invoke upon completion.
>>
>> We would like to discuss the async API further, and hopefully we will
>> have great support in Beam. Really appreciate the feedback!
>>
>> Thanks,
>> Xinyu
>>
>
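
As a reference for the pattern described above, here is a minimal sketch of
the manual "kick off a future in @ProcessElement, join it in @FinishBundle"
idiom. It is only an illustration, not a proposed Beam API; callRemoteService
is a hypothetical stand-in for any client that returns a CompletableFuture.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

class AsyncLookupFn extends DoFn<String, String> {
  // A pending result plus the metadata needed to emit it later.
  private static class Pending {
    final CompletableFuture<String> result;
    final Instant timestamp;
    final BoundedWindow window;

    Pending(CompletableFuture<String> result, Instant timestamp, BoundedWindow window) {
      this.result = result;
      this.timestamp = timestamp;
      this.window = window;
    }
  }

  private transient List<Pending> pending;

  @StartBundle
  public void startBundle() {
    pending = new ArrayList<>();
  }

  @ProcessElement
  public void process(@Element String element, @Timestamp Instant ts, BoundedWindow window) {
    // Kick off the async work without blocking the processing thread.
    pending.add(new Pending(callRemoteService(element), ts, window));
  }

  @FinishBundle
  public void finishBundle(FinishBundleContext ctx) {
    // Join everything before the bundle commits, so failures fail the bundle
    // and retries keep the usual per-bundle guarantees.
    for (Pending p : pending) {
      ctx.output(p.result.join(), p.timestamp, p.window);
    }
  }

  private CompletableFuture<String> callRemoteService(String element) {
    // Hypothetical async call; replace with a real async client.
    return CompletableFuture.supplyAsync(element::toUpperCase);
  }
}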


Re: [DISCUSSION] ParDo Async Java API

2019-01-22 Thread Kenneth Knowles
If the input is a CompletionStage<InputT> then the output should also be a
CompletionStage<OutputT>, since all you should do is async chaining. We
could enforce this by giving the DoFn an
OutputReceiver(CompletionStage<OutputT>).

Another possibility that might be even more robust against poor future use
could be process(@Element InputT element, @Output
OutputReceiver<CompletionStage<OutputT>>). In this way, the process method
itself will be async chained, rather than counting on the user to do the
right thing.

We should see how these look in real use cases. The way that processing is
split between @ProcessElement and @FinishBundle might complicate things.

Kenn

On Tue, Jan 22, 2019 at 12:44 PM Xinyu Liu  wrote:

> Hi, guys,
>
> As more users try out Beam running on the SamzaRunner, we got a lot of
> asks for an asynchronous processing API. There are a few reasons for these
> asks:
>
>    - The users here are experienced in asynchronous programming. With
>    async frameworks such as Netty and ParSeq and libs like async jersey
>    client, they are able to make remote calls efficiently and the libraries
>    help manage the execution threads underneath. Async remote calls are very
>    common in most of our streaming applications today.
>    - Many jobs are running on a multi-tenancy cluster. Async processing
>    helps reduce resource usage and speeds up computation (fewer context
>    switches).
>
> I asked about the async support in a previous email thread. The following
> API was mentioned in the reply:
>
>   new DoFn<InputT, OutputT>() {
>     @ProcessElement
>     public void process(@Element CompletionStage<InputT> element, ...) {
>       element.thenApply(...)
>     }
>   }
>
> We are wondering whether there are any discussions on this API and related
> docs. It is awesome that you guys already considered having DoFn process
> asynchronously. Out of curiosity, this API seems to create a
> CompletionStage out of the input element (probably using the framework's
> executor) and then allow the user to chain on it. To us, it seems more
> convenient if the DoFn outputs a CompletionStage or passes in a
> CompletionStage to invoke upon completion.
>
> We would like to discuss the async API further, and hopefully we will
> have great support in Beam. Really appreciate the feedback!
>
> Thanks,
> Xinyu
>
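
To make the proposed signature above concrete, here is a rough sketch of what
user code might look like if DoFn accepted an OutputReceiver of
CompletionStage. Note that this signature is only a proposal from this
thread, not an existing Beam API, so the snippet will not compile against
current Beam; lookup is a hypothetical async call.

class ProposedAsyncFn extends DoFn<String, String> {
  @ProcessElement
  public void process(
      @Element String element,
      @Output OutputReceiver<CompletionStage<String>> out) {
    // The user only chains on the stage; the framework would be responsible
    // for deferring bundle completion until the emitted stage completes.
    out.output(lookup(element).thenApply(String::toUpperCase));
  }

  private CompletionStage<String> lookup(String element) {
    // Hypothetical async client call.
    return CompletableFuture.completedFuture(element);
  }
}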


Re: Cross-language pipelines

2019-01-22 Thread Chamikara Jayalath
On Tue, Jan 22, 2019 at 11:35 AM Udi Meiri  wrote:

> Also debuggability: collecting logs from each of these systems.
>

Agree.


>
> On Tue, Jan 22, 2019 at 10:53 AM Chamikara Jayalath 
> wrote:
>
>> Thanks Robert.
>>
>> On Tue, Jan 22, 2019 at 4:39 AM Robert Bradshaw 
>> wrote:
>>
>>> Now that we have the FnAPI, I started playing around with support for
>>> cross-language pipelines. This will allow things like IOs to be shared
>>> across all languages, SQL to be invoked from non-Java, TFX tensorflow
>>> transforms to be invoked from non-Python, etc., and I think this is the next
>>> step in extending (and taking advantage of) the portability layer
>>> we've developed. These are often composite transforms whose inner
>>> structure depends in non-trivial ways on their configuration.
>>>
>>
>> Some additional benefits of cross-language transforms are given below.
>>
>> (1) The current large collection of Java IO connectors will become
>> available to other languages.
>> (2) Current Java and Python transforms will be available for Go and any
>> other future SDKs.
>> (3) New transform authors will be able to pick their language of choice
>> and make their transform available to all Beam SDKs. For example, this can
>> be the language the transform author is most familiar with or the only
>> language for which a client library is available for connecting to an
>> external data store.
>>
>>
>>> I created a PR [1] that basically follows the "expand via an external
>>> process" over RPC alternative from the proposals we came up with when
>>> we were discussing this last time [2]. There are still some unknowns,
>>> e.g. how to handle artifacts supplied by an alternative SDK (they
>>> currently must be provided by the environment), but I think this is a
>>> good incremental step forward that will already be useful in a large
>>> number of cases. It would be good to validate the general direction
>>> and I would be interested in any feedback others may have on it.
>>>
>>
>> I think there are multiple semi-dependent problems we have to tackle to
>> reach the final goal of supporting fully-fledged cross-language transforms
>> in Beam. I agree with taking an incremental approach here with the overall
>> vision in mind. Some other problems we have to tackle involve the following.
>>
>> * Defining a user API that will allow pipelines defined in SDK X to use
>> transforms defined in SDK Y.
>> * Updating various runners to use the URN/payload-based environment
>> definition [1]
>> * Updating various runners to support starting containers for multiple
>> environments/languages for the same pipeline and supporting executing
>> pipeline steps in containers started for multiple environments.
>>
>
I've been working with +Heejong Lee  to add some of the
missing pieces mentioned above.

We created the following doc, which captures some of the ongoing work related
to cross-language transforms and which will hopefully serve as a knowledge
base for anybody who wishes to quickly learn the context related to this.
Feel free to refer to it and/or add to it.

https://docs.google.com/document/d/1H3yCyVFI9xYs1jsiF1GfrDtARgWGnLDEMwG5aQIx2AU/edit?usp=sharing



>
>> Thanks,
>> Cham
>>
>> [1]
>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L952
>>
>>
>>
>>
>>
>>
>>
>>
>>>
>>> - Robert
>>>
>>> [1] https://github.com/apache/beam/pull/7316
>>> [2] https://s.apache.org/beam-mixed-language-pipelines
>>>
>>


Re: How to use "PortableRunner" in Python SDK?

2019-01-22 Thread Heejong Lee
You can also try without the --streaming option. There's a separate streaming
wordcount example in the same directory.

If you want to look into the output files, it would be easier to use an
external target like gs:// instead of a local file.

python -m apache_beam.examples.wordcount --input=/etc/profile
--output=gs://tmp_location/py-wordcount --runner=PortableRunner
--job_endpoint=localhost:8099 --parallelism=1

On Tue, Jan 22, 2019 at 11:44 AM junwa...@gmail.com 
wrote:

> Hello,
>
> I tried to follow the instructions at
> https://beam.apache.org/roadmap/portability/#python-on-flink,
>
> 1. I installed a local Flink cluster, and followed their
> SocketWindowWordCount example and confirmed the cluster works properly.
>
> 2. Start Flink job server:
> ./gradlew :beam-runners-flink_2.11-job-server:runShadow
> -PflinkMasterUrl=localhost:8081
>
> 3. Submit the job as suggested by an earlier thread:
> python -m apache_beam.examples.wordcount --input=/etc/profile
> --output=/tmp/py-wordcount-direct --runner=PortableRunner
> --job_endpoint=localhost:8099 --parallelism=1
> --OPTIONALflink_master=localhost:8081 --streaming
>
> But got the following NullPointerException error (sorry for the long text
> below), any ideas? Thanks
>
> Jun Wan
>
>  log starts 
> [grpc-default-executor-2] INFO
> org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job
> BeamApp-jwan-0121192804-387b3baa_1d32eea3-d71a-45a9-afa8-edbc66bc1d6b
> [grpc-default-executor-2] INFO
> org.apache.beam.runners.flink.FlinkJobInvocation - Starting job invocation
> BeamApp-jwan-0121192804-387b3baa_1d32eea3-d71a-45a9-afa8-edbc66bc1d6b
> [flink-runner-job-server] INFO
> org.apache.beam.runners.flink.FlinkJobInvocation - Translating pipeline to
> Flink program.
> [flink-runner-job-server] INFO
> org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a
> Streaming Environment.
> [flink-runner-job-server] INFO
> org.apache.beam.runners.flink.FlinkExecutionEnvironments - Using Flink
> Master URL localhost:8081.
> [flink-runner-job-server] INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected
> for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a
> POJO type and must be processed as GenericType. Please read the Flink
> documentation on "Data Types & Serialization" for details of the effect on
> performance.
> [flink-runner-job-server] INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected
> for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a
> POJO type and must be processed as GenericType. Please read the Flink
> documentation on "Data Types & Serialization" for details of the effect on
> performance.
> [flink-runner-job-server] INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected
> for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a
> POJO type and must be processed as GenericType. Please read the Flink
> documentation on "Data Types & Serialization" for details of the effect on
> performance.
> [flink-runner-job-server] INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected
> for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a
> POJO type and must be processed as GenericType. Please read the Flink
> documentation on "Data Types & Serialization" for details of the effect on
> performance.
> [flink-runner-job-server] INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected
> for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a
> POJO type and must be processed as GenericType. Please read the Flink
> documentation on "Data Types & Serialization" for details of the effect on
> performance.
> [flink-runner-job-server] INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected
> for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a
> POJO type and must be processed as GenericType. Please read the Flink
> documentation on "Data Types & Serialization" for details of the effect on
> performance.
> [flink-runner-job-server] INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - class
> org.apache.beam.sdk.transforms.join.RawUnionValue does not contain a setter
> for field unionTag
> [flink-runner-job-server] INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - Class class
> org.apache.beam.sdk.transforms.join.RawUnionValue cannot be used as a POJO
> type because not all fields are valid POJO fields, and must be processed as
> GenericType. Please read the Flink documentation on "Data Types &
> Serialization" for details of the effect on performance.
> [flink-runner-job-server] INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected
> for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a
> POJO type and must be processed as GenericType. Please read the Flink
> documentation on "Data Types & 

Re: How to use "PortableRunner" in Python SDK?

2019-01-22 Thread Ankur Goenka
Hi Jun,

This error can be caused by a Flink version mismatch.
Please make sure that you are using Flink 1.5.6 for the commands you
mentioned.

Thanks,
Ankur

On Tue, Jan 22, 2019 at 11:44 AM junwa...@gmail.com 
wrote:

> Hello,
>
> I tried to follow the instructions at
> https://beam.apache.org/roadmap/portability/#python-on-flink,
>
> 1. I installed a local Flink cluster, and followed their
> SocketWindowWordCount example and confirmed the cluster works properly.
>
> 2. Start Flink job server:
> ./gradlew :beam-runners-flink_2.11-job-server:runShadow
> -PflinkMasterUrl=localhost:8081
>
> 3. Submit the job as suggested by an earlier thread:
> python -m apache_beam.examples.wordcount --input=/etc/profile
> --output=/tmp/py-wordcount-direct --runner=PortableRunner
> --job_endpoint=localhost:8099 --parallelism=1
> --OPTIONALflink_master=localhost:8081 --streaming
>
> But got the following NullPointerException error (sorry for the long text
> below), any ideas? Thanks
>
> Jun Wan
>
>  log starts 
> [grpc-default-executor-2] INFO
> org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job
> BeamApp-jwan-0121192804-387b3baa_1d32eea3-d71a-45a9-afa8-edbc66bc1d6b
> [grpc-default-executor-2] INFO
> org.apache.beam.runners.flink.FlinkJobInvocation - Starting job invocation
> BeamApp-jwan-0121192804-387b3baa_1d32eea3-d71a-45a9-afa8-edbc66bc1d6b
> [flink-runner-job-server] INFO
> org.apache.beam.runners.flink.FlinkJobInvocation - Translating pipeline to
> Flink program.
> [flink-runner-job-server] INFO
> org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a
> Streaming Environment.
> [flink-runner-job-server] INFO
> org.apache.beam.runners.flink.FlinkExecutionEnvironments - Using Flink
> Master URL localhost:8081.
> [flink-runner-job-server] INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected
> for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a
> POJO type and must be processed as GenericType. Please read the Flink
> documentation on "Data Types & Serialization" for details of the effect on
> performance.
> [flink-runner-job-server] INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected
> for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a
> POJO type and must be processed as GenericType. Please read the Flink
> documentation on "Data Types & Serialization" for details of the effect on
> performance.
> [flink-runner-job-server] INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected
> for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a
> POJO type and must be processed as GenericType. Please read the Flink
> documentation on "Data Types & Serialization" for details of the effect on
> performance.
> [flink-runner-job-server] INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected
> for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a
> POJO type and must be processed as GenericType. Please read the Flink
> documentation on "Data Types & Serialization" for details of the effect on
> performance.
> [flink-runner-job-server] INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected
> for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a
> POJO type and must be processed as GenericType. Please read the Flink
> documentation on "Data Types & Serialization" for details of the effect on
> performance.
> [flink-runner-job-server] INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected
> for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a
> POJO type and must be processed as GenericType. Please read the Flink
> documentation on "Data Types & Serialization" for details of the effect on
> performance.
> [flink-runner-job-server] INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - class
> org.apache.beam.sdk.transforms.join.RawUnionValue does not contain a setter
> for field unionTag
> [flink-runner-job-server] INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - Class class
> org.apache.beam.sdk.transforms.join.RawUnionValue cannot be used as a POJO
> type because not all fields are valid POJO fields, and must be processed as
> GenericType. Please read the Flink documentation on "Data Types &
> Serialization" for details of the effect on performance.
> [flink-runner-job-server] INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected
> for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a
> POJO type and must be processed as GenericType. Please read the Flink
> documentation on "Data Types & Serialization" for details of the effect on
> performance.
> [flink-runner-job-server] INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected
> for class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a
> POJO type 

Re: How to use "PortableRunner" in Python SDK?

2019-01-22 Thread Ankur Goenka
Hi Jun,

Thanks for reporting the error.
This seems to be because of a mismatch between the SDKHarness (Docker image)
and the Python SDK. As we are actively developing, this can happen.

Can you please retry after rebuilding the Docker images and the Python SDK
from master and installing the Python SDK into your virtual environment?

Thanks,
Ankur

On Tue, Jan 22, 2019 at 11:44 AM junwa...@gmail.com 
wrote:

> I downgraded Flink from 1.7.1 to 1.5.6 and was able to go further, but it
> still fails; here is the latest error from Flink. Thanks!
>
> The job cmd I launched: python -m apache_beam.examples.wordcount
> --input=/etc/profile --output=/tmp/py-wordcount-direct
> --runner=PortableRunner --job_endpoint=localhost:8099 --parallelism=1
> --OPTIONALflink_master=localhost:8081 --streaming
> --experiments=worker_threads=100 --execution_mode_for_batch=BATCH_FORCED
> --experiments=beam_fn_api
>
> Jun
>
>  log starts 
> [flink-runner-job-server] INFO
> org.apache.beam.runners.flink.FlinkExecutionEnvironments - Running remotely
> at localhost:8081
> [flink-runner-job-server] WARN
> org.apache.flink.configuration.Configuration - Config uses deprecated
> configuration key 'jobmanager.rpc.address' instead of proper key
> 'rest.address'
> [flink-runner-job-server] INFO org.apache.flink.runtime.rest.RestClient -
> Rest client endpoint started.
> [flink-runner-job-server] INFO
> org.apache.flink.client.program.rest.RestClusterClient - Submitting job
> 4ecb5e5cfd4718de440f48cbfaf7216a (detached: false).
> [flink-runner-job-server] INFO org.apache.flink.runtime.rest.RestClient -
> Shutting down rest endpoint.
> [flink-runner-job-server] INFO org.apache.flink.runtime.rest.RestClient -
> Rest endpoint shutdown complete.
> [flink-runner-job-server] ERROR
> org.apache.beam.runners.flink.FlinkJobInvocation - Error during job
> invocation
> BeamApp-jwan-012125-328178bb_d2dadedb-6dbf-4c1e-82d4-208a2d3177e9.
> org.apache.flink.client.program.ProgramInvocationException: Job
> 4ecb5e5cfd4718de440f48cbfaf7216a failed.
> at
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
> at
> org.apache.beam.runners.flink.FlinkExecutionEnvironments$BeamFlinkRemoteStreamEnvironment.executeRemotely(FlinkExecutionEnvironments.java:355)
> at
> org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:179)
> at
> org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator$StreamingTranslationContext.execute(FlinkStreamingPortablePipelineTranslator.java:158)
> at
> org.apache.beam.runners.flink.FlinkJobInvocation.runPipelineWithTranslator(FlinkJobInvocation.java:142)
> at
> org.apache.beam.runners.flink.FlinkJobInvocation.runPipeline(FlinkJobInvocation.java:112)
> at org.apache.beam.repackaged.beam_runners_flink_2.11.com
> .google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:111)
> at org.apache.beam.repackaged.beam_runners_flink_2.11.com
> .google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:58)
> at org.apache.beam.repackaged.beam_runners_flink_2.11.com
> .google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:75)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
> at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
> at
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
> ... 12 more
> Caused by: java.lang.RuntimeException: Exception occurred while processing
> valve output watermark:
> at org.apache.flink.streaming.runtime.io
> .StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
> at
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
> at
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
> at org.apache.flink.streaming.runtime.io
> .StreamInputProcessor.processInput(StreamInputProcessor.java:184)
> at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
> ... 1 more
> Caused by:
> 

[DISCUSSION] ParDo Async Java API

2019-01-22 Thread Xinyu Liu
Hi, guys,

As more users try out Beam running on the SamzaRunner, we got a lot of asks
for an asynchronous processing API. There are a few reasons for these asks:

   - The users here are experienced in asynchronous programming. With async
   frameworks such as Netty and ParSeq and libs like async jersey client, they
   are able to make remote calls efficiently and the libraries help manage the
   execution threads underneath. Async remote calls are very common in most of
   our streaming applications today.
   - Many jobs are running on a multi-tenancy cluster. Async processing
   helps reduce resource usage and speeds up computation (fewer context
   switches).

I asked about the async support in a previous email thread. The following
API was mentioned in the reply:

  new DoFn<InputT, OutputT>() {
    @ProcessElement
    public void process(@Element CompletionStage<InputT> element, ...) {
      element.thenApply(...)
    }
  }

We are wondering whether there are any discussions on this API and related
docs. It is awesome that you guys already considered having DoFn process
asynchronously. Out of curiosity, this API seems to create a
CompletionStage out of the input element (probably using the framework's
executor) and then allow the user to chain on it. To us, it seems more
convenient if the DoFn outputs a CompletionStage or passes in a
CompletionStage to invoke upon completion.

We would like to discuss the async API further, and hopefully we will
have great support in Beam. Really appreciate the feedback!

Thanks,
Xinyu


Re: Confusing sentence in Windowing section in Beam programming guide

2019-01-22 Thread Reuven Lax
Ah yes, Kenn is correct, and I forgot we made that change.

To clarify - Beam does not expose late elements as a concept; rather, it
exposes late panes on its triggering API. The reason we made the change was
not just because we wanted to include as much data as possible, but also
because we wanted to preserve reasonable invariants around late data (e.g.
one invariant should be that the result of processing an on-time pane
should not produce late data). Simply checking element timestamps against
the watermark is non-deterministic (since the watermark advances
asynchronously), so we moved to a different definition of late data.

Reuven

On Fri, Jan 18, 2019 at 11:24 AM Kenneth Knowles  wrote:

> That is correct. For the global window there is no such thing as late data.
>
> Kenn
>
> On Fri, Jan 18, 2019, 11:13 Ruoyun Huang 
>> Very helpful discussion (and the fixing PR).
>>
>> To make sure my take-away is correct: the status quo is a) "for a Global
>> Window, there is *no possible scenario* where data is identified as
>> late", rather than b) "for a global window we *no longer* compare against
>> the watermark to identify late data, but *there are still other criteria*
>> that determine data as late".
>>
>> a) is correct and b) is not.  Is that so?
>>
>> On Thu, Jan 17, 2019 at 8:57 PM Kenneth Knowles  wrote:
>>
>>> Actually, Reuven, that's no longer the case.
>>>
>>> It used to be that incoming data was compared to the watermark but it is
>>> not today. Instead, Jeff's first phrasing is perfect.
>>>
>>> One way to see it is the think about what are the consequences of late
>>> data: if there is a grouping/aggregation by key+window, the window
>>> determines when the grouping is complete. We go ahead and include any data
>>> that shows up before the window is complete. And if you set up allowed
>>> lateness it matches exactly: any data that arrives before the ON_TIME
>>> output gets to be in that output.
>>>
>>> Previously, when we compared incoming elements to the watermark
>>> directly, you could have a window that was still being aggregated but the
>>> elements that fell in the window were dropped. There was no technical
>>> benefit to losing this data, so we stopped dropping it. We also had lots of
>>> tricky bugs and hard-to-manage code related to what we do if an element
>>> arrives after the watermark. And you could have an ON_TIME firing that
>>> included a bunch of "late" data which is confusing.
>>>
>>> Now it is simple: if the window is still alive, the element goes into it.
>>>
>>> I very rarely use the term "late data" when describing Beam's semantics
>>> anyhow. I always found the term / definition a bit arbitrary.
>>>
>>> Kenn
>>>
>>> On Thu, Jan 17, 2019 at 8:13 PM Rui Wang  wrote:
>>>
 I created this PR: https://github.com/apache/beam/pull/7556

 Feel free to review/comment it.

 -Rui

 On Thu, Jan 17, 2019 at 2:37 PM Rui Wang  wrote:

> It might be better to keep something like "watermark usually
> consistently moves forward". But "Elements that arrive with a smaller
> timestamp than the current watermark are considered late data." has
> already given the ordering of the late data timestamp and the watermark.
>
>
> -Rui
>
> On Thu, Jan 17, 2019 at 1:39 PM Jeff Klukas 
> wrote:
>
>> Reuven - I don't think I realized it was possible to have late data
>> with the global window, so I'm definitely learning things through this
>> discussion.
>>
>> New suggested wording, then:
>>
>> Elements that arrive with a smaller timestamp than the current
>> watermark are considered late data.
>>
>> That says basically the same thing as the wording currently in the
>> guide, but uses "smaller" (which implies a less-than-watermark 
>> comparison)
>> rather than "later" (which folks have interpreted as a
>> greater-than-watermark comparison).
>>
>> On Thu, Jan 17, 2019 at 3:40 PM Reuven Lax  wrote:
>>
>>> Though it's not tied to the window. You could be in the global window,
>>> so the watermark never advances past the end of the window, yet still 
>>> get
>>> late data.
>>>
>>> On Thu, Jan 17, 2019, 11:14 AM Jeff Klukas >> wrote:
>>>
 How about: "Once the watermark progresses past the end of a window,
 any further elements that arrive with a timestamp in that window are
 considered late data."

 On Thu, Jan 17, 2019 at 1:43 PM Rui Wang  wrote:

> Hi Community,
>
> In Beam programming guide [1], there is a sentence: "Data that
> arrives with a timestamp after the watermark is considered *late
> data*"
>
> Seems like people get confused by it. For example, see
> Stackoverflow comment [2]. Basically it makes people think that an event
> timestamp that is bigger than the watermark is considered late (due to
> that
> 
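
Since "late" surfaces as a property of panes rather than of elements, a short
sketch may help make the thread concrete. Assuming input is a
PCollection<String> inside a pipeline, this configures allowed lateness with
late firings and inspects the pane timing downstream; it is one plausible
configuration for observing late panes, not a proposed fix for the guide's
wording.

import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

PCollection<KV<String, Long>> counts =
    input
        .apply(
            Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
                .triggering(
                    AfterWatermark.pastEndOfWindow()
                        .withLateFirings(AfterPane.elementCountAtLeast(1)))
                .withAllowedLateness(Duration.standardMinutes(10))
                .accumulatingFiredPanes())
        .apply(Count.perElement());

counts.apply(
    ParDo.of(
        new DoFn<KV<String, Long>, Void>() {
          @ProcessElement
          public void process(ProcessContext c) {
            // Any element that arrives before the window expires lands in
            // some pane; lateness shows up as the pane's timing, not as
            // dropped or specially marked elements.
            if (c.pane().getTiming() == PaneInfo.Timing.LATE) {
              // This pane refines the earlier ON_TIME result (accumulating mode).
            }
          }
        }));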

Re: Our jenkins beam1 server is down

2019-01-22 Thread Yifan Zou
The inventory test on beam1 passed. beam1 is back to normal.
https://builds.apache.org/job/beam_Inventory_beam1/303/

On Tue, Jan 22, 2019 at 11:41 AM Yifan Zou  wrote:

> Thanks for reporting the failures. I just disconnected and reconnected
> beam1. I am creating a PR that force-runs a job on that agent to verify.
>
> On Tue, Jan 22, 2019 at 11:08 AM Ankur Goenka  wrote:
>
>> Beam 1 seems to be down again
>>
>> https://builds.apache.org/job/beam_PreCommit_Portable_Python_Phrase/88/console
>>
>> https://builds.apache.org/job/beam_PostCommit_Python_VR_Flink_PR/141/console
>>
>> On Tue, Jan 22, 2019 at 10:53 AM Yifan Zou  wrote:
>>
>>> The beam1 and 14 are back and building.
>>>
>>> On Thu, Jan 17, 2019 at 7:04 AM Ismaël Mejía  wrote:
>>>
 Thanks Yifan for taking care.

 On Thu, Jan 17, 2019 at 1:24 AM Yifan Zou  wrote:
 >
 > Yes, beam14 is offline as well. We're on it.
 >
 > On Wed, Jan 16, 2019 at 4:11 PM Ruoyun Huang 
 wrote:
 >>
 >> With another try, succeeding on beam10.
 >>
 >> Thanks for the fix.
 >>
 >> On Wed, Jan 16, 2019 at 3:53 PM Ruoyun Huang 
 wrote:
 >>>
 >>> Just did a rerun, got error saying "10:12:21 ERROR: beam14 is
 offline; cannot locate JDK 1.8 (latest)".
 >>>
 >>> Beam1 is not the only one broken?
 >>>
 >>> On Wed, Jan 16, 2019 at 3:45 PM Yifan Zou 
 wrote:
 
  The beam1 was still accepting jobs and breaking them after reset
 this morning. We temporarily disconnect it so that jobs could be scheduled
 on healthy nodes. Infra is making efforts to fix beam1.
 
  On Wed, Jan 16, 2019 at 11:15 AM Yifan Zou 
 wrote:
 >
 > The VM instance was reset and Infra is trying to repuppetize it.
 https://issues.apache.org/jira/browse/INFRA-17672 is created to track
 this issue.
 >
 > On Wed, Jan 16, 2019 at 10:51 AM Mark Liu 
 wrote:
 >>
 >> Thank you, Yifan!
 >>
 >> Looks like following precommits are affected according to my PR:
 >>
 >> Java_Examples_Dataflow,
 >> Portable_Python,
 >> Website_Stage_GCS
 >>
 >> On Wed, Jan 16, 2019 at 9:25 AM Yifan Zou 
 wrote:
 >>>
 >>> I am looking on it.
 >>>
 >>> On Wed, Jan 16, 2019 at 8:18 AM Ismaël Mejía 
 wrote:
 
  Can somebody PTAL. Sadly the poor jenkins shuffling algorithm
 is
  sending most builds to it so there are issues to validate some
 PRs.
 >>>
 >>>
 >>>
 >>> --
 >>> 
 >>> Ruoyun  Huang
 >>>
 >>
 >>
 >> --
 >> 
 >> Ruoyun  Huang
 >>

>>>


Re: Confluence wiki edit access request

2019-01-22 Thread Udi Meiri
bump

On Fri, Jan 18, 2019 at 1:57 PM Udi Meiri  wrote:

> username: udim
>
> Thanks!
>




Re: How to use "PortableRunner" in Python SDK?

2019-01-22 Thread junwan01
I downgraded Flink from 1.7.1 to 1.5.6 and was able to go further, but it
still fails; here is the latest error from Flink. Thanks!

The job cmd I launched: python -m apache_beam.examples.wordcount 
--input=/etc/profile --output=/tmp/py-wordcount-direct --runner=PortableRunner 
--job_endpoint=localhost:8099 --parallelism=1 
--OPTIONALflink_master=localhost:8081 --streaming 
--experiments=worker_threads=100 --execution_mode_for_batch=BATCH_FORCED 
--experiments=beam_fn_api

Jun
 
 log starts 
[flink-runner-job-server] INFO 
org.apache.beam.runners.flink.FlinkExecutionEnvironments - Running remotely at 
localhost:8081
[flink-runner-job-server] WARN org.apache.flink.configuration.Configuration - 
Config uses deprecated configuration key 'jobmanager.rpc.address' instead of 
proper key 'rest.address'
[flink-runner-job-server] INFO org.apache.flink.runtime.rest.RestClient - Rest 
client endpoint started.
[flink-runner-job-server] INFO 
org.apache.flink.client.program.rest.RestClusterClient - Submitting job 
4ecb5e5cfd4718de440f48cbfaf7216a (detached: false).
[flink-runner-job-server] INFO org.apache.flink.runtime.rest.RestClient - 
Shutting down rest endpoint.
[flink-runner-job-server] INFO org.apache.flink.runtime.rest.RestClient - Rest 
endpoint shutdown complete.
[flink-runner-job-server] ERROR 
org.apache.beam.runners.flink.FlinkJobInvocation - Error during job invocation 
BeamApp-jwan-012125-328178bb_d2dadedb-6dbf-4c1e-82d4-208a2d3177e9.
org.apache.flink.client.program.ProgramInvocationException: Job 
4ecb5e5cfd4718de440f48cbfaf7216a failed.
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:265)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
at 
org.apache.beam.runners.flink.FlinkExecutionEnvironments$BeamFlinkRemoteStreamEnvironment.executeRemotely(FlinkExecutionEnvironments.java:355)
at 
org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:179)
at 
org.apache.beam.runners.flink.FlinkStreamingPortablePipelineTranslator$StreamingTranslationContext.execute(FlinkStreamingPortablePipelineTranslator.java:158)
at 
org.apache.beam.runners.flink.FlinkJobInvocation.runPipelineWithTranslator(FlinkJobInvocation.java:142)
at 
org.apache.beam.runners.flink.FlinkJobInvocation.runPipeline(FlinkJobInvocation.java:112)
at 
org.apache.beam.repackaged.beam_runners_flink_2.11.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:111)
at 
org.apache.beam.repackaged.beam_runners_flink_2.11.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:58)
at 
org.apache.beam.repackaged.beam_runners_flink_2.11.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:75)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
... 12 more
Caused by: java.lang.RuntimeException: Exception occurred while processing 
valve output watermark:
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:265)
at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:712)
... 1 more
Caused by: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:483)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitWatermark(AbstractStreamOperator.java:694)
at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.emitWatermark(DoFnOperator.java:591)
at 

Re: How to use "PortableRunner" in Python SDK?

2019-01-22 Thread junwan01
Hello,

I tried to follow the instructions at 
https://beam.apache.org/roadmap/portability/#python-on-flink, 

1. I installed a local Flink cluster, and followed their SocketWindowWordCount 
example and confirmed the cluster works properly.

2. Start Flink job server:
./gradlew :beam-runners-flink_2.11-job-server:runShadow 
-PflinkMasterUrl=localhost:8081

3. Submit the job as suggested by an earlier thread:
python -m apache_beam.examples.wordcount --input=/etc/profile 
--output=/tmp/py-wordcount-direct --runner=PortableRunner 
--job_endpoint=localhost:8099 --parallelism=1 
--OPTIONALflink_master=localhost:8081 --streaming

But got the following NullPointerException error (sorry for the long text 
below), any ideas? Thanks

Jun Wan

 log starts 
[grpc-default-executor-2] INFO org.apache.beam.runners.flink.FlinkJobInvoker - 
Invoking job 
BeamApp-jwan-0121192804-387b3baa_1d32eea3-d71a-45a9-afa8-edbc66bc1d6b
[grpc-default-executor-2] INFO org.apache.beam.runners.flink.FlinkJobInvocation 
- Starting job invocation 
BeamApp-jwan-0121192804-387b3baa_1d32eea3-d71a-45a9-afa8-edbc66bc1d6b
[flink-runner-job-server] INFO org.apache.beam.runners.flink.FlinkJobInvocation 
- Translating pipeline to Flink program.
[flink-runner-job-server] INFO 
org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a Streaming 
Environment.
[flink-runner-job-server] INFO 
org.apache.beam.runners.flink.FlinkExecutionEnvironments - Using Flink Master 
URL localhost:8081.
[flink-runner-job-server] INFO 
org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected for 
class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a POJO 
type and must be processed as GenericType. Please read the Flink documentation 
on "Data Types & Serialization" for details of the effect on performance.
[flink-runner-job-server] INFO 
org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected for 
class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a POJO 
type and must be processed as GenericType. Please read the Flink documentation 
on "Data Types & Serialization" for details of the effect on performance.
[flink-runner-job-server] INFO 
org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected for 
class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a POJO 
type and must be processed as GenericType. Please read the Flink documentation 
on "Data Types & Serialization" for details of the effect on performance.
[flink-runner-job-server] INFO 
org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected for 
class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a POJO 
type and must be processed as GenericType. Please read the Flink documentation 
on "Data Types & Serialization" for details of the effect on performance.
[flink-runner-job-server] INFO 
org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected for 
class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a POJO 
type and must be processed as GenericType. Please read the Flink documentation 
on "Data Types & Serialization" for details of the effect on performance.
[flink-runner-job-server] INFO 
org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected for 
class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a POJO 
type and must be processed as GenericType. Please read the Flink documentation 
on "Data Types & Serialization" for details of the effect on performance.
[flink-runner-job-server] INFO 
org.apache.flink.api.java.typeutils.TypeExtractor - class 
org.apache.beam.sdk.transforms.join.RawUnionValue does not contain a setter for 
field unionTag
[flink-runner-job-server] INFO 
org.apache.flink.api.java.typeutils.TypeExtractor - Class class 
org.apache.beam.sdk.transforms.join.RawUnionValue cannot be used as a POJO type 
because not all fields are valid POJO fields, and must be processed as 
GenericType. Please read the Flink documentation on "Data Types & 
Serialization" for details of the effect on performance.
[flink-runner-job-server] INFO 
org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected for 
class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a POJO 
type and must be processed as GenericType. Please read the Flink documentation 
on "Data Types & Serialization" for details of the effect on performance.
[flink-runner-job-server] INFO 
org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected for 
class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a POJO 
type and must be processed as GenericType. Please read the Flink documentation 
on "Data Types & Serialization" for details of the effect on performance.
[flink-runner-job-server] INFO 
org.apache.flink.api.java.typeutils.TypeExtractor - No fields were detected for 
class org.apache.beam.sdk.util.WindowedValue so it cannot be used as a POJO 
type and 

Re: Our jenkins beam1 server is down

2019-01-22 Thread Yifan Zou
Thanks for reporting the failures. I just disconnected and reconnected
beam1. I am creating a PR that force-runs a job on that agent to verify.

On Tue, Jan 22, 2019 at 11:08 AM Ankur Goenka  wrote:

> Beam 1 seems to be down again
>
> https://builds.apache.org/job/beam_PreCommit_Portable_Python_Phrase/88/console
>
> https://builds.apache.org/job/beam_PostCommit_Python_VR_Flink_PR/141/console
>
> On Tue, Jan 22, 2019 at 10:53 AM Yifan Zou  wrote:
>
>> The beam1 and 14 are back and building.
>>
>> On Thu, Jan 17, 2019 at 7:04 AM Ismaël Mejía  wrote:
>>
>>> Thanks Yifan for taking care.
>>>
>>> On Thu, Jan 17, 2019 at 1:24 AM Yifan Zou  wrote:
>>> >
>>> > Yes, beam14 is offline as well. We're on it.
>>> >
>>> > On Wed, Jan 16, 2019 at 4:11 PM Ruoyun Huang 
>>> wrote:
>>> >>
>>> >> With another try, succeeding on beam10.
>>> >>
>>> >> Thanks for the fix.
>>> >>
>>> >> On Wed, Jan 16, 2019 at 3:53 PM Ruoyun Huang 
>>> wrote:
>>> >>>
>>> >>> Just did a rerun, got error saying "10:12:21 ERROR: beam14 is
>>> offline; cannot locate JDK 1.8 (latest)".
>>> >>>
>>> >>> Beam1 is not the only one broken?
>>> >>>
>>> >>> On Wed, Jan 16, 2019 at 3:45 PM Yifan Zou 
>>> wrote:
>>> 
>>>  The beam1 was still accepting jobs and breaking them after reset
>>> this morning. We temporarily disconnect it so that jobs could be scheduled
>>> on healthy nodes. Infra is making efforts to fix beam1.
>>> 
>>>  On Wed, Jan 16, 2019 at 11:15 AM Yifan Zou 
>>> wrote:
>>> >
>>> > The VM instance was reset and Infra is trying to repuppetize it.
>>> https://issues.apache.org/jira/browse/INFRA-17672 is created to track
>>> this issue.
>>> >
>>> > On Wed, Jan 16, 2019 at 10:51 AM Mark Liu 
>>> wrote:
>>> >>
>>> >> Thank you, Yifan!
>>> >>
>>> >> Looks like following precommits are affected according to my PR:
>>> >>
>>> >> Java_Examples_Dataflow,
>>> >> Portable_Python,
>>> >> Website_Stage_GCS
>>> >>
>>> >> On Wed, Jan 16, 2019 at 9:25 AM Yifan Zou 
>>> wrote:
>>> >>>
>>> >>> I am looking on it.
>>> >>>
>>> >>> On Wed, Jan 16, 2019 at 8:18 AM Ismaël Mejía 
>>> wrote:
>>> 
>>>  Can somebody PTAL. Sadly the poor jenkins shuffling algorithm is
>>>  sending most builds to it so there are issues to validate some
>>> PRs.
>>> >>>
>>> >>>
>>> >>>
>>> >>> --
>>> >>> 
>>> >>> Ruoyun  Huang
>>> >>>
>>> >>
>>> >>
>>> >> --
>>> >> 
>>> >> Ruoyun  Huang
>>> >>
>>>
>>


Re: Cross-language pipelines

2019-01-22 Thread Udi Meiri
Also debuggability: collecting logs from each of these systems.

On Tue, Jan 22, 2019 at 10:53 AM Chamikara Jayalath 
wrote:

> Thanks Robert.
>
> On Tue, Jan 22, 2019 at 4:39 AM Robert Bradshaw 
> wrote:
>
>> Now that we have the FnAPI, I started playing around with support for
>> cross-language pipelines. This will allow things like IOs to be shared
>> across all languages, SQL to be invoked from non-Java, TFX tensorflow
>> transforms to be invoked from non-Python, etc., and I think this is the next
>> step in extending (and taking advantage of) the portability layer
>> we've developed. These are often composite transforms whose inner
>> structure depends in non-trivial ways on their configuration.
>>
>
> Some additional benefits of cross-language transforms are given below.
>
> (1) The current large collection of Java IO connectors will become
> available to other languages.
> (2) Current Java and Python transforms will be available for Go and any
> other future SDKs.
> (3) New transform authors will be able to pick their language of choice
> and make their transform available to all Beam SDKs. For example, this can
> be the language the transform author is most familiar with or the only
> language for which a client library is available for connecting to an
> external data store.
>
>
>> I created a PR [1] that basically follows the "expand via an external
>> process" over RPC alternative from the proposals we came up with when
>> we were discussing this last time [2]. There are still some unknowns,
>> e.g. how to handle artifacts supplied by an alternative SDK (they
>> currently must be provided by the environment), but I think this is a
>> good incremental step forward that will already be useful in a large
>> number of cases. It would be good to validate the general direction
>> and I would be interested in any feedback others may have on it.
>>
>
> I think there are multiple semi-dependent problems we have to tackle to
> reach the final goal of supporting fully-fledged cross-language transforms
> in Beam. I agree with taking an incremental approach here with the overall
> vision in mind. Some other problems we have to tackle involve the following.
>
> * Defining a user API that will allow pipelines defined in SDK X to use
> transforms defined in SDK Y.
> * Updating various runners to use the URN/payload-based environment
> definition [1]
> * Updating various runners to support starting containers for multiple
> environments/languages for the same pipeline and supporting executing
> pipeline steps in containers started for multiple environments.
>
> Thanks,
> Cham
>
> [1]
> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L952
>
>
>
>
>
>
>
>
>>
>> - Robert
>>
>> [1] https://github.com/apache/beam/pull/7316
>> [2] https://s.apache.org/beam-mixed-language-pipelines
>>
>




Re: Our jenkins beam1 server is down

2019-01-22 Thread Ankur Goenka
Beam 1 seems to be down again
https://builds.apache.org/job/beam_PreCommit_Portable_Python_Phrase/88/console
https://builds.apache.org/job/beam_PostCommit_Python_VR_Flink_PR/141/console

On Tue, Jan 22, 2019 at 10:53 AM Yifan Zou  wrote:

> The beam1 and 14 are back and building.
>
> On Thu, Jan 17, 2019 at 7:04 AM Ismaël Mejía  wrote:
>
>> Thanks Yifan for taking care.
>>
>> On Thu, Jan 17, 2019 at 1:24 AM Yifan Zou  wrote:
>> >
>> > Yes, beam14 is offline as well. We're on it.
>> >
>> > On Wed, Jan 16, 2019 at 4:11 PM Ruoyun Huang  wrote:
>> >>
>> >> With another try, succeeding on beam10.
>> >>
>> >> Thanks for the fix.
>> >>
>> >> On Wed, Jan 16, 2019 at 3:53 PM Ruoyun Huang 
>> wrote:
>> >>>
>> >>> Just did a rerun, got error saying "10:12:21 ERROR: beam14 is
>> offline; cannot locate JDK 1.8 (latest)".
>> >>>
>> >>> Beam1 is not the only one broken?
>> >>>
>> >>> On Wed, Jan 16, 2019 at 3:45 PM Yifan Zou 
>> wrote:
>> 
>>  The beam1 was still accepting jobs and breaking them after reset
>> this morning. We temporarily disconnect it so that jobs could be scheduled
>> on healthy nodes. Infra is making efforts to fix beam1.
>> 
>>  On Wed, Jan 16, 2019 at 11:15 AM Yifan Zou 
>> wrote:
>> >
>> > The VM instance was reset and Infra is trying to repuppetize it.
>> https://issues.apache.org/jira/browse/INFRA-17672 is created to track
>> this issue.
>> >
>> > On Wed, Jan 16, 2019 at 10:51 AM Mark Liu 
>> wrote:
>> >>
>> >> Thank you, Yifan!
>> >>
>> >> Looks like following precommits are affected according to my PR:
>> >>
>> >> Java_Examples_Dataflow,
>> >> Portable_Python,
>> >> Website_Stage_GCS
>> >>
>> >> On Wed, Jan 16, 2019 at 9:25 AM Yifan Zou 
>> wrote:
>> >>>
>> >>> I am looking on it.
>> >>>
>> >>> On Wed, Jan 16, 2019 at 8:18 AM Ismaël Mejía 
>> wrote:
>> 
>>  Can somebody PTAL. Sadly the poor jenkins shuffling algorithm is
>>  sending most builds to it so there are issues to validate some
>> PRs.
>> >>>
>> >>>
>> >>>
>> >>> --
>> >>> 
>> >>> Ruoyun  Huang
>> >>>
>> >>
>> >>
>> >> --
>> >> 
>> >> Ruoyun  Huang
>> >>
>>
>


Re: Our jenkins beam1 server is down

2019-01-22 Thread Yifan Zou
The beam1 and 14 are back and building.

On Thu, Jan 17, 2019 at 7:04 AM Ismaël Mejía  wrote:

> Thanks Yifan for taking care.
>
> On Thu, Jan 17, 2019 at 1:24 AM Yifan Zou  wrote:
> >
> > Yes, beam14 is offline as well. We're on it.
> >
> > On Wed, Jan 16, 2019 at 4:11 PM Ruoyun Huang  wrote:
> >>
> >> With another try, succeeding on beam10.
> >>
> >> Thanks for the fix.
> >>
> >> On Wed, Jan 16, 2019 at 3:53 PM Ruoyun Huang  wrote:
> >>>
> >>> Just did a rerun, got error saying "10:12:21 ERROR: beam14 is offline;
> cannot locate JDK 1.8 (latest)".
> >>>
> >>> Beam1 is not the only one broken?
> >>>
> >>> On Wed, Jan 16, 2019 at 3:45 PM Yifan Zou  wrote:
> 
>  The beam1 was still accepting jobs and breaking them after reset this
> morning. We temporarily disconnect it so that jobs could be scheduled on
> healthy nodes. Infra is making efforts to fix beam1.
> 
>  On Wed, Jan 16, 2019 at 11:15 AM Yifan Zou 
> wrote:
> >
> > The VM instance was reset and Infra is trying to repuppetize it.
> https://issues.apache.org/jira/browse/INFRA-17672 is created to track
> this issue.
> >
> > On Wed, Jan 16, 2019 at 10:51 AM Mark Liu 
> wrote:
> >>
> >> Thank you, Yifan!
> >>
> >> Looks like following precommits are affected according to my PR:
> >>
> >> Java_Examples_Dataflow,
> >> Portable_Python,
> >> Website_Stage_GCS
> >>
> >> On Wed, Jan 16, 2019 at 9:25 AM Yifan Zou 
> wrote:
> >>>
> >>> I am looking on it.
> >>>
> >>> On Wed, Jan 16, 2019 at 8:18 AM Ismaël Mejía 
> wrote:
> 
>  Can somebody PTAL. Sadly the poor jenkins shuffling algorithm is
>  sending most builds to it so there are issues to validate some
> PRs.
> >>>
> >>>
> >>>
> >>> --
> >>> 
> >>> Ruoyun  Huang
> >>>
> >>
> >>
> >> --
> >> 
> >> Ruoyun  Huang
> >>
>


Re: Cross-language pipelines

2019-01-22 Thread Chamikara Jayalath
Thanks Robert.

On Tue, Jan 22, 2019 at 4:39 AM Robert Bradshaw  wrote:

> Now that we have the FnAPI, I started playing around with support for
> cross-language pipelines. This will allow things like IOs to be shared
> across all languages, SQL to be invoked from non-Java, TFX tensorflow
> transforms to be invoked from non-Python, etc. and I think is the next
> step in extending (and taking advantage of) the portability layer
> we've developed. These are often composite transforms whose inner
> structure depends in non-trivial ways on their configuration.
>

Some additional benefits of cross-language transforms are given below.

(1) The current large collection of Java IO connectors will become available
to other languages.
(2) Current Java and Python transforms will be available for Go and any
other future SDKs.
(3) New transform authors will be able to pick their language of choice and
make their transform available to all Beam SDKs. For example, this can be
the language the transform author is most familiar with or the only
language for which a client library is available for connecting to an
external data store.


> I created a PR [1] that basically follows the "expand via an external
> process" over RPC alternative from the proposals we came up with when
> we were discussing this last time [2]. There are still some unknowns,
> e.g. how to handle artifacts supplied by an alternative SDK (they
> currently must be provided by the environment), but I think this is a
> good incremental step forward that will already be useful in a large
> number of cases. It would be good to validate the general direction
> and I would be interested in any feedback others may have on it.
>

I think there are multiple semi-dependent problems we have to tackle to
reach the final goal of supporting fully-fledged cross-language transforms
in Beam. I agree with taking an incremental approach here with the overall
vision in mind. Some other problems we have to tackle involve the following.

* Defining a user API that will allow pipelines defined in SDK X to use
transforms defined in SDK Y.
* Updating various runners to use the URN/payload-based environment
definition [1]
* Updating various runners to support starting containers for multiple
environments/languages for the same pipeline and supporting executing
pipeline steps in containers started for multiple environments.

Thanks,
Cham

[1]
https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L952








>
> - Robert
>
> [1] https://github.com/apache/beam/pull/7316
> [2] https://s.apache.org/beam-mixed-language-pipelines
>


Re: Magic number explanation in ParDoTest.java

2019-01-22 Thread Kenneth Knowles
The commit comes from this PR: https://github.com/apache/beam/pull/2273

Kenn

On Tue, Jan 22, 2019 at 10:21 AM Sam Rohde  wrote:

> Hi all,
>
> Does anyone have context on why there is a magic number of "1774"
> milliseconds in ParDoTest.java on line 2618? This is in
> the testEventTimeTimerAlignBounded method.
>
> File at master:
> https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java#L2618
>
> First added commit:
> https://github.com/apache/beam/commit/4f934923d28798dfe7cd18c86ff4bcf8eebc27e5
>
> Regards,
> Sam
>


Re: Proposal: Portability SDKHarness Docker Image Release with Beam Version Release.

2019-01-22 Thread Mark Liu
+1 to having an official Beam-released container image.

Also, I would propose adding a verification step to (or after) the release
process to do a smoke check. Python has a ValidatesContainer test that runs a
basic pipeline using the newly built container for verification. Other SDK
languages can do a similar thing or add a common framework.

Mark

On Thu, Jan 17, 2019 at 5:56 AM Alan Myrvold  wrote:

> +1 This would be great. gcr.io seems like a good option for snapshots due
> to the permissions from Jenkins to upload and the ability to keep snapshots
> around.
>
> On Wed, Jan 16, 2019 at 6:51 PM Ruoyun Huang  wrote:
>
>> +1 This would be a great thing to have.
>>
>> On Wed, Jan 16, 2019 at 6:11 PM Ankur Goenka  wrote:
>>
>>> gcr.io seems to be a good option. Given that we don't need the hosting
>>> server name in the image name, it is easily changeable later.
>>>
>>> The Docker container for Apache Flink is named "flink" and they have
>>> different tags for different releases and configurations:
>>> https://hub.docker.com/_/flink . We can follow a similar model and can
>>> name the image "beam" (beam doesn't seem to be taken on Docker Hub) and
>>> use tags to distinguish Java/Python/Go, versions, etc.
>>>
>>> Tags will look like:
>>> java-SNAPSHOT
>>> java-2.10.1
>>> python2-SNAPSHOT
>>> python2-2.10.1
>>> go-SNAPSHOT
>>> go-2.10.1
>>>
>>>
>>> On Wed, Jan 16, 2019 at 5:56 PM Ahmet Altay  wrote:
>>>
 For snapshots, we could use gcr.io. Permission would not be a problem
 since Jenkins is already correctly set up. The cost will be covered under
 the apache-beam-testing project. And since this is only for snapshots, it
 will be only for temporary artifacts, not release artifacts.

 On Wed, Jan 16, 2019 at 5:50 PM Valentyn Tymofieiev <
 valen...@google.com> wrote:

> +1, releasing containers is a useful process that we need to build in
> Beam and it is required for FnAPI users. Among other reasons, having
> officially-released Beam SDK harness container images will make it easier
> for users to do simple customizations to container images, as they will
> be able to use a container image released by Beam as a base image.
>
> Good point about potential storage limitations on Bintray. With Beam
> Release cadence we may quickly exceed the 10 GB quota. It may also affect
> our decisions as to which images we want to release, for example: do we
> want to only release one container image with Python 3 interpreter, or do
> we want to release a container image for each Python 3 minor version that
> Beam is compatible with.
>

 Probably worth a separate discussion. I would favor first releasing a
 python 3 compatible version before figuring out how we would target
 multiple python 3 versions.

>>>

>
> On Wed, Jan 16, 2019 at 5:48 PM Ankur Goenka 
> wrote:
>
>>
>>
>> On Wed, Jan 16, 2019 at 5:37 PM Ahmet Altay  wrote:
>>
>>>
>>>
>>> On Wed, Jan 16, 2019 at 5:28 PM Ankur Goenka 
>>> wrote:
>>>
 - Could we start from snapshots first and then do it for releases?
 +1, releasing snapshots first makes sense to me.
 - For snapshots, do we need to clean old containers after a while?
 Otherwise I guess we will accumulate lots of containers.
 For snapshots we can maintain a single snapshot image from git
 HEAD daily. Docker has an internal image ID which changes every
 time an image is changed, and pulls new images as needed.

>>>
>>> There is a potential use case this may not work with: if a user picks up
>>> a snapshot build and wants to use it until the next release arrives. I
>>> guess in that case the user can copy the snapshotted container image and
>>> rely on that.
>>>
>>>
>> Yes, that should be reasonable.
>>
>>> - Do we also need additional code changes for snapshots and releases
 to default to these specific containers? There could be a version based
 mechanism to resolve the correct container to use.
 The current image defaults have a username in them. We should be OK by
 just updating the default image URL to the published image URL.

 We should also check the pricing and details of the Apache-Bintray
 agreement before pushing images and changing defaults.

>>>
>>> There is information on Bintray's pricing page about open source
>>> projects [1]. I do not know if there is a special Apache-Bintray
>>> agreement or not. If there is no special agreement, there is a 10GB
>>> storage limit for using Bintray.
>>>
>> As each image can easily run into gigabytes, 10GB might not be sufficient
>> for future-proofing.
>> We can also register the Docker image to a Docker image registry and not
>> have bintray in the name, so we can later host images on a different
>> registry.
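
For the smoke-check idea proposed at the top of this thread, here is a
hedged sketch of the kind of trivial pipeline a ValidatesContainer-style
verification might run against a newly built container. The class name and
the choice of transforms are illustrative assumptions, not the actual test:

  import org.apache.beam.sdk.Pipeline;
  import org.apache.beam.sdk.options.PipelineOptions;
  import org.apache.beam.sdk.options.PipelineOptionsFactory;
  import org.apache.beam.sdk.testing.PAssert;
  import org.apache.beam.sdk.transforms.Create;
  import org.apache.beam.sdk.transforms.MapElements;
  import org.apache.beam.sdk.values.TypeDescriptors;

  public class ContainerSmokeCheck {
    public static void main(String[] args) {
      // The options select the runner and (for portable runners) the
      // container image under verification.
      PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
      Pipeline p = Pipeline.create(options);

      // Minimal Create -> MapElements pipeline; PAssert checks the output.
      PAssert.that(
              p.apply(Create.of(1, 2, 3))
                  .apply(MapElements.into(TypeDescriptors.integers())
                      .via((Integer x) -> x * 2)))
          .containsInAnyOrder(2, 4, 6);

      // If the PAssert fails, waitUntilFinish() throws, which would fail
      // the release verification step.
      p.run().waitUntilFinish();
    }
  }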

Magic number explanation in ParDoTest.java

2019-01-22 Thread Sam Rohde
Hi all,

Does anyone have context why there is a magic number of "1774" milliseconds
in the ParDoTest.java on line 2618? This is in
the testEventTimeTimerAlignBounded method.

File at master:
https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java#L2618

First added commit:
https://github.com/apache/beam/commit/4f934923d28798dfe7cd18c86ff4bcf8eebc27e5

Regards,
Sam


Re: [PROPOSAL] Prepare Beam 2.10.0 release

2019-01-22 Thread Scott Wegner
The rollback for BEAM-6352 is now in and cherry-picked into the release
branch.

On Fri, Jan 18, 2019 at 9:04 AM Scott Wegner  wrote:

> For BEAM-6352, I have a rollback ready for review:
> https://github.com/apache/beam/pull/7540
> Conversation about the decision to roll back vs. roll forward for this
> change is on the JIRA issue.
>
> On Fri, Jan 18, 2019 at 8:22 AM Maximilian Michels  wrote:
>
>> I've created the revert for the pipeline options parsing which we agreed
>> on:
>> https://github.com/apache/beam/pull/7564
>>
>> On 17.01.19 15:16, Maximilian Michels wrote:
>> > An issue with the Flink Runner when restarting streaming pipelines:
>> > https://jira.apache.org/jira/browse/BEAM-6460
>> >
>> > Looks like it will be easy to fix by invalidating the Jackson cache.
>> >
>> > -Max
>> >
>> > On 16.01.19 23:00, Kenneth Knowles wrote:
>> >> Quick update on this. There are three remaining issues:
>> >>
>> >>   - https://issues.apache.org/jira/browse/BEAM-6407: A DirectRunner
>> >> self-check was broken from 2.8.0 to 2.9.0 - PR looks good modulo our
>> >> infra flakes
>> >>   - https://issues.apache.org/jira/browse/BEAM-6354: PAssert +
>> >> DirectRunner + Unbounded data busted? Investigation not started
>> >>   - https://issues.apache.org/jira/browse/BEAM-6352: Watch was broken
>> >> from 2.8.0 to 2.9.0 - will roll back if no forward fix by the time
>> >> everything else is resolved
>> >>
>> >> Kenn
>> >>
>> >> On Wed, Jan 16, 2019 at 6:00 AM Kenneth Knowles  wrote:
>> >>
>> >> Thanks, Ismaël!
>> >>
>> >> On Wed, Jan 16, 2019 at 2:13 AM Ismaël Mejía  wrote:
>> >>
>> >> Ok, since there were not many issues I did the 'update' for the
>> >> misplaced issues to version 2.10. We are good to go. Newly resolved
>> >> issues in master must now go into 2.11.0.
>> >>
>> >>  > On Wed, Jan 16, 2019 at 10:38 AM Ismaël Mejía  wrote:
>> >>  >
>> >>  > This means that the tickets resolved and marked for 2.11 since
>> >>  > January 2 should be reviewed and retargeted to version 2.10.
>> >>  > So this is a call to action for committers who have merged fixes
>> >>  > after the cut to update the tickets if required.
>> >>  >
>> >>  > Ismaël
>> >>  >
>> >>  > > On Tue, Jan 15, 2019 at 9:22 PM Kenneth Knowles  wrote:
>> >>  > >
>> >>  > > As a heads up, I did not realize that the release guide specified
>> >>  > > a custom process for starting a release branch. It makes sense;
>> >>  > > cut_release_branch.sh consolidates knowledge about all the places
>> >>  > > the version is hardcoded in the codebase. To keep the history
>> >>  > > simple, I will re-cut the release branch at the point where master
>> >>  > > moved from 2.10.0-SNAPSHOT to 2.11.0-SNAPSHOT. All PRs to the
>> >>  > > branch have been cherry-picked from master, so they will all be
>> >>  > > incorporated without any action by their authors.
>> >>  > >
>> >>  > > Kenn
>> >>  > >
>> >>  > > On Tue, Jan 15, 2019 at 10:31 AM Kenneth Knowles  wrote:
>> >>  > >>
>> >>  > >> I'm on it.
>> >>  > >>
>> >>  > >>> On Tue, Jan 15, 2019 at 8:10 AM Ismaël Mejía  wrote:
>> >>  > >>>
>> >>  > >>> There is also another issue: after the 2.10.0 branch cut, some
>> >>  > >>> identifier in the build was not changed, and the Apache Beam
>> >>  > >>> Snapshots keep generating SNAPSHOTs for 2.10.0 instead of the
>> >>  > >>> now-current 2.11.0-SNAPSHOT. Can somebody PTAL?
>> >>  > >>>
>> >>  > >>> On Thu, Jan 3, 2019 at 6:17 PM Maximilian Michels  wrote:
>> >>  > >>> >
>> >>  > >>> > Thanks for driving this Kenn! I'm in favor of a strict
>> >>  > >>> > cut-off, but I'd like to propose a week for cherry-picking
>> >>  > >>> > relevant changes to the release branch. It looks like many
>> >>  > >>> > people are returning from holidays or are still off.
>> >>  > >>> >
>> >>  > >>> > Cheers,
>> >>  > >>> > Max
>> >>  > >>> >
>> >>  > >>> > On 02.01.19 17:20, Kenneth Knowles wrote:
>> >>  > >>> > > Done. I've created the Jira tag for 2.11.0.
>> >>  > >>> > >
>> >>  > >>> > > Previously, there was a few days' warning to get things in
>> >>  > >>> > > before the branch is cut. You can just cherry-pick them.
>> >>  > >>> > > This is a bit better for release stability.
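
As a side note on the BEAM-6460 fix mentioned earlier in this thread: if the
Jackson cache in question is the databind type cache, invalidating it could
look roughly like the sketch below. TypeFactory.clearCache() is a real
Jackson 2.x API, but whether this is the exact fix that landed is an
assumption on my part:

  import com.fasterxml.jackson.databind.type.TypeFactory;

  public class JacksonCacheInvalidation {
    // Clears Jackson's cached JavaType entries so that type lookups after a
    // pipeline restart resolve against the current classloader rather than
    // one retained from the previous run.
    public static void invalidate() {
      TypeFactory.defaultInstance().clearCache();
    }
  }
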

Cross-language pipelines

2019-01-22 Thread Robert Bradshaw
Now that we have the FnAPI, I started playing around with support for
cross-language pipelines. This will allow things like IOs to be shared
across all languages, SQL to be invoked from non-Java, TFX TensorFlow
transforms to be invoked from non-Python, etc., and I think it is the next
step in extending (and taking advantage of) the portability layer
we've developed. These are often composite transforms whose inner
structure depends in non-trivial ways on their configuration.

I created a PR [1] that basically follows the "expand via an external
process" over RPC alternative from the proposals we came up with when
we were discussing this last time [2]. There are still some unknowns,
e.g. how to handle artifacts supplied by an alternative SDK (they
currently must be provided by the environment), but I think this is a
good incremental step forward that will already be useful in a large
number of cases. It would be good to validate the general direction
and I would be interested in any feedback others may have on it.

- Robert

[1] https://github.com/apache/beam/pull/7316
[2] https://s.apache.org/beam-mixed-language-pipelines
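
For readers new to the idea, below is a hedged, self-contained sketch of the
"expand over RPC" shape. All names are invented for illustration and are not
the API in the PR; a real implementation exchanges portable pipeline protos
over gRPC rather than these toy types:

  import java.util.List;
  import java.util.Map;

  public class ExpansionSketch {
    /** What the requesting SDK sends: which foreign transform, configured how. */
    static class ExpansionRequest {
      final String urn;            // identifies the transform, e.g. a SQL transform
      final byte[] configPayload;  // opaque, SDK-specific configuration
      ExpansionRequest(String urn, byte[] configPayload) {
        this.urn = urn;
        this.configPayload = configPayload;
      }
    }

    /** What the expansion service returns: the expanded composite. */
    static class ExpansionResponse {
      final List<String> subTransformIds;     // inner structure, configuration-dependent
      final Map<String, String> outputCoders; // coders the calling SDK must understand
      ExpansionResponse(List<String> subTransformIds, Map<String, String> outputCoders) {
        this.subTransformIds = subTransformIds;
        this.outputCoders = outputCoders;
      }
    }

    /** The RPC interface a foreign SDK process would expose at construction time. */
    interface ExpansionService {
      ExpansionResponse expand(ExpansionRequest request);
    }
  }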


Re: [DISCUSSION] UTests and embedded backends

2019-01-22 Thread Robert Bradshaw
On Mon, Jan 21, 2019 at 10:42 PM Kenneth Knowles  wrote:
>
> Robert - you meant this as a mostly-automatic thing that we would engineer, 
> yes?

Yes, something like TestPipeline that buffers up the pipelines and
then executes on class teardown (details TBD).

> A lighter-weight fake, like using something in-process sharing a Java 
> interface (versus today a locally running service sharing an RPC interface) 
> is still much better than a mock.

+1

>
> Kenn
>
> On Mon, Jan 21, 2019 at 7:17 AM Jean-Baptiste Onofré  
> wrote:
>>
>> Hi,
>>
>> it makes sense to use an embedded backend when:
>>
>> 1. it's possible to easily embed the backend, and
>> 2. the backend is "predictable".
>>
>> If it's easy to embed and the backend behavior is predictable, then it
>> makes sense.
>> In other cases, we can fall back to mocks.
>>
>> Regards
>> JB
>>
>> On 21/01/2019 10:07, Etienne Chauchot wrote:
>> > Hi guys,
>> >
>> > Lately I have been fixing various Elasticsearch flakiness issues in the
>> > UTests by: introducing timeouts, countdown latches, force refresh,
>> > embedded cluster size decrease ...
>> >
>> > These flakiness issues are due to the embedded Elasticsearch not coping
>> > well with the Jenkins overload. Still, IMHO I believe that having
>> > embedded backends for UTests is a lot better than mocks. Even if they
>> > are less tolerant to load, I prefer having UTests 100% representative of
>> > a real backend and adding countermeasures to protect against Jenkins
>> > overload.
>> >
>> > WDYT ?
>> >
>> > Etienne
>> >
>> >
>>
>> --
>> Jean-Baptiste Onofré
>> jbono...@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
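
To make the fake-versus-mock distinction in this thread concrete, here is a
minimal Java sketch (all names invented for illustration). A fake implements
the same interface as the real backend client but with real in-memory
behavior, so tests exercise actual semantics without an embedded cluster
that can buckle under Jenkins load:

  import java.util.HashMap;
  import java.util.Map;
  import java.util.Optional;

  public class FakeBackendExample {
    /** The seam shared by the real client and the fake. */
    interface DocumentStore {
      void put(String id, String doc);
      Optional<String> get(String id);
    }

    /** In-process fake: deterministic, no network, nothing to overload. */
    static class InMemoryDocumentStore implements DocumentStore {
      private final Map<String, String> docs = new HashMap<>();
      @Override public void put(String id, String doc) { docs.put(id, doc); }
      @Override public Optional<String> get(String id) {
        return Optional.ofNullable(docs.get(id));
      }
    }

    public static void main(String[] args) {
      DocumentStore store = new InMemoryDocumentStore();
      store.put("1", "{\"name\":\"beam\"}");
      // The code under test sees real put/get semantics, unlike a mock that
      // merely records that methods were called.
      assert store.get("1").isPresent();
    }
  }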