On Fri, Sep 14, 2018 at 3:01 PM Thomas Weise wrote:
That's actually how the Flink runner already works - bundle processing
starts when elements are available (see FlinkExecutableStageFunction for
batch mode).
But we still have the possibility of the SDK getting concurrent requests
due to parallelism (and pipelined execution).
Thanks,
Thomas
Currently the best solution we've come up with is that we must process an
unbounded number of bundles concurrently to avoid deadlock. Especially in
the batch case, this may be wasteful as we bring up workers for many stages
that are not actually executable until upstream stages finish. Since it may …
I managed to write a small document based on the discussion.
Please take a look at
https://docs.google.com/document/d/1oAXVPbJ0dzj2_8LXEWFAgqCP5Tpld3q5B3QU254PQ6A/edit?usp=sharing
On Tue, Aug 21, 2018 at 11:01 PM Henning Rohde wrote:
Sending bundles that cannot be executed, i.e., the situation described to
cause deadlock in Flink in the beginning of the thread with mapB. The
discussion of exposing (or assuming an infinitely large) concurrency level
-- while a useful concept in its own right -- came around as a way to
unblock mapB …
Henning, can you clarify what you mean by sending non-executable bundles
to the SDK harness, and how is it useful for Flink?
On Tue, Aug 21, 2018 at 2:01 PM Henning Rohde wrote:
I think it will be useful to the runner to know upfront what the
fundamental threading capabilities are for the SDK harness (say, "fixed",
"linear", "dynamic", ..) so that the runner can upfront make a good static
decision on #harnesses and how many resources they should each have. It's
wasteful to
That's right.
To add to it: we added multithreading to Python streaming, as a single
thread is suboptimal for the streaming use case.
Shall we move towards a conclusion on the SDK bundle processing upper bound?
On Mon, Aug 20, 2018 at 1:54 PM Lukasz Cwik wrote:
Ankur, I can see where you are going with your argument. I believe there is
certain information which is static and won't change at pipeline creation
time (such as the Python SDK being most efficient doing one bundle at a
time) and some which is best determined at runtime, like memory and CPU
limits, worker co…
I would prefer to keep it dynamic, as it can be changed by the
infrastructure or the pipeline author.
In the case of Python, the number of concurrent bundles can be changed by
setting the pipeline option worker_count. And for Java it can be computed
based on the CPUs on the machine.
For the Flink runner, we …
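A minimal, stdlib-only sketch of that dynamic selection (the worker_count
flag name comes from this thread; the CPU-count fallback mirrors the Java
suggestion; both are illustrative, not Beam's actual implementation):

    # Sketch: choose the bundle concurrency dynamically. Prefer an explicit
    # worker_count flag; fall back to the machine's CPU count.
    import argparse
    import multiprocessing

    parser = argparse.ArgumentParser()
    parser.add_argument('--worker_count', type=int, default=None)
    args, _ = parser.parse_known_args()

    concurrent_bundles = args.worker_count or multiprocessing.cpu_count()
    print('Processing up to %d bundles concurrently' % concurrent_bundles)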
+1 on making the resources part of a proto. Based upon what Henning linked
to, the provisioning API seems like an appropriate place to provide this
information.
Thomas, I believe the environment proto is the best place to add
information that a runner may want to know about upfront during pipeline …
For SDKs where the upper limit is constant and known upfront, why not
communicate this along with the other harness resource info as part of the
job submission?
Regarding use of GRPC headers: Why not make this explicit in the proto
instead?
WRT runner dictating resource constraints: The runner ac…
Sounds good to me.
A gRPC header on the control channel seems to be a good place to add the
upper bound information.
Added JIRAs:
https://issues.apache.org/jira/browse/BEAM-5166
https://issues.apache.org/jira/browse/BEAM-5167
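As a rough sketch of the idea (the endpoint and header name are
hypothetical; the JIRAs above track the real mechanism):

    import grpc

    # Connect to the runner's control endpoint (address is illustrative).
    channel = grpc.insecure_channel('localhost:8099')

    # Hypothetical header; every RPC issued on the control channel would
    # attach it, so the runner learns the bound before sending bundles, e.g.:
    #   control_stub.Control(request_iterator, metadata=CONTROL_METADATA)
    CONTROL_METADATA = [('x-beam-max-concurrent-bundles', '12')]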
On Fri, Aug 17, 2018 at 10:51 PM Henning Rohde wrote:
Regarding resources: the runner can currently dictate the mem/cpu/disk
resources that the harness is allowed to use via the provisioning api. The
SDK harness need not -- and should not -- speculate on what else might be
running on the machine:
https://github.com/apache/beam/blob/0e14965707b5d48a3d
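A sketch of that flow from the harness side (the stub and field names
follow beam_provision_api.proto as of this era and should be treated as
assumptions):

    import grpc
    from apache_beam.portability.api import beam_provision_api_pb2
    from apache_beam.portability.api import beam_provision_api_pb2_grpc

    # Ask the runner what resources this harness may use, rather than
    # guessing what else might be running on the machine.
    channel = grpc.insecure_channel('localhost:8099')  # illustrative endpoint
    stub = beam_provision_api_pb2_grpc.ProvisionServiceStub(channel)
    info = stub.GetProvisionInfo(
        beam_provision_api_pb2.GetProvisionInfoRequest()).info
    print(info.resource_limits)  # memory / cpu / semi-persistent disk limits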
I am thinking of the upper bound more along the lines of a theoretical
upper limit, or some other static high value, beyond which the SDK will
reject bundles verbosely. The idea is that the SDK will not keep bundles in
a queue while waiting on current bundles to finish. It will simply reject
any additional bundle …
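A minimal sketch of that rejection behaviour (MAX_BUNDLES and the error
text are illustrative):

    # Admit at most MAX_BUNDLES in-flight bundles; fail any extra request
    # loudly instead of queueing it behind running bundles.
    import threading

    MAX_BUNDLES = 12
    _slots = threading.Semaphore(MAX_BUNDLES)

    def process_bundle(bundle_descriptor):
        if not _slots.acquire(blocking=False):
            raise RuntimeError(
                'SDK harness at capacity (%d bundles); rejecting bundle'
                % MAX_BUNDLES)
        try:
            pass  # ... execute the bundle's instructions ...
        finally:
            _slots.release()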
Ankur, how would you expect an SDK to compute a realistic upper bound
(upfront or during pipeline computation)?
The first thought that came to my mind was that the SDK would provide
CPU/memory/... resourcing information, with the runner making a judgement
call as to whether it should ask the SDK to do m…
Makes sense. Exposing an upper bound on concurrency together with an
optimum concurrency can give a good balance. This is good information to
expose while keeping the requirements on the SDK simple. An SDK can publish
1 as both the optimum concurrency and the upper bound to keep things
simple.
Runner introspection o…
I agree with Luke's observation, with the caveat that "infinite amount of
bundles in parallel" is limited by the available resources. For example,
the Go SDK harness will accept an arbitrary amount of parallel work, but
too much work will cause either excessive GC pressure with crippling
slowness o…
The latter case, of an SDK supporting single-bundle execution at a time
and the runner not using this flag, is exactly the reason we got into the
deadlock here.
I agree with exposing the SDK's optimum concurrency level (1 in the latter
case) and letting the runner decide whether to use it. But at the same
time …
I believe in practice SDK harnesses will fall into one of two capability
classes: they can process an effectively infinite number of bundles in
parallel, or they can only process a single bundle at a time.
I believe it is more difficult for a runner to handle the latter case well
and to perform all the environment …
To recap the discussion, it seems that we have come up with the following
points.
SDKHarness management and initialization:
1. The runner completely owns the work assignment to the SDKHarness.
2. The runner should know the capabilities and capacity of the SDKHarness
and should assign work accordingly.
3. Spi…
Finding a good balance is indeed the art of portability, because the range
of capability (and assumptions) on both sides is wide.
The original idea was to allow the SDK harness to be an extremely simple
bundle executor (specifically, single-threaded execution of one
instruction at a time); however …
It is good to see this discussed!
I think there needs to be a good balance between the SDK harness
capabilities/complexity and responsibilities. Additionally the user will
need to be able to adjust the runner behavior, since the type of workload
executed in the harness is also a factor. Elsewhere …
SDK harnesses have always been responsible for executing all work given to
them concurrently. Runners have been responsible for choosing how much work
to give to the SDK harness in such a way that best utilizes it.
I understand that multithreading in Python is inefficient due to the global
interpreter lock …
Hi Ankur,
Thanks for looking into this problem. The cause seems to be Flink's
pipelined execution mode. It runs multiple tasks in one task slot and
produces a deadlock when the pipelined operators schedule the SDK
harness DoFns in non-topological order.
The problem would be resolved if we schedul…
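A self-contained illustration of the deadlock shape being described (plain
Python, not Beam code): once the only execution slot is held by the
upstream operator, the downstream operator that would drain it can never be
scheduled.

    # This script intentionally hangs; it exists to show the deadlock.
    import queue
    from concurrent.futures import ThreadPoolExecutor

    edge = queue.Queue(maxsize=1)   # bounded channel between mapA and mapB

    def map_a():
        for i in range(10):
            edge.put(i)             # blocks once the buffer is full

    def map_b():
        for _ in range(10):
            edge.get()

    pool = ThreadPoolExecutor(max_workers=1)  # single-bundle SDK harness
    pool.submit(map_a)   # occupies the only worker and blocks on put()
    pool.submit(map_b)   # never runs: deadlock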
Hi,
tl;dr: deadlock in task execution caused by limited task parallelism on the
SDKHarness.
*Setup:*
- Job type: *Beam Portable Python Batch* Job on Flink standalone cluster.
- Only a single job is scheduled on the cluster.
- Everything is running on a single machine with a single Flink task…