Re: Discussion: Scheduling across runner and SDKHarness in Portability framework

2018-09-19 Thread Robert Bradshaw
On Fri, Sep 14, 2018 at 3:01 PM Thomas Weise wrote: > That's actually how the Flink runner already works - bundle processing > starts when elements are available (see FlinkExecutableStageFunction for > batch mode). > > But we still have the possibility of the SDK getting concurrent requests > due

Re: Discussion: Scheduling across runner and SDKHarness in Portability framework

2018-09-14 Thread Thomas Weise
That's actually how the Flink runner already works - bundle processing starts when elements are available (see FlinkExecutableStageFunction for batch mode). But we still have the possibility of the SDK getting concurrent requests due to parallelism (and pipelined execution). Thanks, Thomas On F

Re: Discussion: Scheduling across runner and SDKHarness in Portability framework

2018-09-14 Thread Robert Bradshaw
Currently the best solution we've come up with is that we must process an unbounded number of bundles concurrently to avoid deadlock. Especially in the batch case, this may be wasteful as we bring up workers for many stages that are not actually executable until upstream stages finish. Since it may

Re: Discussion: Scheduling across runner and SDKHarness in Portability framework

2018-08-29 Thread Ankur Goenka
I managed to write a small document based on the discussion. Please take a look at https://docs.google.com/document/d/1oAXVPbJ0dzj2_8LXEWFAgqCP5Tpld3q5B3QU254PQ6A/edit?usp=sharing On Tue, Aug 21, 2018 at 11:01 PM Henning Rohde wrote: > Sending bundles that cannot be executed, i.e., the situatio

Re: Discussion: Scheduling across runner and SDKHarness in Portability framework

2018-08-21 Thread Henning Rohde
Sending bundles that cannot be executed, i.e., the situation described at the beginning of the thread as causing deadlock in Flink with mapB. The discussion of exposing (or assuming an infinitely large) concurrency level -- while a useful concept in its own right -- came around as a way to unblock ma

Re: Discussion: Scheduling across runner and SDKHarness in Portability framework

2018-08-21 Thread Lukasz Cwik
Henning, can you clarify what you mean by sending non-executable bundles to the SDK harness, and how that is useful for Flink? On Tue, Aug 21, 2018 at 2:01 PM Henning Rohde wrote: > I think it will be useful to the runner to know upfront what the > fundamental threading capabilities are for the S

Re: Discussion: Scheduling across runner and SDKHarness in Portability framework

2018-08-21 Thread Henning Rohde
I think it will be useful to the runner to know upfront what the fundamental threading capabilities are for the SDK harness (say, "fixed", "linear", "dynamic", ..) so that the runner can upfront make a good static decision on #harnesses and how many resources they should each have. It's wasteful to

Re: Discussion: Scheduling across runner and SDKHarness in Portability framework

2018-08-20 Thread Ankur Goenka
That's right. To add to it: we added multithreading to Python streaming, as a single thread is suboptimal for the streaming use case. Shall we move towards a conclusion on the SDK bundle processing upper bound? On Mon, Aug 20, 2018 at 1:54 PM Lukasz Cwik wrote: > Ankur, I can see where you are goin

Re: Discussion: Scheduling across runner and SDKHarness in Portability framework

2018-08-20 Thread Lukasz Cwik
Ankur, I can see where you are going with your argument. I believe there is certain information which is static and won't change at pipeline creation time (such as Python SDK is most efficient doing one bundle at a time) and some stuff which is best at runtime, like memory and CPU limits, worker co

Re: Discussion: Scheduling across runner and SDKHarness in Portability framework

2018-08-20 Thread Ankur Goenka
I would prefer to keep it dynamic, as it can be changed by the infrastructure or the pipeline author. For instance, in the case of Python, the number of concurrent bundles can be changed by setting the pipeline option worker_count. And for Java it can be computed based on the CPUs on the machine. For the Flink runner, we

Re: Discussion: Scheduling across runner and SDKHarness in Portability framework

2018-08-20 Thread Lukasz Cwik
+1 on making the resources part of a proto. Based upon what Henning linked to, the provisioning API seems like an appropriate place to provide this information. Thomas, I believe the environment proto is the best place to add information that a runner may want to know about upfront during pipeline

Re: Discussion: Scheduling across runner and SDKHarness in Portability framework

2018-08-19 Thread Thomas Weise
For SDKs where the upper limit is constant and known upfront, why not communicate this along with the other harness resource info as part of the job submission? Regarding use of GRPC headers: Why not make this explicit in the proto instead? WRT runner dictating resource constraints: The runner ac

Re: Discussion: Scheduling across runner and SDKHarness in Portability framework

2018-08-18 Thread Ankur Goenka
Sounds good to me. A gRPC header on the control channel seems to be a good place to add the upper bound information. Added JIRAs: https://issues.apache.org/jira/browse/BEAM-5166 https://issues.apache.org/jira/browse/BEAM-5167 On Fri, Aug 17, 2018 at 10:51 PM Henning Rohde wrote: > Regarding resources:

Re: Discussion: Scheduling across runner and SDKHarness in Portability framework

2018-08-17 Thread Henning Rohde
Regarding resources: the runner can currently dictate the mem/cpu/disk resources that the harness is allowed to use via the provisioning api. The SDK harness need not -- and should not -- speculate on what else might be running on the machine: https://github.com/apache/beam/blob/0e14965707b5d48a3d

Re: Discussion: Scheduling across runner and SDKHarness in Portability framework

2018-08-17 Thread Ankur Goenka
I am thinking of the upper bound as something along the lines of a theoretical upper limit, or any other static high value beyond which the SDK will reject bundles verbosely. The idea is that the SDK will not keep bundles in a queue while waiting on current bundles to finish. It will simply reject any additional bundle
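A minimal sketch of the "reject instead of queue" behaviour described above, assuming the harness tracks in-flight bundles by ID. `BundleAdmission` and `BundleRejected` are hypothetical names, not part of any Beam SDK, and the error message format is an illustrative assumption.

```python
import threading


class BundleRejected(RuntimeError):
    """Raised when a bundle arrives while the harness is at its upper bound."""


class BundleAdmission:
    """Hypothetical SDK-side guard: run at most `upper_bound` bundles at once
    and reject extra bundles immediately instead of queueing them."""

    def __init__(self, upper_bound: int) -> None:
        self._upper_bound = upper_bound
        self._active: set[str] = set()
        self._lock = threading.Lock()

    def start(self, bundle_id: str) -> None:
        with self._lock:
            if len(self._active) >= self._upper_bound:
                # Fail loudly rather than wait; how the runner reacts is up to it.
                raise BundleRejected(
                    f"bundle {bundle_id} rejected: "
                    f"{len(self._active)} of {self._upper_bound} slots in use"
                )
            self._active.add(bundle_id)

    def finish(self, bundle_id: str) -> None:
        with self._lock:
            self._active.discard(bundle_id)


admission = BundleAdmission(upper_bound=2)
admission.start("b1")
admission.start("b2")
try:
    admission.start("b3")
except BundleRejected as err:
    print(err)  # bundle b3 rejected: 2 of 2 slots in use
```

Because nothing is ever queued, a bundle that cannot run never silently occupies the harness; the runner sees the rejection right away.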

Re: Discussion: Scheduling across runner and SDKHarness in Portability framework

2018-08-17 Thread Lukasz Cwik
Ankur, how would you expect an SDK to compute a realistic upper bound (upfront or during pipeline computation)? The first thought that came to my mind was that the SDK would provide CPU/memory/... resourcing information and the runner would make a judgement call as to whether it should ask the SDK to do m

Re: Discussion: Scheduling across runner and SDKHarness in Portability framework

2018-08-17 Thread Ankur Goenka
Makes sense. Exposing an upper bound on concurrency along with the optimum concurrency can give a good balance. This is good information to expose while keeping the requirements on the SDK simple. The SDK can publish 1 as both the optimum concurrency and the upper bound to keep things simple. Runner introspection o
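One hypothetical way a runner could use the two published numbers: size the number of harnesses from the optimum and treat the upper bound as a hard ceiling. `ConcurrencyHint` and `harnesses_needed` are illustrative names, not Beam APIs, and the sizing rule (ceiling of parallelism divided by optimum) is an assumption.

```python
import math
from dataclasses import dataclass


@dataclass(frozen=True)
class ConcurrencyHint:
    """Hypothetical values an SDK harness might publish to the runner."""

    optimum: int      # bundles the harness runs most efficiently at once
    upper_bound: int  # bundles beyond which the harness rejects work

    def __post_init__(self) -> None:
        if not 1 <= self.optimum <= self.upper_bound:
            raise ValueError("need 1 <= optimum <= upper_bound")


def harnesses_needed(parallelism: int, hint: ConcurrencyHint) -> int:
    """Harnesses to start so that no harness is asked to exceed its optimum."""
    return math.ceil(parallelism / hint.optimum)


simple = ConcurrencyHint(optimum=1, upper_bound=1)  # "publish 1 for both"
print(harnesses_needed(4, simple))  # 4
print(harnesses_needed(4, ConcurrencyHint(optimum=2, upper_bound=8)))  # 2
```

With the simplest hint (1 and 1) the runner falls back to one bundle per harness, which is the conservative case that avoids the deadlock at the cost of more harnesses.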

Re: Discussion: Scheduling across runner and SDKHarness in Portability framework

2018-08-17 Thread Henning Rohde
I agree with Luke's observation, with the caveat that "infinite amount of bundles in parallel" is limited by the available resources. For example, the Go SDK harness will accept an arbitrary amount of parallel work, but too much work will cause either excessive GC pressure with crippling slowness o

Re: Discussion: Scheduling across runner and SDKHarness in Portability framework

2018-08-17 Thread Ankur Goenka
The latter case, where the SDK supports executing only a single bundle at a time and the runner does not use this flag, is exactly how we got into the deadlock here. I agree with exposing the SDK's optimum concurrency level (1 in the latter case) and letting the runner decide whether to use it. But at the same time

Re: Discussion: Scheduling across runner and SDKHarness in Portability framework

2018-08-17 Thread Lukasz Cwik
I believe in practice SDK harnesses will fall into one of two capabilities: they can effectively process an infinite number of bundles in parallel, or they can only process a single bundle at a time. I believe it is more difficult for a runner to handle the latter case well and to perform all the environment

Re: Discussion: Scheduling across runner and SDKHarness in Portability framework

2018-08-17 Thread Ankur Goenka
To recap the discussion, it seems that we have come up with the following points. SDKHarness management and initialization. 1. The runner completely owns the work assignment to the SDKHarness. 2. The runner should know the capabilities and capacity of the SDKHarness and should assign work accordingly. 3. Spi

Re: Discussion: Scheduling across runner and SDKHarness in Portability framework

2018-08-17 Thread Henning Rohde
Finding a good balance is indeed the art of portability, because the range of capability (and assumptions) on both sides is wide. The original idea was to allow the SDK harness to be an extremely simple bundle executor (specifically, single-threaded execution of one instruction at a time) howeve

Re: Discussion: Scheduling across runner and SDKHarness in Portability framework

2018-08-17 Thread Thomas Weise
It is good to see this discussed! I think there needs to be a good balance between the SDK harness capabilities/complexity and responsibilities. Additionally, the user will need to be able to adjust the runner behavior, since the type of workload executed in the harness is also a factor. Elsewhere

Re: Discussion: Scheduling across runner and SDKHarness in Portability framework

2018-08-17 Thread Lukasz Cwik
SDK harnesses were always responsible for executing all work given to them concurrently. Runners have been responsible for choosing how much work to give to the SDK harness in such a way that best utilizes the SDK harness. I understand that multithreading in Python is inefficient due to the global i

Re: Discussion: Scheduling across runner and SDKHarness in Portability framework

2018-08-17 Thread Maximilian Michels
Hi Ankur, Thanks for looking into this problem. The cause seems to be Flink's pipelined execution mode. It runs multiple tasks in one task slot and produces a deadlock when the pipelined operators schedule the SDK harness DoFns in non-topological order. The problem would be resolved if we schedul
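A sketch of topological ordering, assuming the stage graph is known up front: each stage's bundle starts only after all of its upstream stages. It uses the standard-library `graphlib.TopologicalSorter` (Python 3.9+); the stage names are hypothetical, and this is not how the Flink runner actually schedules tasks.

```python
from graphlib import TopologicalSorter


def schedule_order(stages: dict[str, set[str]]) -> list[str]:
    """Return an order in which every stage follows all stages it reads from.

    `stages` maps a stage name to the set of upstream stages it consumes.
    """
    return list(TopologicalSorter(stages).static_order())


# Hypothetical batch pipeline: read -> mapA -> mapB
pipeline = {"mapA": {"read"}, "mapB": {"mapA"}}
print(schedule_order(pipeline))  # ['read', 'mapA', 'mapB']
```

With a single harness slot, running stages to completion in this order means no bundle sits in the slot waiting for input from a stage that has not started yet.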

Discussion: Scheduling across runner and SDKHarness in Portability framework

2018-08-16 Thread Ankur Goenka
Hi, tl;dr Deadlock in task execution caused by limited task parallelism on the SDKHarness. *Setup:* - Job type: *Beam Portable Python Batch* Job on Flink standalone cluster. - Only a single job is scheduled on the cluster. - Everything is running on a single machine with single Flink task
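A minimal, hypothetical model of this deadlock, assuming the harness behaves like a fixed pool of `slots` bundle slots and that the downstream stage's bundle is started before the upstream one. `run_two_stage` is illustrative only; it uses timeouts so the demo terminates instead of hanging forever.

```python
import queue
import threading


def run_two_stage(slots: int, timeout: float = 1.0) -> bool:
    """Model a harness that runs at most `slots` bundles concurrently.

    The downstream bundle is scheduled first and holds a slot while it waits
    for input; the upstream bundle needs a slot before it can emit anything.
    Returns True if the downstream bundle received its input.
    """
    slot_pool = threading.BoundedSemaphore(slots)
    channel = queue.Queue()
    downstream_holds_slot = threading.Event()
    downstream_done = threading.Event()

    def downstream() -> None:
        with slot_pool:  # occupy a slot while waiting, like a started bundle
            downstream_holds_slot.set()
            try:
                channel.get(timeout=timeout)
                downstream_done.set()
            except queue.Empty:
                pass  # input never arrived: the deadlock, cut short by the timeout

    def upstream() -> None:
        if slot_pool.acquire(timeout=timeout):  # waits while all slots are taken
            try:
                channel.put("element")
            finally:
                slot_pool.release()

    d = threading.Thread(target=downstream)
    d.start()
    downstream_holds_slot.wait()  # force the non-topological start order
    u = threading.Thread(target=upstream)
    u.start()
    d.join()
    u.join()
    return downstream_done.is_set()


print(run_two_stage(slots=1))  # False: one slot, downstream scheduled first
print(run_two_stage(slots=2))  # True: a second slot lets upstream make progress
```

The single-slot case fails deterministically, since the upstream bundle can only run after the downstream bundle has already given up and released the slot.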