Thanks Rafal and Lukasz! These are great suggestions! One quick question
about using a semaphore, though: would it be possible for multiple elements
to pile up in a particular worker instance, blocked while waiting to acquire
the semaphore? I'll definitely test it.
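
A minimal sketch for checking this outside Beam, assuming plain
java.util.concurrent threads stand in for the worker's processing threads.
The names SemaphorePileUp and maxConcurrent are hypothetical, and the sleep
is only a placeholder for the TF call. It reports the largest number of
threads that held a permit at the same time; every other thread is parked
inside acquire() until a permit frees up.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class SemaphorePileUp {
  // Starts `elements` threads that each need one of `permits` permits and
  // returns the peak number of threads that held a permit simultaneously.
  static int maxConcurrent(int permits, int elements) {
    Semaphore semaphore = new Semaphore(permits, true); // fair: FIFO hand-out
    AtomicInteger active = new AtomicInteger();
    AtomicInteger peak = new AtomicInteger();
    CountDownLatch done = new CountDownLatch(elements);

    for (int i = 0; i < elements; i++) {
      new Thread(() -> {
        try {
          semaphore.acquire(); // threads beyond `permits` block here
          try {
            int now = active.incrementAndGet();
            peak.accumulateAndGet(now, Math::max);
            Thread.sleep(20); // placeholder for the TF call (assumption)
          } finally {
            active.decrementAndGet();
            semaphore.release();
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          done.countDown();
        }
      }).start();
    }

    try {
      done.await(); // wait until every thread has finished
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting", e);
    }
    return peak.get();
  }

  public static void main(String[] args) {
    System.out.println("peak concurrent holders: " + maxConcurrent(4, 32));
  }
}
```

With 4 permits and 32 threads the peak never exceeds 4, so the remaining
threads do pile up behind the semaphore, each one still occupying a thread
while it waits.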

Lukasz, let me first try to explain why I feel the current autoscaling
behavior might not be ideal here. I'll definitely contact
dataflow-feedb...@google.com as well, but I'll try to give some of my
[probably incorrect] thoughts.

So basically, based on my understanding, Beam tries to allocate multiple
elements to a single machine. Let's assume an ideal computational model
where a single core takes T time to finish processing an element, but if
all 32 cores can be used to process that element, it takes T/32 time.

Therefore, if we have 32 incoming elements and Beam allocates 32 threads on
a worker instance for this DoFn, each element uses a single core and
finishes in T time, so there is no backlog during this time since all the
elements are being processed. But if we could tune a parameter to tell Beam
to allocate fewer elements per worker instance, a backlog would build up
and autoscaling might trigger earlier, so the overall system lag might
actually be better?

I haven't tested this hypothesis yet but basically the above is my
reasoning.
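
To make the arithmetic concrete, here is a small sketch of that idealized
model. It assumes perfectly linear speedup and that k concurrent elements
split the 32 cores evenly (so each takes k/32 of T); that even-split rule is
an added assumption, not something measured. IdealModel and its methods are
hypothetical names, and all times are in units of T.

```java
class IdealModel {
  static final int CORES = 32;

  // Latency of one element (in units of T) when `concurrent` elements
  // share all cores evenly: 1 element -> 1/32, 32 elements -> 1.
  static double latency(int concurrent) {
    return (double) concurrent / CORES;
  }

  // Elements left waiting, i.e. the backlog visible to autoscaling, when
  // `arrived` elements show up and at most `limit` may run at once.
  static int backlog(int arrived, int limit) {
    return Math.max(0, arrived - limit);
  }

  // Time (in units of T) for one worker to finish `arrived` elements when
  // it processes them in batches of at most `limit`.
  static double drainTime(int arrived, int limit) {
    int fullBatches = arrived / limit;
    int remainder = arrived % limit;
    return fullBatches * latency(limit) + latency(remainder);
  }

  public static void main(String[] args) {
    for (int limit : new int[] {32, 4}) {
      System.out.printf("limit=%d latency=%.3f backlog=%d drain=%.3f%n",
          limit, latency(limit), backlog(32, limit), drainTime(32, limit));
    }
  }
}
```

Under these assumptions a single worker drains 32 elements in the same
total time (T) whether the limit is 32 or 4. What changes is that with a
limit of 4 each element finishes in T/8 and 28 elements sit in a visible
backlog, which is the signal I would expect autoscaling to react to.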

Thanks,

Derek

On Tue, Oct 17, 2017 at 8:49 AM, Lukasz Cwik <lc...@google.com> wrote:

> The `numberOfWorkerHarnessThreads` is worker wide and not per DoFn.
> Setting this value to constrain how many threads are executing will impact
> all parts of your pipeline. One idea is to use a Semaphore as a static
> object within your DoFn with a fixed number of allowed actors that can
> enter and execute your Tensorflow.
>
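> import java.util.concurrent.Semaphore;
> import org.apache.beam.sdk.transforms.DoFn;
>
> class TfDoFn<X, Y> extends DoFn<X, Y> {
>   private static final int MAX_TF_ACTORS = 4;
>   // Static, so the limit is shared by every TfDoFn instance in the JVM.
>   private static final Semaphore semaphore =
>       new Semaphore(MAX_TF_ACTORS, true);
>
>   @ProcessElement
>   public void processElement(ProcessContext c) throws InterruptedException {
>     // Acquire before the try block so that an interrupted acquire does
>     // not release a permit that was never taken.
>     semaphore.acquire();
>     try {
>       // Do TF work
>     } finally {
>       semaphore.release();
>     }
>   }
> }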
>
> This will ensure that you're processing each TF item in a more timely
> manner, but it will still mean that there could be many other TF items
> sitting around waiting to acquire the semaphore.
>
> As an alternative, I would recommend contacting
> dataflow-feedb...@google.com, specifically referencing how you believe
> autoscaling is not working well for your use case/pipeline. Also provide a
> description of your pipeline and some job IDs (if possible).
>
>
> On Mon, Oct 16, 2017 at 6:26 PM, Rafal Wojdyla <r...@spotify.com> wrote:
>
>> Hi.
>> To answer your question: if we limit ourselves to DataflowRunner, you
>> could use `numberOfWorkerHarnessThreads`. See more here
>> <https://github.com/spotify/scio/wiki/FAQ#how-do-i-control-concurrency-number-of-dofn-threads-in-dataflow-workers>.
>> That said, I'm not gonna comment whether that is a good remedy for your
>> actual problem.
>> - rav
>>
>> On Mon, Oct 16, 2017 at 8:48 PM, Derek Hao Hu <phoenixin...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Is there an easy way to limit the number of DoFn instances per worker?
>>>
>>> The use case is like this: we are calling TensorFlow in our DoFn and
>>> each TensorFlow call would automatically try to allocate the available CPU
>>> resources. So in a streaming pipeline, what I'm seeing is that inference
>>> time becomes longer over time if autoscaling doesn't catch up. My
>>> hypothesis is that Beam is trying to allocate a specific number of elements
>>> (maybe the number of cores?) on each worker for a particular DoFn and then
>>> these TensorFlow threads contend for CPU cycles. Therefore, I would like to
>>> know whether it's possible to limit the number of threads a pipeline runner
>>> can allocate for a DoFn per worker. By doing this, we can ensure we are
>>> accumulating backlogs in the streaming pipeline earlier and autoscaling
>>> would probably happen earlier as well.
>>>
>>> Thanks!
>>> --
>>> Derek Hao Hu
>>>
>>> Software Engineer | Snapchat
>>> Snap Inc.
>>>
>>
>>
>


-- 
Derek Hao Hu

Software Engineer | Snapchat
Snap Inc.
