Suggestion to let KafkaIO support the deserializer API with headers

2020-08-21 Thread Lourens Naude
Hi everyone, We bumped into an API issue with the deserializer called when constructing KafkaRecord instances in the KafkaIO module. I wanted to float this past the mailing list for discussion first before exploring further. The callsite referenced: KafkaIO only calls the deserializer with the
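
For illustration, a minimal sketch of a deserializer that implements the header-aware overload of Kafka's Deserializer interface, which is the API the suggestion would have KafkaIO invoke (the class name and header key below are hypothetical, not from the thread):

    import java.io.UnsupportedEncodingException;
    import java.nio.charset.StandardCharsets;
    import org.apache.kafka.common.header.Header;
    import org.apache.kafka.common.header.Headers;
    import org.apache.kafka.common.serialization.Deserializer;

    // Only behaves correctly when the caller (e.g. KafkaIO) passes headers.
    public class HeaderAwareDeserializer implements Deserializer<String> {

      @Override
      public String deserialize(String topic, byte[] data) {
        // Fallback used when the caller does not provide headers.
        return new String(data, StandardCharsets.UTF_8);
      }

      @Override
      public String deserialize(String topic, Headers headers, byte[] data) {
        // Header-aware path: read a per-record encoding hint from the headers.
        Header encoding = headers.lastHeader("encoding");
        if (encoding == null) {
          return new String(data, StandardCharsets.UTF_8);
        }
        try {
          return new String(data, new String(encoding.value(), StandardCharsets.UTF_8));
        } catch (UnsupportedEncodingException e) {
          throw new IllegalArgumentException(e);
        }
      }
    }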

@StateId uniqueness across DoFn(s)

2020-08-21 Thread Ke Wu
Hello everyone, After reading through Stateful processing with Apache Beam and DoFn.StateId, I understand that each state id must be unique and

Re: Suggestion to let KafkaIO support the deserializer API with headers

2020-08-21 Thread Luke Cwik
Sounds good. Note that you'll also want to update ReadFromKafkaDoFn[1] and provide tests that cover both to make sure we don't regress and stop providing headers. 1:

Re: Is there an equivalent for --numberOfWorkerHarnessThreads in Python SDK?

2020-08-21 Thread Reuven Lax
Streaming Dataflow relies on a high thread count for performance. Streaming threads spend a high percentage of time blocked on IO, so in order to get decent CPU utilization we need a lot of threads. Limiting the thread count risks causing performance issues. On Fri, Aug 21, 2020 at 8:00 AM Kamil
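
For reference, a minimal sketch of setting the Java flag named in the subject on Dataflow's debug options (the thread count here is an arbitrary placeholder; whether to cap it at all is exactly the trade-off described above):

    import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class HarnessThreadsExample {
      public static void main(String[] args) {
        // Cap the Java worker harness thread pool (placeholder value of 100).
        DataflowPipelineDebugOptions options =
            PipelineOptionsFactory.fromArgs("--numberOfWorkerHarnessThreads=100")
                .as(DataflowPipelineDebugOptions.class);
        System.out.println(options.getNumberOfWorkerHarnessThreads());
      }
    }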

Re: Is there an equivalent for --numberOfWorkerHarnessThreads in Python SDK?

2020-08-21 Thread Kamil Wasilewski
No, I'm not. But thanks anyway, I totally missed that option! It occurs in a simple pipeline that executes CoGroupByKey over two PCollections, reading from a bounded source with 20 million and 2 million elements, respectively, in one global window. Here's a link to the code; it's one of our tests:
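
For illustration, a minimal sketch of the pipeline shape described (tiny in-memory inputs stand in for the 20-million and 2-million element sources; all names are hypothetical):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.join.CoGbkResult;
    import org.apache.beam.sdk.transforms.join.CoGroupByKey;
    import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TupleTag;

    public class CoGroupByKeyExample {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Tiny stand-ins for the two bounded inputs; both stay in one global window.
        PCollection<KV<String, Long>> large =
            p.apply("Large", Create.of(KV.of("a", 1L), KV.of("b", 2L)));
        PCollection<KV<String, Long>> small =
            p.apply("Small", Create.of(KV.of("a", 10L)));

        TupleTag<Long> largeTag = new TupleTag<>();
        TupleTag<Long> smallTag = new TupleTag<>();

        // CoGroupByKey over the two keyed PCollections.
        PCollection<KV<String, CoGbkResult>> joined =
            KeyedPCollectionTuple.of(largeTag, large)
                .and(smallTag, small)
                .apply(CoGroupByKey.create());

        p.run().waitUntilFinish();
      }
    }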

Re: @StateId uniqueness across DoFn(s)

2020-08-21 Thread Reuven Lax
StateId is scoped to the DoFn. You can use the same string in different DoFns for completely different states. On Fri, Aug 21, 2020 at 10:21 AM Ke Wu wrote: > Hello everyone, > > After reading through Stateful processing with Apache Beam > and
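
For illustration, a minimal sketch of what that scoping allows: two unrelated DoFns can both use the id "count" for completely different states (the class names and state contents below are hypothetical):

    import org.apache.beam.sdk.coders.VarIntCoder;
    import org.apache.beam.sdk.coders.VarLongCoder;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.ValueState;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    // First DoFn: "count" holds a per-key Long.
    class CountClicksFn extends DoFn<KV<String, String>, Long> {
      @StateId("count")
      private final StateSpec<ValueState<Long>> countSpec = StateSpecs.value(VarLongCoder.of());

      @ProcessElement
      public void process(@StateId("count") ValueState<Long> count, OutputReceiver<Long> out) {
        long next = (count.read() == null ? 0L : count.read()) + 1;
        count.write(next);
        out.output(next);
      }
    }

    // Second DoFn: the same string "count" names a completely separate Integer state.
    class CountErrorsFn extends DoFn<KV<String, Integer>, Integer> {
      @StateId("count")
      private final StateSpec<ValueState<Integer>> countSpec = StateSpecs.value(VarIntCoder.of());

      @ProcessElement
      public void process(@StateId("count") ValueState<Integer> count, OutputReceiver<Integer> out) {
        int next = (count.read() == null ? 0 : count.read()) + 1;
        count.write(next);
        out.output(next);
      }
    }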

Re: @StateId uniqueness across DoFn(s)

2020-08-21 Thread Ke Wu
If the user does not explicitly specify a transform name, in which case an autogenerated name will be used when generating the unique id, does that mean the id could change when the pipeline changes, such as when extra transforms are added? > On Aug 21, 2020, at 11:43 AM, Luke Cwik wrote: > > The DoFn

Re: Splittable-Dofn not distributing the work to multiple workers

2020-08-21 Thread Jiadai Xia
I am using v1. Does v1 support the initial splitting and distribution? I expect it to distribute the initial split ranges to multiple workers. On Fri, Aug 21, 2020 at 11:00 AM Luke Cwik wrote: > Are you using Dataflow runner v2[1] since the default for Beam Java still > uses Dataflow runner

Splittable-Dofn not distributing the work to multiple workers

2020-08-21 Thread Jiadai Xia
Hi, As stated in the title, I implemented an SDF for reading Parquet files and I am trying to run it with the Dataflow runner. The initial split outputs a bunch of ranges, but the number of workers is not scaled up and the work is not distributed. Any suggestions on what the problem might be?
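
For context, a minimal skeleton of the SDF pattern in question (not the poster's actual code; the offset range and split size are hard-coded placeholders standing in for Parquet row groups):

    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.io.range.OffsetRange;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

    @DoFn.BoundedPerElement
    class ReadParquetRanges extends DoFn<FileIO.ReadableFile, String> {

      @GetInitialRestriction
      public OffsetRange initialRestriction(@Element FileIO.ReadableFile file) {
        // Hypothetical: one offset per row group (hard-coded placeholder bound).
        return new OffsetRange(0, 100);
      }

      @SplitRestriction
      public void splitRestriction(
          @Restriction OffsetRange range, OutputReceiver<OffsetRange> out) {
        // Emit sub-ranges the runner can distribute across workers.
        for (long start = range.getFrom(); start < range.getTo(); start += 10) {
          out.output(new OffsetRange(start, Math.min(start + 10, range.getTo())));
        }
      }

      @ProcessElement
      public void process(
          @Element FileIO.ReadableFile file,
          RestrictionTracker<OffsetRange, Long> tracker,
          OutputReceiver<String> out) {
        // Claim and process each position in the assigned sub-range.
        for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
          out.output(file.getMetadata().resourceId() + " row-group " + i);
        }
      }
    }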

Re: @StateId uniqueness across DoFn(s)

2020-08-21 Thread Ke Wu
Thank you, Reuven, for the confirmation. Do you know the recommended way for underlying runners to distinguish the same state id used in different DoFns? > On Aug 21, 2020, at 10:27 AM, Reuven Lax wrote: > > StateId is scoped to the DoFn. You can use the same string in different DoFns > for

Re: Splittable-Dofn not distributing the work to multiple workers

2020-08-21 Thread Luke Cwik
Yes, it does. There should be a reshuffle between the initial splitting and the processing portion. On Fri, Aug 21, 2020 at 11:04 AM Jiadai Xia wrote: > I am using v1. Does v1 support the initial splitting and distribution? > since I expect it to distribute the initial splitting to multiple

Re: @StateId uniqueness across DoFn(s)

2020-08-21 Thread Robert Bradshaw
We should be using PTransform Labels (aka Names), not ids, for naming state. This is why the names must line up when doing, for example, a Dataflow update operation with Stateful DoFns. (And, yes, if the user does not specify the transform name, and it is autogenerated differently, this will be
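
For illustration, a sketch of pinning the transform name explicitly so the state's scope does not depend on an autogenerated label (this reuses the hypothetical CountClicksFn sketched earlier in this digest; all names are placeholders):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.KV;

    public class StableTransformNames {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        p.apply("ReadClicks", Create.of(KV.of("user-1", "click")))
            // Explicit, stable name: state in CountClicksFn is scoped under
            // "CountClicks", so unrelated pipeline edits (or an update) won't rename it.
            .apply("CountClicks", ParDo.of(new CountClicksFn()));
        p.run().waitUntilFinish();
      }
    }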

Re: Splittable-Dofn not distributing the work to multiple workers

2020-08-21 Thread Luke Cwik
Are you using Dataflow runner v2[1] since the default for Beam Java still uses Dataflow runner v1? Dataflow runner v2 is the only one that supports autoscaling and dynamic splitting of splittable DoFns in bounded pipelines. 1:
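
For reference, a sketch of opting a Java pipeline into runner v2 via the experiments flag (flag spelling as used for the Dataflow runner v2 preview at the time; hedged, since defaults may have changed since):

    import org.apache.beam.sdk.options.ExperimentalOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class EnableRunnerV2 {
      public static void main(String[] args) {
        // Opt the pipeline into Dataflow runner v2; pass these options to Pipeline.create().
        ExperimentalOptions options =
            PipelineOptionsFactory.fromArgs("--experiments=use_runner_v2")
                .as(ExperimentalOptions.class);
        System.out.println(options.getExperiments());
      }
    }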

Re: @StateId uniqueness across DoFn(s)

2020-08-21 Thread Luke Cwik
The DoFn is associated with a PTransform, and in the pipeline proto there is a unique id associated with each PTransform. You can use that to generate a composite key (ptransformid, stateid), which will be unique within the pipeline. On Fri, Aug 21, 2020 at 11:26 AM Ke Wu wrote: > Thank you
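
For illustration, a tiny sketch of the composite key idea (a hypothetical runner-side value class, not an existing Beam type):

    import java.util.Objects;

    // Hypothetical key a runner could use to scope state cells per transform.
    final class RunnerStateKey {
      private final String pTransformId;
      private final String stateId;

      RunnerStateKey(String pTransformId, String stateId) {
        this.pTransformId = pTransformId;
        this.stateId = stateId;
      }

      @Override
      public boolean equals(Object o) {
        if (!(o instanceof RunnerStateKey)) {
          return false;
        }
        RunnerStateKey other = (RunnerStateKey) o;
        return pTransformId.equals(other.pTransformId) && stateId.equals(other.stateId);
      }

      @Override
      public int hashCode() {
        return Objects.hash(pTransformId, stateId);
      }
    }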