Re: @StateId uniqueness across DoFn(s)

2020-08-21 Thread Robert Bradshaw
We should be using PTransform Labels (aka Names), not ids, for naming
state. This is why the names must line up when doing, for example, a
Dataflow update operation with Stateful DoFns.

(And, yes, if the user does not specify the transform name, and it is
autogenerated differently, this will be an error. This is why we throw
exceptions in the SDK if a name is re-used rather than just appending
a counter or similar.)
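
For illustration, a minimal Java sketch of that advice (the step name
"CountClicks" and CountClicksFn are made up for this example): giving the
stateful step an explicit, stable name means the state it owns keeps the same
name even when unrelated transforms are added or removed.

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

class NamingSketch {
  /** A stand-in stateful DoFn; its body is irrelevant to the naming point. */
  static class CountClicksFn extends DoFn<KV<String, Long>, Long> {
    @ProcessElement
    public void process(@Element KV<String, Long> e, OutputReceiver<Long> out) {
      out.output(e.getValue());
    }
  }

  static PCollection<Long> expand(PCollection<KV<String, Long>> clicks) {
    // The explicit label "CountClicks" names this step (and therefore the state
    // it owns), so the name does not shift when other transforms are added.
    return clicks.apply("CountClicks", ParDo.of(new CountClicksFn()));
  }
}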


On Fri, Aug 21, 2020 at 4:12 PM Ke Wu  wrote:
>
> If the user does not explicitly specify a transform name, in which case an
> autogenerated name will be used when generating the unique id, does that mean
> the id could change when the pipeline changes, such as when extra transforms
> are added?
>
> On Aug 21, 2020, at 11:43 AM, Luke Cwik  wrote:
>
> The DoFn is associated with a PTransform and in the pipeline proto there is a 
> unique id associated with each PTransform. You can use that to generate a 
> composite key (ptransformid, stateid) which will be unique within the 
> pipeline.
>
> On Fri, Aug 21, 2020 at 11:26 AM Ke Wu  wrote:
>>
>> Thank you, Reuven, for the confirmation. Do you know what the recommended
>> way is for underlying runners to distinguish the same state id in different DoFn(s)?
>>
>> On Aug 21, 2020, at 10:27 AM, Reuven Lax  wrote:
>>
>> StateId is scoped to the DoFn. You can use the same string in different 
>> DoFns for completely different states.
>>
>> On Fri, Aug 21, 2020 at 10:21 AM Ke Wu  wrote:
>>>
>>> Hello everyone,
>>>
>>> After reading through Stateful processing with Apache Beam and
>>> DoFn.StateId, I understand that each state id must be unique and must have
>>> the same type, at least within the same DoFn. However, it is not explicitly
>>> mentioned whether it is expected and supported for the same state id to be
>>> declared in different DoFn(s). If yes, is the state supposed to be shared,
>>> or is it supposed to be completely separate, in which case it could even
>>> have different types? If no, it seems that the validation in the Beam SDK
>>> only checks uniqueness within the same DoFn.
>>>
>>> Thanks,
>>> Ke
>>
>>
>


Re: @StateId uniqueness across DoFn(s)

2020-08-21 Thread Ke Wu
If the user does not explicitly specify a transform name, in which case an
autogenerated name will be used when generating the unique id, does that mean
the id could change when the pipeline changes, such as when extra transforms
are added?

> On Aug 21, 2020, at 11:43 AM, Luke Cwik  wrote:
> 
> The DoFn is associated with a PTransform and in the pipeline proto there is a 
> unique id associated with each PTransform. You can use that to generate a 
> composite key (ptransformid, stateid) which will be unique within the 
> pipeline.
> 
> On Fri, Aug 21, 2020 at 11:26 AM Ke Wu  wrote:
> Thank you, Reuven, for the confirmation. Do you know what the recommended
> way is for underlying runners to distinguish the same state id in different DoFn(s)?
> 
>> On Aug 21, 2020, at 10:27 AM, Reuven Lax  wrote:
>> 
>> StateId is scoped to the DoFn. You can use the same string in different 
>> DoFns for completely different states.
>> 
>> On Fri, Aug 21, 2020 at 10:21 AM Ke Wu  wrote:
>> Hello everyone,
>> 
>> After reading through Stateful processing with Apache Beam and DoFn.StateId,
>> I understand that each state id must be unique and must have the same type,
>> at least within the same DoFn. However, it is not explicitly mentioned
>> whether it is expected and supported for the same state id to be declared in
>> different DoFn(s). If yes, is the state supposed to be shared, or is it
>> supposed to be completely separate, in which case it could even have
>> different types? If no, it seems that the validation in the Beam SDK only
>> checks uniqueness within the same DoFn.
>> 
>> Thanks,
>> Ke
> 



Re: Splittable-Dofn not distributing the work to multiple workers

2020-08-21 Thread Luke Cwik
Yes it does.

There should be a reshuffle between the initial splitting and the
processing portion.
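
For readers less familiar with that shape, a hedged Java sketch of the pattern
(GenerateRanges and ReadRange are stand-ins, not the code from the PR): initial
splitting, a reshuffle so the produced ranges are redistributed across workers,
then the processing portion. On a runner that supports SDFs, the SDF expansion
inserts an equivalent redistribution automatically.

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

class RangeReadingSketch {
  /** Emits a few placeholder (file, offset) "ranges" per input file. */
  static class GenerateRanges extends DoFn<String, KV<String, Long>> {
    @ProcessElement
    public void process(@Element String file, OutputReceiver<KV<String, Long>> out) {
      for (long offset = 0; offset < 4; offset++) {
        out.output(KV.of(file, offset));
      }
    }
  }

  /** Reads a single range; here it just formats it. */
  static class ReadRange extends DoFn<KV<String, Long>, String> {
    @ProcessElement
    public void process(@Element KV<String, Long> range, OutputReceiver<String> out) {
      out.output(range.getKey() + "@" + range.getValue());
    }
  }

  static PCollection<String> expand(PCollection<String> files) {
    return files
        .apply("SplitIntoRanges", ParDo.of(new GenerateRanges())) // initial splitting
        .apply("Redistribute", Reshuffle.viaRandomKey())          // spread ranges across workers
        .apply("ReadRanges", ParDo.of(new ReadRange()));          // processing portion
  }
}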

On Fri, Aug 21, 2020 at 11:04 AM Jiadai Xia  wrote:

> I am using v1. Does v1 support the initial splitting and distribution? I
> expected it to distribute the initial splits to multiple workers.
>
> On Fri, Aug 21, 2020 at 11:00 AM Luke Cwik  wrote:
>
>> Are you using Dataflow runner v2[1], since the default for Beam Java still
>> uses Dataflow runner v1?
>> Dataflow runner v2 is the only one that supports autoscaling and dynamic
>> splitting of splittable DoFns in bounded pipelines.
>>
>> 1:
>> https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2
>>
>> On Fri, Aug 21, 2020 at 10:54 AM Jiadai Xia  wrote:
>>
>>> Hi,
>>> As stated in the title, I tried to implement an SDF for reading Parquet
>>> files and I am trying to run it with the Dataflow runner. The initial split
>>> outputs a bunch of ranges, but the number of workers is not scaled up and
>>> the work is not distributed. Any suggestions on what the problem could be?
>>> I have tested it with the Direct runner on small samples and the
>>> parallelism looks fine.
>>> Below is my implementation of the SDF:
>>> https://github.com/apache/beam/pull/12223


Re: @StateId uniqueness across DoFn(s)

2020-08-21 Thread Luke Cwik
The DoFn is associated with a PTransform and in the pipeline proto there is
a unique id associated with each PTransform. You can use that to generate a
composite key (ptransformid, stateid) which will be unique within the
pipeline.
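
A minimal sketch of that idea (the helper and key format are illustrative, not
an existing Beam API):

final class CompositeStateKeys {
  private CompositeStateKeys() {}

  /**
   * (ptransformId, stateId) is unique within the pipeline even when the same
   * state id string is declared in several DoFns, because each stateful ParDo
   * has its own unique PTransform id in the pipeline proto.
   */
  static String compositeKey(String pTransformId, String stateId) {
    return pTransformId + "/" + stateId;
  }
}

// e.g. compositeKey("CountPerUser/ParMultiDo(Count)", "index") and
//      compositeKey("Deduplicate/ParMultiDo(Dedup)", "index") never collide.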

On Fri, Aug 21, 2020 at 11:26 AM Ke Wu  wrote:

> Thank you, Reuven, for the confirmation. Do you know what the recommended
> way is for underlying runners to distinguish the same state id in different
> DoFn(s)?
>
> On Aug 21, 2020, at 10:27 AM, Reuven Lax  wrote:
>
> StateId is scoped to the DoFn. You can use the same string in different
> DoFns for completely different states.
>
> On Fri, Aug 21, 2020 at 10:21 AM Ke Wu  wrote:
>
>> Hello everyone,
>>
>> After reading through Stateful processing with Apache Beam and DoFn.StateId,
>> I understand that each state id must be unique and must have the same type,
>> at least within the same DoFn. However, it is not explicitly mentioned
>> whether it is expected and supported for the same state id to be declared in
>> different DoFn(s). If yes, is the state supposed to be shared, or is it
>> supposed to be completely separate, in which case it could even have
>> different types? If no, it seems that the validation in the Beam SDK only
>> checks uniqueness within the same DoFn.
>>
>> Thanks,
>> Ke
>>
>
>


Re: @StateId uniqueness across DoFn(s)

2020-08-21 Thread Ke Wu
Thank you, Reuven, for the confirmation. Do you know what the recommended way
is for underlying runners to distinguish the same state id in different DoFn(s)?

> On Aug 21, 2020, at 10:27 AM, Reuven Lax  wrote:
> 
> StateId is scoped to the DoFn. You can use the same string in different DoFns 
> for completely different states.
> 
> On Fri, Aug 21, 2020 at 10:21 AM Ke Wu  wrote:
> Hello everyone,
> 
> After reading through Stateful processing with Apache Beam and DoFn.StateId,
> I understand that each state id must be unique and must have the same type,
> at least within the same DoFn. However, it is not explicitly mentioned
> whether it is expected and supported for the same state id to be declared in
> different DoFn(s). If yes, is the state supposed to be shared, or is it
> supposed to be completely separate, in which case it could even have
> different types? If no, it seems that the validation in the Beam SDK only
> checks uniqueness within the same DoFn.
> 
> Thanks,
> Ke



Re: Splittable-Dofn not distributing the work to multiple workers

2020-08-21 Thread Jiadai Xia
I am using v1. Does v1 support the initial splitting and distribution? I
expected it to distribute the initial splits to multiple workers.

On Fri, Aug 21, 2020 at 11:00 AM Luke Cwik  wrote:

> Are you using Dataflow runner v2[1], since the default for Beam Java still
> uses Dataflow runner v1?
> Dataflow runner v2 is the only one that supports autoscaling and dynamic
> splitting of splittable DoFns in bounded pipelines.
>
> 1:
> https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2
>
> On Fri, Aug 21, 2020 at 10:54 AM Jiadai Xia  wrote:
>
>> Hi,
>> As stated in the title, I tried to implement an SDF for reading Parquet
>> files and I am trying to run it with the Dataflow runner. The initial split
>> outputs a bunch of ranges, but the number of workers is not scaled up and
>> the work is not distributed. Any suggestions on what the problem could be?
>> I have tested it with the Direct runner on small samples and the
>> parallelism looks fine.
>> Below is my implementation of the SDF:
>> https://github.com/apache/beam/pull/12223

--
*Jiadai Xia*
SWE Intern
1 (646) 413 8071
daniel...@google.com


Re: Splittable-Dofn not distributing the work to multiple workers

2020-08-21 Thread Luke Cwik
Are you using Dataflow runner v2[1], since the default for Beam Java still
uses Dataflow runner v1?
Dataflow runner v2 is the only one that supports autoscaling and dynamic
splitting of splittable DoFns in bounded pipelines.

1:
https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2
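
For reference, a hedged Java sketch of opting into runner v2 at the time of
this thread, assuming the use_runner_v2 experiment described in the linked
docs (the rest of the pipeline and the usual --runner/--project/--region
arguments are omitted):

import java.util.Arrays;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class RunnerV2Launch {
  public static void main(String[] args) {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);
    // Same effect as passing --experiments=use_runner_v2 on the command line.
    options.setExperiments(Arrays.asList("use_runner_v2"));

    Pipeline pipeline = Pipeline.create(options);
    // ... apply the SDF-based Parquet read here ...
    pipeline.run();
  }
}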

On Fri, Aug 21, 2020 at 10:54 AM Jiadai Xia  wrote:

> Hi,
> As stated in the title, I tried to implement an SDF for reading Parquet
> files and I am trying to run it with the Dataflow runner. The initial split
> outputs a bunch of ranges, but the number of workers is not scaled up and
> the work is not distributed. Any suggestions on what the problem could be?
> I have tested it with the Direct runner on small samples and the
> parallelism looks fine.
> Below is my implementation of the SDF:
> https://github.com/apache/beam/pull/12223


Splittable-Dofn not distributing the work to multiple workers

2020-08-21 Thread Jiadai Xia
Hi,
As stated in the title, I tried to implement an SDF for reading Parquet files
and I am trying to run it with the Dataflow runner. The initial split outputs
a bunch of ranges, but the number of workers is not scaled up and the work is
not distributed. Any suggestions on what the problem could be?
I have tested it with the Direct runner on small samples and the parallelism
looks fine.
Below is my implementation of the SDF:
https://github.com/apache/beam/pull/12223
--
*Jiadai Xia*
SWE Intern
1 (646) 413 8071
daniel...@google.com


Re: @StateId uniqueness across DoFn(s)

2020-08-21 Thread Reuven Lax
StateId is scoped to the DoFn. You can use the same string in different
DoFns for completely different states.
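
A minimal sketch of what that means in practice (both DoFns are made up for
this example): the two "index" states below are completely independent, and
they do not even need to share a type.

import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

class CountingDoFn extends DoFn<KV<String, String>, Long> {
  @StateId("index")
  private final StateSpec<ValueState<Long>> indexSpec = StateSpecs.value();

  @ProcessElement
  public void process(@StateId("index") ValueState<Long> index, OutputReceiver<Long> out) {
    Long current = index.read();
    long next = (current == null ? 0L : current) + 1;
    index.write(next);
    out.output(next);
  }
}

class LabelingDoFn extends DoFn<KV<String, String>, String> {
  // Same "index" string, but an unrelated state with a different type (String).
  @StateId("index")
  private final StateSpec<ValueState<String>> indexSpec = StateSpecs.value();

  @ProcessElement
  public void process(
      @Element KV<String, String> element,
      @StateId("index") ValueState<String> index,
      OutputReceiver<String> out) {
    index.write(element.getValue());
    out.output(element.getKey());
  }
}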

On Fri, Aug 21, 2020 at 10:21 AM Ke Wu  wrote:

> Hello everyone,
>
> After reading through Stateful processing with Apache Beam and DoFn.StateId,
> I understand that each state id must be unique and must have the same type,
> at least within the same DoFn. However, it is not explicitly mentioned
> whether it is expected and supported for the same state id to be declared in
> different DoFn(s). If yes, is the state supposed to be shared, or is it
> supposed to be completely separate, in which case it could even have
> different types? If no, it seems that the validation in the Beam SDK only
> checks uniqueness within the same DoFn.
>
> Thanks,
> Ke
>


Re: Is there an equivalent for --numberOfWorkerHarnessThreads in Python SDK?

2020-08-21 Thread Reuven Lax
Streaming Dataflow relies on high thread count for performance. Streaming
threads spend a high percentage of time blocked on IO, so in order to get
decent CPU utilization we need a lot of threads. Limiting the thread count
risks causing performance issues.

On Fri, Aug 21, 2020 at 8:00 AM Kamil Wasilewski <
kamil.wasilew...@polidea.com> wrote:

> No, I'm not. But thanks anyway, I totally missed that option!
>
> It occurs in a simple pipeline that executes CoGroupByKey over two
> PCollections, reading from a bounded source (20 million and 2 million
> elements, respectively) with one global window. Here's a link to the code;
> it's one of our tests:
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/testing/load_tests/co_group_by_key_test.py
>
>
> On Thu, Aug 20, 2020 at 6:48 PM Luke Cwik  wrote:
>
>> +user 
>>
>> On Thu, Aug 20, 2020 at 9:47 AM Luke Cwik  wrote:
>>
>>> Are you using Dataflow runner v2[1]?
>>>
>>> If so, then you can use:
>>> --number_of_worker_harness_threads=X
>>>
>>> Do you know where/why the OOM is occurring?
>>>
>>> 1:
>>> https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2
>>> 2:
>>> https://github.com/apache/beam/blob/017936f637b119f0b0c0279a226c9f92a2cf4f15/sdks/python/apache_beam/options/pipeline_options.py#L834
>>>
>>> On Thu, Aug 20, 2020 at 7:33 AM Kamil Wasilewski <
>>> kamil.wasilew...@polidea.com> wrote:
>>>
 Hi all,

 As I stated in the title, is there an equivalent for
 --numberOfWorkerHarnessThreads in Python SDK? I've got a streaming pipeline
 in Python which suffers from OutOfMemory exceptions (I'm using Dataflow).
 Switching to highmem workers solved the issue, but I wonder if I can set a
 limit of threads that will be used in a single worker to decrease memory
 usage.

 Regards,
 Kamil




@StateId uniqueness across DoFn(s)

2020-08-21 Thread Ke Wu
Hello everyone,

After reading through Stateful processing with Apache Beam and DoFn.StateId, I
understand that each state id must be unique and must have the same type, at
least within the same DoFn. However, it is not explicitly mentioned whether it
is expected and supported for the same state id to be declared in different
DoFn(s). If yes, is the state supposed to be shared, or is it supposed to be
completely separate, in which case it could even have different types? If no,
it seems that the validation in the Beam SDK only checks uniqueness within the
same DoFn.

Thanks,
Ke

Re: Suggestion to let KafkaIO support the deserializer API with headers

2020-08-21 Thread Luke Cwik
Sounds good.

Note that you'll also want to update ReadFromKafkaDoFn[1] and provide tests
that cover both code paths to make sure we don't regress and stop providing headers.

1:
https://github.com/apache/beam/blob/cfa448d121297398312d09c531258a72b413488b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java#L309
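
A hedged sketch of the kind of test deserializer that could back that coverage
(the class and flag names are illustrative): it records whether the
header-aware overload was invoked, so a regression back to the headerless call
is easy to assert on.

import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;

public class HeaderTrackingDeserializer implements Deserializer<String> {
  static final AtomicBoolean HEADER_OVERLOAD_CALLED = new AtomicBoolean(false);

  @Override
  public String deserialize(String topic, byte[] data) {
    // Headerless path: only hit if the caller ignores the header-aware overload.
    return data == null ? null : new String(data, StandardCharsets.UTF_8);
  }

  @Override
  public String deserialize(String topic, Headers headers, byte[] data) {
    HEADER_OVERLOAD_CALLED.set(true);
    return deserialize(topic, data);
  }
}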

On Fri, Aug 21, 2020 at 8:29 AM Lourens Naude 
wrote:

> Hi everyone,
>
> We bumped into an API issue with the deserializer that is called when
> constructing KafkaRecord instances in the KafkaIO module.
>
> I wanted to float this past the mailing list for discussion first before
> exploring further.
>
> The callsite referenced: KafkaIO only calls the deserializer with the
> simplified API that does not include Kafka record headers (even though they
> are available to pass as an argument):
> https://github.com/apache/beam/blob/release-2.20.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L202-L203
>
> Our SerDes implementation relies on Kafka Headers support, which was added
> to Kafka records via a KIP as a means to include metadata cleanly and not
> abuse keys or values for such purposes.
>
> It is also a valid Deserializer API as per the official Kafka interface:
>
> *
> https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L59-L61
> * It delegates to the simplified version as its default implementation
> (which requires a formal implementation) in
> https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L60
> * The default behaviour is thus backwards compatible, with a preference
> for the header-specific API
>
> We've used the custom SerDes without issues in a complex Connect and
> Streams pipeline, but bumped into this API divergence: the deserializer API
> with headers is not preferred as the primary deserialization mechanism.
>
> The same API is used elsewhere:
>
> * It's the default for the stock Java consumer:
> https://github.com/apache/kafka/blob/4cd2396db31418c90005c998d9107ad40df055b2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1362
> (the header-enabled overload delegates to the simplified API by default)
> * Ditto Kafka Connect:
> https://github.com/apache/kafka/blob/b399a731a39c28bdd89998edc7c9fd732c56eee1/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java#L48-L64
> * And Kafka Streams:
> https://github.com/apache/kafka/blob/92828d53b18703000159f4dd7dc8b3170667db25/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java#L65-L66
>
> Any thoughts on the proposed change with the additional headers argument
> passed on deserialization?
>
> Best,
> Lourens
>


Suggestion to let KafkaIO support the deserializer API with headers

2020-08-21 Thread Lourens Naude
Hi everyone,

We bumped into an API issue with the deserializer that is called when
constructing KafkaRecord instances in the KafkaIO module.

I wanted to float this past the mailing list for discussion first before
exploring further.

The callsite referenced: KafkaIO only calls the deserializer with the
simplified API that does not include Kafka record headers (even though they
are available to pass as an argument):
https://github.com/apache/beam/blob/release-2.20.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L202-L203

Our SerDes implementation relies on Kafka Headers support, which was added
to Kafka records via a KIP as a means to include metadata cleanly and not
abuse keys or values for such purposes.

It is also a valid Deserializer API as per the official Kafka interface:

*
https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L59-L61
* It delegates to the simplified version as its default implementation
(which requires a formal implementation) in
https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L60
* The default behaviour is thus backwards compatible, with a preference for
the header-specific API

We've used the custom SerDes without issues in a complex Connect and
Streams pipeline, but bumped into this API divergence: the deserializer API
with headers is not preferred as the primary deserialization mechanism.
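
A hedged sketch of what the preferred call could look like at that callsite
(decode is an illustrative helper, not the actual KafkaUnboundedReader code);
Kafka's default method keeps existing two-argument deserializers working:

import org.apache.beam.sdk.values.KV;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Deserializer;

final class HeaderAwareDecode {
  private HeaderAwareDecode() {}

  static <K, V> KV<K, V> decode(
      ConsumerRecord<byte[], byte[]> rawRecord,
      Deserializer<K> keyDeserializer,
      Deserializer<V> valueDeserializer) {
    // Passing rawRecord.headers() lets header-dependent SerDes see the metadata;
    // deserializers that only implement deserialize(topic, data) still work because
    // Kafka's default three-argument method delegates to the two-argument one.
    K key = keyDeserializer.deserialize(rawRecord.topic(), rawRecord.headers(), rawRecord.key());
    V value =
        valueDeserializer.deserialize(rawRecord.topic(), rawRecord.headers(), rawRecord.value());
    return KV.of(key, value);
  }
}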

The same API is used elsewhere:

* It's the default for the stock Java consumer:
https://github.com/apache/kafka/blob/4cd2396db31418c90005c998d9107ad40df055b2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1362
(the header-enabled overload delegates to the simplified API by default)
* Ditto Kafka Connect:
https://github.com/apache/kafka/blob/b399a731a39c28bdd89998edc7c9fd732c56eee1/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java#L48-L64
* And Kafka Streams:
https://github.com/apache/kafka/blob/92828d53b18703000159f4dd7dc8b3170667db25/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java#L65-L66

Any thoughts on the proposed change with the additional headers argument
passed on deserialization?

Best,
Lourens


Re: Is there an equivalent for --numberOfWorkerHarnessThreads in Python SDK?

2020-08-21 Thread Kamil Wasilewski
No, I'm not. But thanks anyway, I totally missed that option!

It occurs in a simple pipeline that executes CoGroupByKey over two
PCollections, reading from a bounded source (20 million and 2 million
elements, respectively) with one global window. Here's a link to the code;
it's one of our tests:
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/testing/load_tests/co_group_by_key_test.py


On Thu, Aug 20, 2020 at 6:48 PM Luke Cwik  wrote:

> +user 
>
> On Thu, Aug 20, 2020 at 9:47 AM Luke Cwik  wrote:
>
>> Are you using Dataflow runner v2[1]?
>>
>> If so, then you can use:
>> --number_of_worker_harness_threads=X
>>
>> Do you know where/why the OOM is occurring?
>>
>> 1:
>> https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2
>> 2:
>> https://github.com/apache/beam/blob/017936f637b119f0b0c0279a226c9f92a2cf4f15/sdks/python/apache_beam/options/pipeline_options.py#L834
>>
>> On Thu, Aug 20, 2020 at 7:33 AM Kamil Wasilewski <
>> kamil.wasilew...@polidea.com> wrote:
>>
>>> Hi all,
>>>
>>> As I stated in the title, is there an equivalent for
>>> --numberOfWorkerHarnessThreads in Python SDK? I've got a streaming pipeline
>>> in Python which suffers from OutOfMemory exceptions (I'm using Dataflow).
>>> Switching to highmem workers solved the issue, but I wonder if I can set a
>>> limit of threads that will be used in a single worker to decrease memory
>>> usage.
>>>
>>> Regards,
>>> Kamil
>>>
>>>