The `numberOfWorkerHarnessThreads` option is worker-wide, not per DoFn.
Constraining the thread count with it will impact all parts of your
pipeline. One idea is to use a `Semaphore` held in a static field of your
DoFn, with a fixed number of permits, so that only that many threads can
enter and execute your TensorFlow call at once:
class TfDoFn<X, Y> {
  private static final int MAX_TF_ACTORS = 4;
  private static final Semaphore semaphore =
      new Semaphore(MAX_TF_ACTORS, /* fair= */ true);

  @ProcessElement
  public void processElement(X x) throws InterruptedException {
    // Acquire outside the try block: if acquire() is interrupted,
    // we must not release a permit we never obtained.
    semaphore.acquire();
    try {
      // Do TF work
    } finally {
      semaphore.release();
    }
  }
}
This will ensure that each TF item is processed in a more timely manner,
but there could still be many other elements sitting around waiting to
acquire the semaphore.
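If that unbounded waiting is a concern, a variant of the same idea (illustrative only, not from the thread; the class and method names here are made up) uses `tryAcquire` with a timeout so a caller can fail fast or retry instead of queuing indefinitely:

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class TfThrottle {
  private static final int MAX_TF_ACTORS = 4;
  private static final Semaphore semaphore =
      new Semaphore(MAX_TF_ACTORS, /* fair= */ true);

  // Runs tfWork if a permit becomes available within timeoutMs.
  // Returns true if the work ran, false if it timed out waiting.
  public static boolean runThrottled(Runnable tfWork, long timeoutMs)
      throws InterruptedException {
    if (!semaphore.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS)) {
      return false; // caller can retry, queue, or fail the element
    }
    try {
      tfWork.run();
      return true;
    } finally {
      semaphore.release();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    boolean ran = runThrottled(() -> System.out.println("inference done"), 100);
    System.out.println("ran=" + ran);
  }
}
```

Whether fail-fast is appropriate depends on the pipeline's semantics; in a streaming job you may prefer the blocking version so backlog accumulates and autoscaling can react.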
As an alternative, I would recommend contacting [email protected],
specifically referencing how you believe autoscaling is not working well
for your use case/pipeline. Also provide a description of your pipeline
and some job ids (if possible).
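For reference, the worker-wide option mentioned at the top is an ordinary pipeline option; a minimal sketch of passing it at launch time (the main class and Maven invocation are illustrative assumptions, not from the thread):

```shell
# Cap the Dataflow worker harness at 4 processing threads.
# com.example.MyPipeline is a hypothetical main class.
mvn compile exec:java \
  -Dexec.mainClass=com.example.MyPipeline \
  -Dexec.args="--runner=DataflowRunner --numberOfWorkerHarnessThreads=4"
```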
On Mon, Oct 16, 2017 at 6:26 PM, Rafal Wojdyla <[email protected]> wrote:
> Hi.
> To answer your question: if we limit ourselves to DataflowRunner, you
> could use `numberOfWorkerHarnessThreads`. See more here
> <https://github.com/spotify/scio/wiki/FAQ#how-do-i-control-concurrency-number-of-dofn-threads-in-dataflow-workers>.
> That said, I'm not gonna comment whether that is a good remedy for your
> actual problem.
> - rav
>
> On Mon, Oct 16, 2017 at 8:48 PM, Derek Hao Hu <[email protected]>
> wrote:
>
>> Hi,
>>
>> Is there an easy way to limit the number of DoFn instances per worker?
>>
>> The use case is like this: we are calling TensorFlow in our DoFn and each
>> TensorFlow call would automatically try to allocate the available CPU
>> resources. So in a streaming pipeline, what I'm seeing is the inference
>> time will become longer over time if autoscaling didn't catch up. My
>> hypothesis is that Beam is trying to allocate a specific number of elements
>> (maybe the number of cores?) on each worker for a particular DoFn and then
>> these TensorFlow threads contend for CPU cycles. Therefore, I would like to
>> know whether it's possible to limit the number of threads a pipeline runner
>> can allocate for a DoFn per worker. By doing this, we can ensure we are
>> accumulating backlogs in the streaming pipeline earlier and autoscaling
>> would probably happen earlier as well.
>>
>> Thanks!
>> --
>> Derek Hao Hu
>>
>> Software Engineer | Snapchat
>> Snap Inc.
>>
>
>