[jira] [Commented] (BEAM-6550) ParDo Async Java API

2020-06-02 Thread Bharath Kumarasubramanian (Jira)


[ https://issues.apache.org/jira/browse/BEAM-6550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17124313#comment-17124313 ]

Bharath Kumarasubramanian commented on BEAM-6550:
-

Thanks for following up.


I have a change in progress that covers the API and the Direct and Samza runner 
implementations. I ran into a bunch of Direct runner test failures and will reopen 
the PR once I fix the tests.

 

> ParDo Async Java API
> 
>
> Key: BEAM-6550
> URL: https://issues.apache.org/jira/browse/BEAM-6550
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Xinyu Liu
>Assignee: Bharath Kumarasubramanian
>Priority: P2
>  Labels: stale-assigned
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> This ticket is to track the work on adding the ParDo async API. The 
> motivation for this is:
> - Many users are experienced in asynchronous programming. With async 
> frameworks such as Netty and ParSeq and libraries like the async Jersey client, they 
> are able to make remote calls efficiently while the libraries manage the 
> execution threads underneath. Async remote calls are very common in most of 
> our streaming applications today.
> - Many jobs run on multi-tenant clusters. Async processing reduces 
> resource usage and speeds up computation (fewer context switches).
> This API has become one of the most requested Java APIs among SamzaRunner users.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (BEAM-6550) ParDo Async Java API

2020-06-01 Thread Kenneth Knowles (Jira)


[ https://issues.apache.org/jira/browse/BEAM-6550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17122126#comment-17122126 ]

Kenneth Knowles commented on BEAM-6550:
---

This issue is assigned but has not received an update in 30 days so it has been 
labeled "stale-assigned". If you are still working on the issue, please give an 
update and remove the label. If you are no longer working on the issue, please 
unassign so someone else may work on it. In 7 days the issue will be 
automatically unassigned.



[jira] [Commented] (BEAM-6550) ParDo Async Java API

2019-01-30 Thread Xinyu Liu (JIRA)


[ https://issues.apache.org/jira/browse/BEAM-6550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16756377#comment-16756377 ]

Xinyu Liu commented on BEAM-6550:
-

Summary from email thread:

From [~kenn]:

If the input is a CompletionStage then the output should also be a 
CompletionStage, since all you should do is async chaining. We could 
enforce this by giving the DoFn an OutputReceiver(CompletionStage).

Another possibility, which might be even more robust against misuse of futures, 
would be process(@Element InputT element, @Output 
OutputReceiver<CompletionStage<OutputT>>). This way the process method 
itself will be async chained, rather than relying on the user to do the right 
thing.
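To make the proposed signature concrete, here is a minimal plain-Java sketch that runs without Beam. AsyncOutputReceiver, AsyncDoFnSketch, AsyncInvokerSketch and AsyncDoFnDemo are hypothetical names, not Beam APIs; they only model the idea that process() emits CompletionStages and that the harness, not the user, chains the downstream call onto each one and forces completion at the end of the bundle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical receiver that accepts futures (NOT Beam's real OutputReceiver).
interface AsyncOutputReceiver<T> {
  void output(CompletionStage<T> value);
}

// Hypothetical DoFn shape from the proposal: process() emits futures and never blocks.
abstract class AsyncDoFnSketch<InputT, OutputT> {
  abstract void process(InputT element, AsyncOutputReceiver<OutputT> out);
}

// Hypothetical harness side: chains the downstream call onto every emitted
// future and forces all of them to complete at the end of the bundle.
final class AsyncInvokerSketch<InputT, OutputT> {
  private final AsyncDoFnSketch<InputT, OutputT> fn;
  // Runs on whichever thread completes the future, so it must be thread-safe.
  private final Consumer<OutputT> downstream;
  private final List<CompletableFuture<Void>> pending = new ArrayList<>();

  AsyncInvokerSketch(AsyncDoFnSketch<InputT, OutputT> fn, Consumer<OutputT> downstream) {
    this.fn = fn;
    this.downstream = downstream;
  }

  void processElement(InputT element) {
    // The harness, not user code, attaches the downstream step to each future.
    fn.process(element, stage -> pending.add(stage.thenAccept(downstream).toCompletableFuture()));
  }

  void finishBundle() {
    // Completion is forced here; the bundle is not done until every chain has run.
    CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
    pending.clear();
  }
}

// Example user code: each element triggers a simulated non-blocking remote call.
final class AsyncDoFnDemo {
  static List<String> run() {
    List<String> results = new CopyOnWriteArrayList<>();
    AsyncDoFnSketch<Integer, String> fn = new AsyncDoFnSketch<Integer, String>() {
      @Override
      void process(Integer element, AsyncOutputReceiver<String> out) {
        out.output(CompletableFuture.supplyAsync(() -> "value-" + element));
      }
    };
    AsyncInvokerSketch<Integer, String> invoker = new AsyncInvokerSketch<>(fn, results::add);
    for (int i = 0; i < 3; i++) {
      invoker.processElement(i);
    }
    invoker.finishBundle();
    return results;
  }
}
```

Because the downstream consumer may run on a completing thread, the demo collects into a CopyOnWriteArrayList; the output order is therefore not guaranteed.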

When executed over the portable APIs, it will be primarily the Java SDK harness 
that makes all of these decisions. If we wanted runners to have some insight 
into it we would have to add it to the Beam model protos. I don't have any 
suggestions there, so I would leave it out of this discussion until there's 
good ideas. We could learn a lot by trying it out just in the SDK harness.

From [~SteveNiemitz]:

I'd love to see something like this as well. Also +1 to process(@Element 
InputT element, @Output OutputReceiver<CompletionStage<OutputT>>). I don't 
know if there's much benefit to passing a future in, since the framework itself 
could hook up the process function to complete when the future completes.

I feel like I've spent a bunch of time writing very similar "kick off a future 
in ProcessElement, join it in FinishBundle" code, and looking around Beam 
itself, a lot of built-in transforms do it as well. Scio provides a few 
AsyncDoFn implementations [1], but it'd be great to see this as a first-class 
concept in Beam itself. Doing error handling, concurrency, etc. correctly can 
be tricky.
[1] 
https://github.com/spotify/scio/blob/master/scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java
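For comparison, the hand-rolled "kick off a future in ProcessElement, join it in FinishBundle" pattern can be modelled in plain Java roughly as below. This is a hypothetical sketch, not Beam's DoFn API or Scio's BaseAsyncDoFn: processElement and finishBundle only mirror the roles of @ProcessElement and @FinishBundle, remoteCall stands in for a client call, and the Semaphore bound on in-flight calls is one assumed way to address the concurrency concern.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

// Hypothetical plain-Java model of the pattern; not a Beam or Scio class.
final class FutureJoinFnSketch<InputT, OutputT> implements AutoCloseable {
  private final Function<InputT, OutputT> remoteCall; // stand-in for a blocking client call
  private final Semaphore permits;                     // caps the number of in-flight calls
  private final ExecutorService executor = Executors.newFixedThreadPool(4);
  private final List<CompletableFuture<OutputT>> inFlight = new ArrayList<>();

  FutureJoinFnSketch(Function<InputT, OutputT> remoteCall, int maxInFlight) {
    this.remoteCall = remoteCall;
    this.permits = new Semaphore(maxInFlight);
  }

  // Plays the role of @ProcessElement: start the call and return without waiting.
  void processElement(InputT element) {
    permits.acquireUninterruptibly(); // back-pressure once maxInFlight calls are pending
    CompletableFuture<OutputT> f =
        CompletableFuture.supplyAsync(() -> remoteCall.apply(element), executor);
    f.whenComplete((result, error) -> permits.release());
    inFlight.add(f);
  }

  // Plays the role of @FinishBundle: wait for every call, emit on the bundle's thread.
  List<OutputT> finishBundle() {
    List<OutputT> outputs = new ArrayList<>();
    try {
      for (CompletableFuture<OutputT> f : inFlight) {
        // join() rethrows a failed call as CompletionException, failing the bundle.
        outputs.add(f.join());
      }
    } finally {
      inFlight.clear();
    }
    return outputs;
  }

  @Override
  public void close() {
    executor.shutdown();
  }
}
```

Joining in submission order keeps outputs in input order, and letting any failure escape from finishBundle means the whole bundle fails rather than silently dropping elements.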

From [~swegner]:

A related question is how to make execution observable so that a runner can 
make proper scaling decisions. Runners decide how to schedule bundles within 
and across multiple worker instances, and can use information about execution 
to make dynamic scaling decisions. First-class async APIs seem likely to 
encourage DoFn authors to implement their own parallelization, rather than 
deferring to the runner, which should be better able to provide the right 
level of parallelism.

In the Dataflow worker harness, we estimate the execution time of PTransform steps 
by sampling execution time on the execution thread and attributing it to the 
currently invoked method. This approach is fairly simple, and it is possible because 
we assume that execution happens within the thread controlled by the runner. 
Some DoFns already implement their own async logic and break this assumption; 
I would expect more to do so if async is built into the DoFn APIs.

So: this isn't an argument against async APIs, but rather: does this break 
execution observability, and are there other lightweight mechanisms for 
attributing execution time of async work?
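A toy model of why thread-based attribution misses async work follows. It is a hypothetical simplification, not the Dataflow worker's actual mechanism: StepAttributionSketch keeps the current step in a ThreadLocal, so anything running on another executor thread is seen as unattributed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical, simplified model of per-thread step attribution.
final class StepAttributionSketch {
  // The step the current thread is executing, as a runner might record it.
  private static final ThreadLocal<String> CURRENT_STEP =
      ThreadLocal.withInitial(() -> "<unattributed>");

  static String currentStep() {
    return CURRENT_STEP.get();
  }

  // Runs body while marking the calling thread as executing the given step.
  static <T> T inStep(String step, Supplier<T> body) {
    String previous = CURRENT_STEP.get();
    CURRENT_STEP.set(step);
    try {
      return body.get();
    } finally {
      CURRENT_STEP.set(previous);
    }
  }

  // Returns {step seen by synchronous work, step seen by async work}.
  static String[] observe() {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      return inStep("MyDoFn.processElement", () -> {
        String syncStep = currentStep();
        // Work handed to another thread escapes the runner's bookkeeping.
        String asyncStep =
            CompletableFuture.supplyAsync(StepAttributionSketch::currentStep, pool).join();
        return new String[] {syncStep, asyncStep};
      });
    } finally {
      pool.shutdown();
    }
  }
}
```

A sampler reading the execution thread would charge the synchronous part to MyDoFn.processElement, while time spent on the pool thread would not be charged to any step.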

From [~robertwb]:
If I understand correctly, the end goal is to process input elements
of a DoFn asynchronously. Were I to do this naively, I would implement
DoFns that simply take and receive [Serializable?]CompletionStages as
element types, followed by a DoFn that adds a callback to emit on
completion (possibly via a queue to avoid being-on-the-wrong-thread
issues) and whose finalize forces all completions. This would, of
course, interact poorly with processing time tracking, fusion breaks,
watermark tracking, counter attribution, window propagation, etc. so
it is desirable to make it part of the system itself.
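The naive approach described above, with completion callbacks enqueuing results and only the execution thread emitting them, might look roughly like this. QueueingEmitterSketch is a hypothetical name, and the sketch deliberately ignores windows, watermarks and counters, which is exactly the gap the paragraph points out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical sketch: callbacks never emit directly; they enqueue, and only the
// bundle's execution thread drains the queue into the downstream consumer.
final class QueueingEmitterSketch<T> {
  private final ConcurrentLinkedQueue<T> completed = new ConcurrentLinkedQueue<>(); // no nulls allowed
  private final List<CompletableFuture<Void>> pending = new ArrayList<>();
  private final Consumer<T> downstream; // only ever invoked from the execution thread

  QueueingEmitterSketch(Consumer<T> downstream) {
    this.downstream = downstream;
  }

  // Called on the execution thread for each element's future.
  void processElement(CompletionStage<T> stage) {
    pending.add(stage.thenAccept(completed::add).toCompletableFuture());
    drain(); // opportunistically emit whatever has already finished
  }

  // The "finalize" step: force all completions, then emit the rest on this thread.
  void finishBundle() {
    CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
    pending.clear();
    drain();
  }

  private void drain() {
    T value;
    while ((value = completed.poll()) != null) {
      downstream.accept(value);
    }
  }
}
```

The queue is the only structure shared with completing threads, so the downstream consumer itself does not need to be thread-safe.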

Taking an OutputReceiver<CompletionStage<OutputT>> seems like a decent 
API. The invoking of the downstream process could be chained onto 
this, with all the implicit tracking and tracing set up correctly.
Taking a CompletionStage as input means a DoFn would not have to
create its output CompletionStage ex nihilo and possibly allow for
better chaining (depending on the asynchronous APIs used).

Even better might be to simply let the invocation of all
DoFn.process() methods be asynchronous, but as Java doesn't offer an
await primitive to relinquish control in the middle of a function body
this might be hard.

I think for correctness, completion would have to be forced at the end
of each bundle. If your bundles are large enough, this may not be that
big of a deal. In this case you could also start executing subsequent
bundles while waiting for prior ones to complete.
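One assumed way to force completion per bundle while still overlapping with later bundles is to commit bundles in start order once all of their futures finish, and to allow only a bounded number of bundles to be open at once. OverlappingBundlesSketch and its maxOpenBundles limit are hypothetical, and "commit" here just means appending the bundle's outputs to a list.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical sketch: a bundle is committed only after all its async work is done,
// but up to maxOpenBundles bundles may be waiting on their futures at the same time.
final class OverlappingBundlesSketch<InputT, OutputT> {
  private final Function<InputT, CompletableFuture<OutputT>> asyncCall;
  private final int maxOpenBundles;
  private final Deque<CompletableFuture<List<OutputT>>> open = new ArrayDeque<>();
  private final List<List<OutputT>> committed = new ArrayList<>();

  OverlappingBundlesSketch(Function<InputT, CompletableFuture<OutputT>> asyncCall, int maxOpenBundles) {
    this.asyncCall = asyncCall;
    this.maxOpenBundles = maxOpenBundles;
  }

  // Starts all work for one bundle; blocks only if too many bundles are still open.
  void startBundle(List<InputT> elements) {
    List<CompletableFuture<OutputT>> futures = new ArrayList<>();
    for (InputT e : elements) {
      futures.add(asyncCall.apply(e));
    }
    CompletableFuture<List<OutputT>> done =
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(ignored -> {
              List<OutputT> out = new ArrayList<>();
              for (CompletableFuture<OutputT> f : futures) {
                out.add(f.join()); // already complete at this point
              }
              return out;
            });
    open.addLast(done);
    while (open.size() > maxOpenBundles) {
      commitOldest();
    }
  }

  // Forces completion of the oldest open bundle; bundles commit in start order.
  private void commitOldest() {
    committed.add(open.removeFirst().join());
  }

  // Forces completion of every open bundle, e.g. before a checkpoint or shutdown.
  List<List<OutputT>> drainAll() {
    while (!open.isEmpty()) {
      commitOldest();
    }
    return committed;
  }
}
```

Committing strictly in start order keeps per-bundle semantics intact; the bound trades extra memory for the overlap.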






> ParDo Async Java API
> 
>
> Key: BEAM-6550
> URL: https://issues.apache.org/jira/browse/BEAM-6550
> Project: Beam
>  Issue Type: Improvement
>  Components: sdk-java-core
>Reporter: Xinyu Liu
>Assignee: Xinyu Liu
>Priority: Major
>
> This ticket is to track the work on adding the ParDo