[
https://issues.apache.org/jira/browse/BEAM-6550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16756377#comment-16756377
]
Xinyu Liu commented on BEAM-6550:
-
Summary from email thread:
From [~kenn]:
If the input is a CompletionStage then the output should also be a
CompletionStage, since all you should do is async chaining. We could
enforce this by giving the DoFn an OutputReceiver(CompletionStage).
Another possibility that might be even more robust against poor future use
could be process(@Element InputT element, @Output
OutputReceiver<CompletionStage<OutputT>>). In this way, the process method
itself will be async chained, rather than counting on the user to do the right
thing.
When executed over the portable APIs, it will be primarily the Java SDK harness
that makes all of these decisions. If we wanted runners to have some insight
into it we would have to add it to the Beam model protos. I don't have any
suggestions there, so I would leave it out of this discussion until there's
good ideas. We could learn a lot by trying it out just in the SDK harness.
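The proposed shape can be sketched in plain Java. This is a hypothetical illustration, not a real Beam API: the class, its nested OutputReceiver interface, and the in-main "runner" are all stand-ins. The point is that the process method can only emit a CompletionStage, so the caller, not the user, chains the downstream step onto it and forces completion at the bundle boundary.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class AsyncChainSketch {
    // Stand-in for Beam's OutputReceiver, typed over CompletionStage so
    // all the process method can do is async chaining.
    interface OutputReceiver<T> { void output(T value); }

    // A process method in the proposed style: it kicks off async work
    // (e.g. an RPC) and hands the pending stage to the framework.
    static void process(String element, OutputReceiver<CompletionStage<String>> out) {
        CompletionStage<String> pending =
            CompletableFuture.supplyAsync(() -> element.toUpperCase());
        out.output(pending);
    }

    public static void main(String[] args) {
        // The "runner" chains downstream emission onto the stage and
        // forces completion at the end of the bundle (here: before exit).
        StringBuilder results = new StringBuilder();
        CompletableFuture<Void>[] barrier = new CompletableFuture[1];
        process("hello", stage ->
            barrier[0] = stage.thenAccept(results::append).toCompletableFuture());
        barrier[0].join();  // finish-bundle barrier
        System.out.println(results);  // prints: HELLO
    }
}
```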
From [~SteveNiemitz]:
I'd love to see something like this as well. Also +1 to process(@Element
InputT element, @Output OutputReceiver<CompletionStage<OutputT>>). I don't
know if there's much benefit to passing a future in, since the framework itself
could hook up the process function to complete when the future completes.
I feel like I've spent a bunch of time writing very similar "kick off a future
in ProcessElement, join it in FinishBundle" code, and looking around Beam
itself, a lot of built-in transforms do it as well. Scio provides a few
AsyncDoFn implementations [1], but it'd be great to see this as a first-class
concept in Beam itself. Doing error handling, concurrency, etc. correctly can
be tricky.
[1]
https://github.com/spotify/scio/blob/master/scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java
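The hand-rolled pattern described above could look roughly like this. It is a plain-Java sketch: the processElement/finishBundle names only mirror Beam's @ProcessElement/@FinishBundle lifecycle, this is not a real DoFn, and error handling is elided.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Each element kicks off a future in processElement; finishBundle joins
// them all before the bundle can be committed.
public class ManualAsyncDoFn {
    private final List<CompletableFuture<String>> inFlight = new ArrayList<>();
    private final List<String> emitted = new ArrayList<>();

    void processElement(String element) {
        // Kick off the async work; completion is deferred to finishBundle.
        inFlight.add(CompletableFuture.supplyAsync(() -> element + "!"));
    }

    void finishBundle() {
        // Join every outstanding future before the bundle is committed,
        // collecting results in element order (error handling elided).
        CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0])).join();
        for (CompletableFuture<String> f : inFlight) {
            emitted.add(f.join());
        }
        inFlight.clear();
    }

    List<String> results() { return emitted; }
}
```

Getting this right per-transform is exactly the repeated boilerplate the thread argues a first-class API would remove.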
From [~swegner]:
A related question is how to make execution observable such that a runner can
make proper scaling decisions. Runners decide how to schedule bundles within
and across multiple worker instances, and can use information about execution
to make dynamic scaling decisions. First-class async APIs seem like they would
encourage DoFn authors to implement their own parallelization, rather than
deferring to the runner that should be more capable of providing the right
level of parallelism.
In the Dataflow worker harness, we attribute execution time to PTransform steps
by sampling the execution thread and charging each sample to the
currently invoked method. This approach is fairly simple, and it is possible because
we assume that execution happens within the thread controlled by the runner.
Some DoFns already implement their own async logic and break this assumption;
I would expect more to do so if we make async built into the DoFn APIs.
So: this isn't an argument against async APIs, but rather: does this break
execution observability, and are there other lightweight mechanisms for
attributing execution time of async work?
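The sampling scheme described above can be illustrated with a minimal sketch (hypothetical code, not the Dataflow worker's actual implementation): a sampler thread periodically reads which step the worker thread is executing and charges the sampling interval to it. The scheme breaks down as soon as work hops off the tracked thread, which is exactly the async concern raised here.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class StepSampler {
    // The step the tracked worker thread is currently executing.
    private volatile String currentStep = "idle";
    private final Map<String, AtomicLong> millisByStep = new ConcurrentHashMap<>();

    // Worker thread calls this when entering a step.
    void enter(String step) { currentStep = step; }

    // Sampler thread calls this once per sampling interval: whatever step
    // is current gets charged the whole interval.
    void sample(long intervalMillis) {
        millisByStep.computeIfAbsent(currentStep, k -> new AtomicLong())
                    .addAndGet(intervalMillis);
    }

    long millisFor(String step) {
        AtomicLong t = millisByStep.get(step);
        return t == null ? 0 : t.get();
    }
}
```

Work completed on a user-spawned thread never updates currentStep, so its time is silently charged to whatever the tracked thread happens to be doing.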
From [~robertwb]:
If I understand correctly, the end goal is to process input elements
of a DoFn asynchronously. Were I to do this naively, I would implement
DoFns that simply take and receive [Serializable?]CompletionStages as
element types, followed by a DoFn that adds a callback to emit on
completion (possibly via a queue to avoid being-on-the-wrong-thread
issues) and whose finalize forces all completions. This would, of
course, interact poorly with processing time tracking, fusion breaks,
watermark tracking, counter attribution, window propagation, etc. so
it is desirable to make it part of the system itself.
Taking an OutputReceiver<CompletionStage<OutputT>> seems like a decent
API. The invoking of the downstream process could be chained onto
this, with all the implicit tracking and tracing set up correctly.
Taking a CompletionStage as input means a DoFn would not have to
create its output CompletionStage ex nihilo and possibly allow for
better chaining (depending on the asynchronous APIs used).
Even better might be to simply let the invocation of all
DoFn.process() methods be asynchronous, but as Java doesn't offer an
await primitive to relinquish control in the middle of a function body
this might be hard.
I think for correctness, completion would have to be forced at the end
of each bundle. If your bundles are large enough, this may not be that
big of a deal. In this case you could also start executing subsequent
bundles while waiting for prior ones to complete.
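The last point, that later bundles can start while earlier ones drain, can be sketched as follows. This is a hypothetical illustration under assumed semantics (class and method names are invented): each bundle's async work is forced to complete before that bundle commits, but a new bundle may begin processing while earlier bundles are still in flight.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class OverlappingBundles {
    // One pending barrier per started-but-uncommitted bundle.
    private final Deque<CompletableFuture<Void>> pending = new ArrayDeque<>();

    // Start a bundle: kick off async work for all its elements and record
    // a single barrier covering the whole bundle. The sink must be
    // thread-safe since completions run on other threads.
    void startBundle(List<String> elements, List<String> sink) {
        CompletableFuture<?>[] futures = elements.stream()
            .map(e -> CompletableFuture.supplyAsync(() -> e.length())
                    .thenAccept(n -> sink.add(e + ":" + n)))
            .toArray(CompletableFuture[]::new);
        pending.add(CompletableFuture.allOf(futures));
    }

    // Commit bundles in order; each commit forces only that bundle's
    // barrier, so bundle N+1 can run while bundle N is still draining.
    void commitOldest() {
        pending.removeFirst().join();
    }
}
```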
> ParDo Async Java API
>
>
> Key: BEAM-6550
> URL: https://issues.apache.org/jira/browse/BEAM-6550
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-core
> Reporter: Xinyu Liu
> Assignee: Xinyu Liu
> Priority: Major
>
> This ticket is to track the work on adding the ParDo