Re: [ANNOUNCE] New PMC Member: Valentyn Tymofieiev

2023-10-03 Thread Boyuan Zhang
Well deserved!! Congrats 

On Tue, Oct 3, 2023 at 12:47 Byron Ellis via dev 
wrote:

> Congrats!
>
> On Tue, Oct 3, 2023 at 12:40 PM Danielle Syse via dev 
> wrote:
>
>> Congrats Valentyn!!
>>
>> On Tue, Oct 3, 2023 at 2:59 PM Ahmet Altay via dev 
>> wrote:
>>
>>> Congratulations Valentyn! Well deserved!
>>>
>>> On Tue, Oct 3, 2023 at 11:54 AM Ritesh Ghorse via dev <
>>> dev@beam.apache.org> wrote:
>>>
 Congratulations Valentyn!

 On Tue, Oct 3, 2023 at 2:53 PM Jack McCluskey via dev <
 dev@beam.apache.org> wrote:

> Congrats Valentyn!
>
> On Tue, Oct 3, 2023 at 2:40 PM Kenneth Knowles 
> wrote:
>
>> Hi all,
>>
>> Please join me and the rest of the Beam PMC in welcoming Valentyn
>> Tymofieiev  as our newest PMC member.
>>
>> Valentyn has been contributing to Beam since 2017. Notable highlights
>> include his work on the Python SDK and also in our container management.
>> Valentyn also is involved in many discussions around Beam's 
>> infrastructure
>> and community processes. If you look through Valentyn's history, you will
>> see an abundance of the most critical maintenance work that is the 
>> beating
>> heart of any project.
>>
>> Congratulations Valentyn and thanks for being a part of Apache Beam!
>>
>> Kenn, on behalf of the Beam PMC (which now includes Valentyn)
>>
>


Re: [ANNOUNCE] New PMC Member: Robert Burke

2023-10-03 Thread Boyuan Zhang
Congratulations  Well deserved!

On Tue, Oct 3, 2023 at 12:48 Byron Ellis via dev 
wrote:

> Congrats!
>
> On Tue, Oct 3, 2023 at 12:40 PM Danielle Syse via dev 
> wrote:
>
>> Congrats Rebo!!! Can't wait to work more closely with you.
>>
>> On Tue, Oct 3, 2023 at 2:56 PM Ahmet Altay via dev 
>> wrote:
>>
>>> Congratulations Robert! Well deserved!
>>>
>>> On Tue, Oct 3, 2023 at 11:54 AM Ritesh Ghorse via dev <
>>> dev@beam.apache.org> wrote:
>>>
 Congratulations Robert!

 On Tue, Oct 3, 2023 at 2:52 PM Danny McCormick via dev <
 dev@beam.apache.org> wrote:

> Congrats Robert, this is very well deserved!
>
> On Tue, Oct 3, 2023 at 2:50 PM Anand Inguva via dev <
> dev@beam.apache.org> wrote:
>
>> Congratulations!!
>>
>> On Tue, Oct 3, 2023 at 2:49 PM XQ Hu via dev 
>> wrote:
>>
>>> Congratulations, Robert!
>>>
>>> On Tue, Oct 3, 2023 at 2:40 PM Kenneth Knowles 
>>> wrote:
>>>
 Hi all,

 Please join me and the rest of the Beam PMC in welcoming Robert
 Burke  as our newest PMC member.

 Robert has been a part of the Beam community since 2017. He is our
 resident Gopher, producing the Go SDK and most recently the local,
 portable, Prism runner. Robert has presented on Beam many times, having
 written not just core Beam code but quite interesting pipelines too :-)

 Congratulations Robert and thanks for being a part of Apache Beam!

 Kenn, on behalf of the Beam PMC (which now includes Robert)

>>>


Re: KafkaIO SDF reader not respecting consumer group persisted offsets

2021-07-20 Thread Boyuan Zhang
Hi Alisdair,

There are several ways to configure SDF implementation
(ReadSourceDescriptors/ReadFromKafkaDoFn)  to commit the offset:

   - Set `enable.auto.commit` in your consumer config, or
   - Configure your KafkaIO with commitOffsetsInFinalize
   
.
   It will expand the Kafka read with a Kafka commit transform
   
,
   which commits offset every 5 mins.

I believe this behavior is the same as KafkaUnboundedReader. That means,
even with KafkaUnboundedReader, you need to provide these commit
configurations to your Kafka read to let it commit the offset when it's
time to do so.

On Tue, Jul 20, 2021 at 9:33 PM Alisdair Sullivan <
alisdair.sulli...@unity3d.com> wrote:

> With the KafkaUnboundedReader when a consumer is configured with a
> consumer group id a call is made to get the current position of the
> consumer group for each topic/partition pair. This offset is used as the
> initial offset for the read transform. This allows pipelines that persist
> offsets (either via auto commit configured via the consumer or via the
> commit offsets in finalize option) to later resume at the same point.
>
> With the SDF implementation (ReadSourceDescriptors/ReadFromKafkaDoFn) of
> the Kafka reader no equivalent call is made to retrieve offsets. The SDF
> implementation uses instead the offset consumer to retrieve the initial
> offset. Because the offset consumer is configured to never commit offsets
> this will always be one of earliest, latest or an offset calculated from a
> specific time.
>
> Users can provide to ReadFromKafkaDoFn a KafkaSourceDescriptor with an
> initial offset retrieved explicitly but on a failure of the processElement
> transform there is no way to resume from the last persisted offset. Instead
> I believe the original offset will be used. In long lived pipelines this
> will result in data duplication.
>
> Is this an intended change, or should I open an issue marking this as a
> regression in functionality? Thanks.
>
>


Re: PreCommit tests not running

2021-06-07 Thread Boyuan Zhang
There is an infra issue ongoing:
https://issues.apache.org/jira/browse/INFRA-21976, which affects all beam
tests.

On Mon, Jun 7, 2021 at 9:21 PM Reuven Lax  wrote:

> https://github.com/apache/beam/pull/14949
>
> Java PreCommit has been pending for 2 days. Is something wrong
> with Jenkins?
>
> Reuven
>


Re: [Question] Best Practice of Handling Null Key for KafkaRecordCoder

2021-06-03 Thread Boyuan Zhang
Considering the problem of populating KafkaRecord metadata(BEAM-12076
<https://issues.apache.org/jira/projects/BEAM/issues/BEAM-12076>) together,
what's the plan there? Are we going to make KafkaRecordCoder as a
well-known coder as well? The reason why I ask is because it might be a
good chance to revisit the KafkaRecordCoder implementation.

On Thu, Jun 3, 2021 at 2:17 PM Chamikara Jayalath 
wrote:

>
>
> On Thu, Jun 3, 2021 at 2:06 PM Boyuan Zhang  wrote:
>
>> Supporting the x-lang boundary is a good point. So you are suggesting
>> that:
>>
>>1. We make NullableCoder as a standard coder.
>>2. KafkaIO wraps the keyCoder with NullabeCoder directly if it
>>requires.
>>
>> Is that correct?
>>
>
> Yeah.
>
>
>>
>>
>> On Wed, Jun 2, 2021 at 6:47 PM Chamikara Jayalath 
>> wrote:
>>
>>> I think we should make NullableCoder a standard coder for Beam [1] and
>>> use a standard Nullablecoder(KeyCoder) for Kafka keys (where KeyCoder might
>>> be the standard ByteArrayCoder for example)
>>> I think we have compatible Java and Python NullableCoder implementations
>>> already so implementing this should be relatively straightforward.
>>>
>>> Non-standard coders may not be supported by runners at the
>>> cross-language boundary.
>>>
>>> Thanks,
>>> Cham
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/d0c3dd72874fada03cc601f30cde022a8dd6aa9c/model/pipeline/src/main/proto/beam_runner_api.proto#L784
>>>
>>> On Wed, Jun 2, 2021 at 6:25 PM Ahmet Altay  wrote:
>>>
>>>> /cc folks who commented on the issue: @Robin Qiu  
>>>> @Chamikara
>>>> Jayalath  @Alexey Romanenko
>>>>  @Daniel Collins 
>>>>
>>>> On Tue, Jun 1, 2021 at 2:03 PM Weiwen Xu  wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I'm working on [this issue](
>>>>> https://issues.apache.org/jira/browse/BEAM-12008) with Boyuan. She
>>>>> was very helpful in identifying the issue which is that KafkaRecordCoder
>>>>> couldn't handle the case when key is null.
>>>>>
>>>>> We came out with two potential solutions. Yet both have its pros and
>>>>> cons so I'm hoping to gather some suggestions/opinions or ideas of how to
>>>>> handle this issue. For our solutions:
>>>>>
>>>>> 1. directly wrapping the keyCoder with Nullablecoder i.e.
>>>>> NullableCoder.of(keyCoder)
>>>>> cons: backwards compatibility problem
>>>>>
>>>>> 2. writing a completely new class named something like
>>>>> NullableKeyKafkaRecordCoder
>>>>> instead of using KVCoder and encode/decode KVs, we have KeyCoder
>>>>> and ValueCoder as fields and another BooleanCoder to encode/decode T/F for
>>>>> present of null key. If key is null, KeyCoder will not encode/decode.
>>>>>
>>>>>   - [L63] encode(...){
>>>>>stringCoder.encode(topic, ...);
>>>>>intCoder.encode(partition, ...);
>>>>>longCoder.encode(offset, ...);
>>>>>longCoder.encode(timestamp, ...);
>>>>>intCoder.encode(timestamptype, ...);
>>>>>headerCoder.encode(...)
>>>>>if(Key!=null){
>>>>>   BooleanCoder.encode(false, ...);
>>>>>   KeyCoder.encode(key, ...);
>>>>>}else{
>>>>>   BooleanCoder.encode(true, ...);
>>>>>   // skips KeyCoder when key is null
>>>>>}
>>>>>   ValueCoder.encode(value, ...);
>>>>> }
>>>>>
>>>>>   - [L74] decode(...){
>>>>>   return new KafkaRecord<>(
>>>>>
>>>>> stringCoder.decode(inStream),
>>>>> intCoder.decode(inStream),
>>>>> longCoder.decode(inStream),
>>>>> longCoder.decode(inStream),
>>>>>
>>>>> KafkaTimestampType.forOrdinal(intCoder.decode(inStream)),
>>>>> (Headers)
>>>>> toHeaders(headerCoder.decode(inStream)),
>>>>>
>>>>> BooleanCoder.decode(inStream)? null:KeyCoder.decode(inStream),
>>>>> ValueCoder.decode(inStream)
>>>>> );
>>>>> }
>>>>>
>>>>> Best regards,
>>>>> Weiwen
>>>>>
>>>>


Re: [Question] Best Practice of Handling Null Key for KafkaRecordCoder

2021-06-03 Thread Boyuan Zhang
Supporting the x-lang boundary is a good point. So you are suggesting that:

   1. We make NullableCoder as a standard coder.
   2. KafkaIO wraps the keyCoder with NullabeCoder directly if it requires.

Is that correct?


On Wed, Jun 2, 2021 at 6:47 PM Chamikara Jayalath 
wrote:

> I think we should make NullableCoder a standard coder for Beam [1] and use
> a standard Nullablecoder(KeyCoder) for Kafka keys (where KeyCoder might be
> the standard ByteArrayCoder for example)
> I think we have compatible Java and Python NullableCoder implementations
> already so implementing this should be relatively straightforward.
>
> Non-standard coders may not be supported by runners at the cross-language
> boundary.
>
> Thanks,
> Cham
>
> [1]
> https://github.com/apache/beam/blob/d0c3dd72874fada03cc601f30cde022a8dd6aa9c/model/pipeline/src/main/proto/beam_runner_api.proto#L784
>
> On Wed, Jun 2, 2021 at 6:25 PM Ahmet Altay  wrote:
>
>> /cc folks who commented on the issue: @Robin Qiu  
>> @Chamikara
>> Jayalath  @Alexey Romanenko
>>  @Daniel Collins 
>>
>> On Tue, Jun 1, 2021 at 2:03 PM Weiwen Xu  wrote:
>>
>>> Hello,
>>>
>>> I'm working on [this issue](
>>> https://issues.apache.org/jira/browse/BEAM-12008) with Boyuan. She was
>>> very helpful in identifying the issue which is that KafkaRecordCoder
>>> couldn't handle the case when key is null.
>>>
>>> We came out with two potential solutions. Yet both have its pros and
>>> cons so I'm hoping to gather some suggestions/opinions or ideas of how to
>>> handle this issue. For our solutions:
>>>
>>> 1. directly wrapping the keyCoder with Nullablecoder i.e.
>>> NullableCoder.of(keyCoder)
>>> cons: backwards compatibility problem
>>>
>>> 2. writing a completely new class named something like
>>> NullableKeyKafkaRecordCoder
>>> instead of using KVCoder and encode/decode KVs, we have KeyCoder and
>>> ValueCoder as fields and another BooleanCoder to encode/decode T/F for
>>> present of null key. If key is null, KeyCoder will not encode/decode.
>>>
>>>   - [L63] encode(...){
>>>stringCoder.encode(topic, ...);
>>>intCoder.encode(partition, ...);
>>>longCoder.encode(offset, ...);
>>>longCoder.encode(timestamp, ...);
>>>intCoder.encode(timestamptype, ...);
>>>headerCoder.encode(...)
>>>if(Key!=null){
>>>   BooleanCoder.encode(false, ...);
>>>   KeyCoder.encode(key, ...);
>>>}else{
>>>   BooleanCoder.encode(true, ...);
>>>   // skips KeyCoder when key is null
>>>}
>>>   ValueCoder.encode(value, ...);
>>> }
>>>
>>>   - [L74] decode(...){
>>>   return new KafkaRecord<>(
>>> stringCoder.decode(inStream),
>>> intCoder.decode(inStream),
>>> longCoder.decode(inStream),
>>> longCoder.decode(inStream),
>>>
>>> KafkaTimestampType.forOrdinal(intCoder.decode(inStream)),
>>> (Headers)
>>> toHeaders(headerCoder.decode(inStream)),
>>>
>>> BooleanCoder.decode(inStream)? null:KeyCoder.decode(inStream),
>>> ValueCoder.decode(inStream)
>>> );
>>> }
>>>
>>> Best regards,
>>> Weiwen
>>>
>>


Re: beam dev community

2021-05-24 Thread Boyuan Zhang
Welcome! Please checkout the contribution guide if you plan to contribute
to beam: https://beam.apache.org/contribute/

On Mon, May 24, 2021 at 11:30 AM shahnawaz aziz 
wrote:

> Please add me to this community as we are using the beam for our big data
> project.
>
> Thanks
>


Re: Question about SplittableDoFn

2021-05-19 Thread Boyuan Zhang
Thanks for sharing. I'll comment on the PR.

On Tue, May 18, 2021 at 3:44 PM Miguel Anzo Palomo 
wrote:

> Boyuan Zhang, It's about this issue
> <https://issues.apache.org/jira/browse/BEAM-11996>, the code can be found
> here <https://github.com/apache/beam/pull/14811/files>
>
> On Tue, May 18, 2021 at 5:29 PM Boyuan Zhang  wrote:
>
>> Would you like to share your draft code? Iterating on the code might be
>> easier to figure out the issue.
>>
>> On Tue, May 18, 2021 at 3:28 PM Robert Burke  wrote:
>>
>>> IIRC the Initial Restrictions method gives you an element and you return
>>> the restrictions relative to that element.
>>>
>>> It's entirely appropriate to stat files or query databases in order to
>>> determine the initial restrictions and partitions of the data.
>>>
>>>
>>> On Tue, May 18, 2021, 3:21 PM Miguel Anzo Palomo <
>>> miguel.a...@wizeline.com> wrote:
>>>
>>>> Hi, I’m looking at how to implement a reader as a SplittableDoFn and
>>>> I'm having some problems with the initial restriction, specifically, how do
>>>> you set the initial restriction if you don’t know the size of the data?
>>>> The DoFn that I'm working on takes a PCollection of Spanner *ReadOperations
>>>> *and splits the read operation query into a list of *Partitions* to
>>>> query against the database.
>>>> I’m currently setting the *InitialRestriction* to an OffsetRange(0L,
>>>> Long.MAX_VALUE); which is currently giving me this error in unit tests Last
>>>> attempted offset was 0 in range [0, 9223372036854775807), claiming work in
>>>> [1, 9223372036854775807) was not attempted. and it makes sense I
>>>> think, because I am setting up the range to max long value.
>>>> So, if I don't know how many partitions are going to be created until
>>>> it's being processed, how can I set the initial restriction or what initial
>>>> restriction do I need to set?
>>>>
>>>> --
>>>>
>>>> Miguel Angel Anzo Palomo | WIZELINE
>>>>
>>>> Software Engineer
>>>>
>>>> miguel.a...@wizeline.com
>>>>
>>>> Remote Office
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> *This email and its contents (including any attachments) are being sent
>>>> toyou on the condition of confidentiality and may be protected by
>>>> legalprivilege. Access to this email by anyone other than the intended
>>>> recipientis unauthorized. If you are not the intended recipient, please
>>>> immediatelynotify the sender by replying to this message and delete the
>>>> materialimmediately from your system. Any further use, dissemination,
>>>> distributionor reproduction of this email is strictly prohibited. Further,
>>>> norepresentation is made with respect to any content contained in this
>>>> email.*
>>>
>>>
>
> --
>
> Miguel Angel Anzo Palomo | WIZELINE
>
> Software Engineer
>
> miguel.a...@wizeline.com
>
> Remote Office
>
>
>
>
>
>
>
>
> *This email and its contents (including any attachments) are being sent
> toyou on the condition of confidentiality and may be protected by
> legalprivilege. Access to this email by anyone other than the intended
> recipientis unauthorized. If you are not the intended recipient, please
> immediatelynotify the sender by replying to this message and delete the
> materialimmediately from your system. Any further use, dissemination,
> distributionor reproduction of this email is strictly prohibited. Further,
> norepresentation is made with respect to any content contained in this
> email.*


Re: Question about SplittableDoFn

2021-05-18 Thread Boyuan Zhang
Would you like to share your draft code? Iterating on the code might be
easier to figure out the issue.

On Tue, May 18, 2021 at 3:28 PM Robert Burke  wrote:

> IIRC the Initial Restrictions method gives you an element and you return
> the restrictions relative to that element.
>
> It's entirely appropriate to stat files or query databases in order to
> determine the initial restrictions and partitions of the data.
>
>
> On Tue, May 18, 2021, 3:21 PM Miguel Anzo Palomo 
> wrote:
>
>> Hi, I’m looking at how to implement a reader as a SplittableDoFn and I'm
>> having some problems with the initial restriction, specifically, how do you
>> set the initial restriction if you don’t know the size of the data?
>> The DoFn that I'm working on takes a PCollection of Spanner *ReadOperations
>> *and splits the read operation query into a list of *Partitions* to
>> query against the database.
>> I’m currently setting the *InitialRestriction* to an OffsetRange(0L,
>> Long.MAX_VALUE); which is currently giving me this error in unit tests Last
>> attempted offset was 0 in range [0, 9223372036854775807), claiming work in
>> [1, 9223372036854775807) was not attempted. and it makes sense I think,
>> because I am setting up the range to max long value.
>> So, if I don't know how many partitions are going to be created until
>> it's being processed, how can I set the initial restriction or what initial
>> restriction do I need to set?
>>
>> --
>>
>> Miguel Angel Anzo Palomo | WIZELINE
>>
>> Software Engineer
>>
>> miguel.a...@wizeline.com
>>
>> Remote Office
>>
>>
>>
>>
>>
>>
>>
>>
>> *This email and its contents (including any attachments) are being sent
>> toyou on the condition of confidentiality and may be protected by
>> legalprivilege. Access to this email by anyone other than the intended
>> recipientis unauthorized. If you are not the intended recipient, please
>> immediatelynotify the sender by replying to this message and delete the
>> materialimmediately from your system. Any further use, dissemination,
>> distributionor reproduction of this email is strictly prohibited. Further,
>> norepresentation is made with respect to any content contained in this
>> email.*
>
>


Re: Extremely Slow DirectRunner

2021-05-12 Thread Boyuan Zhang
Hi Evan,

It seems like the slow step is not the read that use_deprecated_read
targets for. Would you like to share your pipeline code if possible?

On Wed, May 12, 2021 at 1:35 PM Evan Galpin  wrote:

> I just tried with v2.29.0 and use_deprecated_read but unfortunately I
> observed slow behavior again. Is it possible that use_deprecated_read is
> broken in 2.29.0 as well?
>
> Thanks,
> Evan
>
> On Wed, May 12, 2021 at 3:21 PM Steve Niemitz  wrote:
>
>> oops sorry I was off by 10...I meant 2.29 not 2.19.
>>
>> On Wed, May 12, 2021 at 2:55 PM Evan Galpin 
>> wrote:
>>
>>> Thanks for the link/info. v2.19.0 and v2.21.0 did exhibit the "faster"
>>> behavior, as did v2.23.0. But that "fast" behavior stopped at v2.25.0 (for
>>> my use case at least) regardless of use_deprecated_read setting.
>>>
>>> Thanks,
>>> Evan
>>>
>>>
>>> On Wed, May 12, 2021 at 2:47 PM Steve Niemitz 
>>> wrote:
>>>
>>>> use_deprecated_read was broken in 2.19 on the direct runner and didn't
>>>> do anything. [1]  I don't think the fix is in 2.20 either, but will be in
>>>> 2.21.
>>>>
>>>> [1] https://github.com/apache/beam/pull/14469
>>>>
>>>> On Wed, May 12, 2021 at 1:41 PM Evan Galpin 
>>>> wrote:
>>>>
>>>>> I forgot to also mention that in all tests I was setting
>>>>> --experiments=use_deprecated_read
>>>>>
>>>>> Thanks,
>>>>> Evan
>>>>>
>>>>> On Wed, May 12, 2021 at 1:39 PM Evan Galpin 
>>>>> wrote:
>>>>>
>>>>>> Hmm, I think I spoke too soon. I'm still seeing an issue of overall
>>>>>> DirectRunner slowness, not just pubsub. I have a pipeline like so:
>>>>>>
>>>>>> Read pubsub  |  extract GCS glob patterns  |  FileIO.matchAll()  |
>>>>>> FileIO.readMatches()  |  Read file contents  |  etc
>>>>>>
>>>>>> I have temporarily set up a transform between each step to log what's
>>>>>> going on and illustrate timing issues.  I ran a series of tests changing
>>>>>> only the SDK version each time since I hadn't noticed this performance
>>>>>> issue with 2.19.0 (effectively git-bisect). Before each test, I seeded 
>>>>>> the
>>>>>> pubsub subscription with the exact same contents.
>>>>>>
>>>>>> SDK version 2.25.0 (I had a build issue with 2.24.0 that I couldn't
>>>>>> seem to resolve) and onward show a significant slowdown.
>>>>>>
>>>>>> Here is a snippet of logging from v2.25.0:
>>>>>>
>>>>>> *May 12, 2021 11:11:52 A.M.* com.myOrg.myPipeline.PipelineLeg$1
>>>>>> processElement
>>>>>> INFO: Got file pattern: gs://my-bucket/my-dir/5004728247517184/**
>>>>>> May 12, 2021 11:16:59 A.M.
>>>>>> org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn process
>>>>>> INFO: Matched 2 files for pattern
>>>>>> gs://my-bucket/my-dir/5004728247517184/**
>>>>>> May 12, 2021 11:23:32 A.M. com.myOrg.myPipeline.PipelineLeg$3
>>>>>> processElement
>>>>>> INFO: Got ReadableFile: my-file1.json
>>>>>> May 12, 2021 11:23:32 A.M. com.myOrg.myPipeline.PipelineLeg$3
>>>>>> processElement
>>>>>> INFO: Got ReadableFile: my-file2.json
>>>>>> May 12, 2021 11:24:35 A.M. com.myOrg.myPipeline.PipelineLeg$4
>>>>>> processElement
>>>>>> INFO: Got file contents for document_id my-file1.json
>>>>>> *May 12, 2021 11:24:35 A.M*. com.myOrg.myPipeline.PipelineLeg$4
>>>>>> processElement
>>>>>> INFO: Got file contents for document_id my-file2.json
>>>>>>
>>>>>> Note that end-to-end, these steps took about *13 minutes*. With SDK
>>>>>> 2.23.0 and identical user code, the same section of the pipeline took *2
>>>>>> seconds*:
>>>>>>
>>>>>> *May 12, 2021 11:03:39 A.M.* com.myOrg.myPipeline.PipelineLeg$1
>>>>>> processElement
>>>>>> INFO: Got file pattern: gs://my-bucket/my-dir/5004728247517184/**
>>>>>> May 12, 2021 11:03:40 A.M.
>>>>>> org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn process
>>>>>> INFO: Matched 2 files

Re: [DISCUSS] Warn when KafkaIO is used as a bounded source

2021-05-10 Thread Boyuan Zhang
Just added more details on BEAM-6466
.  In short, BEAM-6466
 looks more like a FR
instead of a bug to me.

On Fri, Apr 30, 2021 at 12:48 PM Pablo Estrada  wrote:

> I suppose a production-ready bounded KafkaIO may fetch until reaching the
> end of each partition(?), or receive a final offset for each partition?
>
> Let's definitely add the warning.
> Best
> -P.
>
> On Fri, Apr 30, 2021 at 11:33 AM Brian Hulette 
> wrote:
>
>> I guess that is the question. [2] and [3] above make me think that this
>> is experimental and just not labeled as such.
>>
>> It doesn't seem reasonable to have both an open feature request for
>> bounded KafkaIO (BEAM-2185), and a bug report regarding bounded KafkaIO
>> (BEAM-6466).
>>
>> On Fri, Apr 30, 2021 at 11:26 AM Pablo Estrada 
>> wrote:
>>
>>> Are they experimental? I suppose this is a valid use case, right? I am
>>> in favor of adding a warning, but I don't know if I would call them
>>> experimental.
>>>
>>> I suppose a repeated-batch use case may do this repeatedly (though then
>>> users would need to recover the latest offsets for each partition, which I
>>> guess is not possible at the moment?)
>>>
>>> On Thu, Apr 29, 2021 at 4:17 PM Brian Hulette 
>>> wrote:
>>>
 Our oldest open P1 issue is BEAM-6466 - "KafkaIO doesn't commit offsets
 while being used as bounded source" [1]. I'm not sure this is an actual
 issue since KafkaIO doesn't seem to officially support this use-case. The
 relevant parameters indicate they are "mainly used for tests and demo
 applications" [2], and BEAM-2185 - "KafkaIO bounded source" [3] is still
 open.

 I think we should close out BEAM-6466 by more clearly indicating that
 withMaxReadTime() and withMaxRecords() are experimental, and/or logging a
 warning when they are used.

 I'm happy to make such a change, but I wanted to check if there are any
 objections to this first.

 Thanks,
 Brian

 [1] https://issues.apache.org/jira/browse/BEAM-6466
 [2]
 https://github.com/apache/beam/blob/3d4db26cfa4ace0a0f2fbb602f422fe30670c35f/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L960
 [3] https://issues.apache.org/jira/browse/BEAM-2185

>>>


Re: Extremely Slow DirectRunner

2021-05-10 Thread Boyuan Zhang
Hi Evan,

What do you mean startup delay? Is it the time that from you start the
pipeline to the time that you notice the first output record from PubSub?

On Sat, May 8, 2021 at 12:50 AM Ismaël Mejía  wrote:

> Can you try running direct runner with the option
> `--experiments=use_deprecated_read`
>
> Seems like an instance of
> https://issues.apache.org/jira/browse/BEAM-10670?focusedCommentId=17316858=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17316858
> also reported in
> https://lists.apache.org/thread.html/re6b0941a8b4951293a0327ce9b25e607cafd6e45b69783f65290edee%40%3Cdev.beam.apache.org%3E
>
> We should rollback using the SDF wrapper by default because of the
> usability and performance issues reported.
>
>
> On Sat, May 8, 2021 at 12:57 AM Evan Galpin  wrote:
>
>> Hi all,
>>
>> I’m experiencing very slow performance and startup delay when testing a
>> pipeline locally. I’m reading data from a Google PubSub subscription as the
>> data source, and before each pipeline execution I ensure that data is
>> present in the subscription (readable from GCP console).
>>
>> I’m seeing startup delay on the order of minutes with DirectRunner (5-10
>> min). Is that expected? I did find a Jira ticket[1] that at first seemed
>> related, but I think it has more to do with BQ than DirectRunner.
>>
>> I’ve run the pipeline with a debugger connected and confirmed that it’s
>> minutes before the first DoFn in my pipeline receives any data. Is there a
>> way I can profile the direct runner to see what it’s churning on?
>>
>> Thanks,
>> Evan
>>
>> [1]
>> https://issues.apache.org/jira/plugins/servlet/mobile#issue/BEAM-4548
>>
>


Re: Window Assignment Across SplittableDoFn

2021-05-05 Thread Boyuan Zhang
Hi,
Yes, just like normal DoFn, Splittable DoFn preserves the window
information as well.

On Wed, May 5, 2021 at 8:04 PM Evan Galpin  wrote:

> Hi folks,
>
> I’d just like to confirm what happens to window assignments through a
> SplittableDoFn. Are output elements automatically assigned to the same
> window as input elements?
>
> Thanks,
> Evan
>


Re: Question about transformOverride

2021-04-20 Thread Boyuan Zhang
+1 to use pipeline options.

 Alternatively, you can also change your KafkaReadTransform to perform
different expansion(override expand()) based on your pipeline options.

On Tue, Apr 20, 2021 at 9:51 PM Reuven Lax  wrote:

> It would be simpler to create a custom pipeline option, and swap out the
> read transform in your code. For example
>
> PCollection pc;
> if (options.getLocalTest()) {
>   pc = pipeline.apply(new ReadFromLocalFile());
> } else {
>   pc = pipeline.apply(new KafkaReadTrasnform());
> }
>
> pc.apply(/* rest of pipeline */);
>
> On Tue, Apr 20, 2021 at 9:41 PM Yuhong Cheng 
> wrote:
>
>> We want to support transform override when doing tests locally.  For
>> example, in real pipelines, we read from Kafka, but when doing tests
>> locally, we want to read from a local file to help test whether the
>> pipeline works fine. So we want to override the Kafka read transform
>> directly instead of writing the pipeline twice.
>>
>> code example:
>>
>> public Pipeline createPipeline(Pipeline pipeline) {
>>
>>pipeline.apply(new KafkaReadTransform()).apply(// other functions..);
>> }
>> In test, we will use the same createPipeline() function to create a
>> pipeline but meanwhile we want to override KafkaReadTransform with another
>> transform to avoid reading from Kafka.
>>
>> Thanks,
>> Yuhong
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> On Tue, Apr 20, 2021 at 9:02 PM Chamikara Jayalath 
>> wrote:
>>
>>> In general, TransformOverrides are expected to be per-runner
>>> implementation details and are not expected to be directly used by
>>> end-users.
>>> What is the exact use-case you are trying to achieve ? Are you running
>>> into a missing feature of an existing transform ?
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Tue, Apr 20, 2021 at 5:58 PM Yuhong Cheng 
>>> wrote:
>>>
 Hi Beam,
 We have a use case when creating a pipeline, we want to replace the IO
 read/write transform when testing using `pipeline.replaceAll(overrides)`.
 However, we met some problems when doing tests:
 1. Are there any ways we can avoid calling expand() of a transform when
 it is going to be replaced?  The reason we want to override a transform is
 because that the expand() of this transform is somehow not available in
 some situations. It seems not reasonable enough to call the expand() of the
 originalTransform and then call the expand() of the overrideTransform
 again?
 2. When trying to implement `PTransformOverrideFactory`, we realize
 that the inputs are `TaggedPValue`, which can only make {Tuple,
 PCollection} pairs. Then if we want to override a write transform whose
 output type is `PDone`, what's the best way to implement this factory?

 Thanks in advance for answers! This is quite important to our pipelines.

 Thanks,
 Yuhong

>>>


Re: BEAM-3415 JUnit5 support

2021-03-22 Thread Boyuan Zhang
Hi Emils,

Thanks for your contribution. Luke is on vacation till June. I'm adding
some java folks into your PR, who might be able to help review your PR as
well.

On Mon, Mar 22, 2021 at 9:17 AM Emils Solmanis 
wrote:

> Hi all,
>
> The JUnit5 (BEAM-3415 )
> has been open for quite some time, and it's been of interest to me and our
> organisation as well, since Beam is the only thing keeping JUnit4 around
> for us
>
> I took a stab at it and made a PR (#14272
> ), I'd like to think it's
> clean and doesn't interfere with much else, the only outside observable
> change would be opening up the TestPipeline interface a bit (protected
> constructor to allow inheritance). The build is green minus 1 Jenkins job
> that timed out and I can't restart.
>
> The original ticket claims it's "a lot of work", but it didn't end up
> being that, I suspect JUnit5 has evolved a bit in the 2-3 years since the
> original issue.
>
> The OWNERS file on the 'core' module also only lists Luke as the sole
> maintainer, I've tagged him in the PR, but have no idea what the timeline
> expectations are. It's been 4 days now, if we're saying the turnaround is a
> couple weeks it's not urgent, but I also appreciate the OWNERS files
> sometimes get stale, so is it the case that @lukecwik is the only
> maintainer?
>
> Best,
> Emil
>


Re: User-related questions in dev@ list

2021-03-10 Thread Boyuan Zhang
I subscribed to both user@ and dev@. It seems to me that we have more
active folks who have knowledge to offer help on the dev@ than the user@.
It's always good to have dev@ and user@ for different usages as long as dev@
and user@ are almost equivalent helpful for dev/users to look for help.

On Wed, Mar 10, 2021 at 1:14 PM Onur Ozer  wrote:

> One of the sample mails belongs to me, sorry for that. I thought the dev
> list was a better place. Will ask similar to the other list as well.
>
> On Wed, Mar 10, 2021 at 12:13 PM Steve Niemitz 
> wrote:
>
>> As a frequent emailer of dev@, I'll admit that it's often very difficult
>> to figure out if I should be emailing user@ or dev@, and typically just
>> chose dev@ because it seems more likely to get an answer there.  Having
>> clearer guidelines around what is a "dev" topic would be very useful to
>> better guide people towards the correct list.
>>
>> An example here was my recent email about schemas. [1]  Should this have
>> gone to users@?  I count myself as a "developer" so I feel like it fits
>> into "developer and contributor discussions", but I can certainly also see
>> how it would fit into "general discussions" for users@ as well.
>>
>> [1]
>> https://lists.apache.org/thread.html/r881ab4d0ccbc7dc2e8c478f9b68b18b313f3740b419fdf7e91a17a83%40%3Cdev.beam.apache.org%3E
>>
>> On Wed, Mar 10, 2021 at 2:52 PM Ahmet Altay  wrote:
>>
>>>
>>>
>>> On Wed, Mar 10, 2021 at 11:16 AM Alexey Romanenko <
>>> aromanenko@gmail.com> wrote:
>>>


 What do you think should be the right behaviour for managing such
> emails? Forward this email to user@ (and remove dev@ address from
> copy) and ask politely to continue a discussion there? I tried it several
> times but sometimes it happened that discussion was "forked” and continued
> in two different lists which is even worse, imho.


 I like your proposal but I do share the same concern of forked threads.
 One suggestion, instead of forking the thread we can ask users to ask on
 user@ list next time and still answer the question in the original
 thread. Hopefully that can reinforce good habits over time.


 Agree with asking and not to fork, since it usually won’t help.

 Anything else? What do you believe should work better in such cases
> (maybe some experience for other projects)?


 I wonder if there is a reason for people to ask on dev@ instead of
 user@? Web site instructions look pretty clear to me. There is a good
 amount of activity and engagement on user@ list as well. I am not sure
 about why users pick one list over another.


 Maybe we need to make it even more clear on web page that dev@ list is
 _only_ for dev-related questions, that are supposed to have any
 relationship with project development in any sense (new features/
 infrastructure/ bugs/ testing/ documentation/ etc) and provide some
 examples for both of the lists?

>>>
>>> +1 this makes sense to me. And reading the website again "review
>>> proposed design ideas on dev@" might imply that you can bring your
>>> design ideas about your own use cases/issues to the dev list.
>>>
>>>
>>


Re: Java Postcommit: MapClassIntegrationIT has been failing

2021-03-04 Thread Boyuan Zhang
+Reuven Lax 
Reuven, do you have some insights on why the test is failing?

On Thu, Mar 4, 2021 at 12:28 PM Tomo Suzuki  wrote:

> Hi Beam developers,
>
> MapClassIntegrationIT in the Java Postcommit job has been failing since
> Feb 27th (5 days ago).
>
> https://ci-beam.apache.org/job/beam_PostCommit_Java/7269/testReport/org.apache.beam.examples.cookbook/MapClassIntegrationIT/testDataflowMapState/history/
>
> I checked the logs (see the ticket below) but could not identify error.
> The underlying Dataflow job seem to become unresponsive.
>
> Created a ticket: https://issues.apache.org/jira/browse/BEAM-11922
>
> Does anyone know how to troubleshoot this issue? (I think we need to read
> the Dataflow log via Google Cloud console)
>
> --
> Regards,
> Tomo
>


Re: Issue when FnApiDoFnRunner executes Read.BoundedSourceAsSDFWrapperFn

2021-02-04 Thread Boyuan Zhang
That's great. Thanks for pushing forward this effort : )

On Thu, Feb 4, 2021 at 3:29 PM Ke Wu  wrote:

> Thank you Boyuan for the explanation! This explains why it did not work
> since Samza does not wire in SamzaPipelineRunner when executing in portable
> mode yet.
>
> I will create a ticket to update Samza runner.
>
> Best,
> Ke
>
> On Feb 4, 2021, at 12:07 PM, Boyuan Zhang  wrote:
>
> Hi Ke,
>
>  is it expected that Create.of will be expanded to a SDF
>
> In Java SDK, Create.of will be expanded into CreateSource, which will be
> wrapped into SDF implementation.
>
>  with regular pardo:v1 urn?
>
> No, the runner should run SplittableParDoExpander[1] to expand SDF
> into SPLITTABLE_PAIR_WITH_RESTRICTION_URN, 
> SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN
> and SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN.
>
> I do see that SamzaPipelineRunner running the expansion[2]. Can you double
> check whether your job invokes that code path?
> [1]
> https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SplittableParDoExpander.java
> [2]
> https://github.com/apache/beam/blob/master/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java#L42-L47
>
>
> On Thu, Feb 4, 2021 at 11:31 AM Ke Wu  wrote:
>
>> Hello Beamers,
>>
>> I am trying out a simple pipeline to be executed on PortableRunner:
>>
>> 
>> PortablePipelineOptions options =
>> PipelineOptionsFactory.fromArgs(args).as(PortablePipelineOptions.class);
>> options.setJobEndpoint(some_url);
>> options.setDefaultEnvironmentType("LOOPBACK");
>> options.setRunner(PortableRunner.class);
>>
>> Pipeline pipeline = Pipeline.create(options);
>>
>> pipeline
>> .apply(Create.of("1", "2", "3”))
>> .apply(…print to console...);
>>
>> pipeline.run()
>> ```
>>
>> This pipeline works with runners such as SamzaRunner, however, when in
>> portable mode, it does not work.
>>
>> I did some debugging and it turns out that it failed because
>> when Read.BoundedSourceAsSDFWrapperFn processElement(), the corresponding
>> RestrictionTracke is null. This seems to be caused the expanded SDF
>> transform has urn of "beam:transform:pardo:v1”, in which case
>> FnApiDoFnRunner is created with
>>
>> ```
>> mainInputConsumer = this::processElementForParDo;
>> ```
>>
>> which does not create tracker at all. I do see the other processing
>> method such as
>>
>>- processElementForSplitRestriction()
>>- processElementForWindowObservingSplitRestriction()
>>- processElementForTruncateRestriction()
>>
>>
>> etc are creating trackers properly before invoking DoFn, however, they
>> are requiring a different Urn for the Transform.
>>
>> My questions here are, did I miss anything? is it expected that Create.of
>> will be expanded to a SDF with regular pardo:v1 urn? If Yes, then what is
>> the expected behavior when FnApiDoFnRunner
>> invokes Read.BoundedSourceAsSDFWrapperFn?
>>
>> Best,
>> Ke
>>
>>
>>
>


Re: Issue when FnApiDoFnRunner executes Read.BoundedSourceAsSDFWrapperFn

2021-02-04 Thread Boyuan Zhang
Hi Ke,

 is it expected that Create.of will be expanded to a SDF

In Java SDK, Create.of will be expanded into CreateSource, which will be
wrapped into SDF implementation.

 with regular pardo:v1 urn?

No, the runner should run SplittableParDoExpander[1] to expand SDF
into SPLITTABLE_PAIR_WITH_RESTRICTION_URN,
SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN
and SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN.

I do see that SamzaPipelineRunner running the expansion[2]. Can you double
check whether your job invokes that code path?
[1]
https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SplittableParDoExpander.java
[2]
https://github.com/apache/beam/blob/master/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java#L42-L47


On Thu, Feb 4, 2021 at 11:31 AM Ke Wu  wrote:

> Hello Beamers,
>
> I am trying out a simple pipeline to be executed on PortableRunner:
>
> 
> PortablePipelineOptions options =
> PipelineOptionsFactory.fromArgs(args).as(PortablePipelineOptions.class);
> options.setJobEndpoint(some_url);
> options.setDefaultEnvironmentType("LOOPBACK");
> options.setRunner(PortableRunner.class);
>
> Pipeline pipeline = Pipeline.create(options);
>
> pipeline
> .apply(Create.of("1", "2", "3”))
> .apply(…print to console...);
>
> pipeline.run()
> ```
>
> This pipeline works with runners such as SamzaRunner, however, when in
> portable mode, it does not work.
>
> I did some debugging and it turns out that it failed because
> when Read.BoundedSourceAsSDFWrapperFn processElement(), the corresponding
> RestrictionTracke is null. This seems to be caused the expanded SDF
> transform has urn of "beam:transform:pardo:v1”, in which case
> FnApiDoFnRunner is created with
>
> ```
> mainInputConsumer = this::processElementForParDo;
> ```
>
> which does not create tracker at all. I do see the other processing method
> such as
>
>- processElementForSplitRestriction()
>- processElementForWindowObservingSplitRestriction()
>- processElementForTruncateRestriction()
>
>
> etc are creating trackers properly before invoking DoFn, however, they are
> requiring a different Urn for the Transform.
>
> My questions here are, did I miss anything? is it expected that Create.of
> will be expanded to a SDF with regular pardo:v1 urn? If Yes, then what is
> the expected behavior when FnApiDoFnRunner
> invokes Read.BoundedSourceAsSDFWrapperFn?
>
> Best,
> Ke
>
>
>


[Proposal] Portable OrderedListState

2021-02-03 Thread Boyuan Zhang
Hi team,

I'm working on supporting OrderedListState over fnapi and I'm starting the
design proposal here: doc

.

This doc focuses on fnapi proto changes and Java SDK harness support.
Please feel free to drop any ideas/concerns/suggestions there. If the
design looks good, I'll start to work on released code changes.

Thanks for your help!


Re: Separating Data from Kafka by Keying Strategy in a Kafka Splittable DoFn

2021-02-01 Thread Boyuan Zhang
Hi Rion,

Thanks for the explanation. I can see the case now. To my knowledge,
Splittable DoFn cannot help on this case and if you want watermark from
sources to be separated, I believe you have to have them in deperated
pipelines. I don't think we support per-key watermark like ifeature n one
pipeline.

On Mon, Feb 1, 2021 at 12:26 PM Rion Williams  wrote:

> Hi again Boyuan,
>
> Close, I believe. I'll describe the scenario a bit more specifically.
> Basically, I have a Kafka topic with 10 partitions and each of these
> contains records for various combinations of tenants and sources that come
> in interspersed across these partitions. This pipeline applies some
> windowing downstream, however I think for that to work properly the
> pipelines would need to be segregated in some fashion so data coming in for
> one tenant or source doesn't interfere with windowing for another.
>
> The pipeline itself looks like this:
>
> val pipeline = Pipeline.create(options)
>
> // Partition Events according to their data sources
> val partitionedEvents = pipeline
> .apply("Read Events from Kafka",
> KafkaIO
> .read()
> .withBootstrapServers(options.brokerUrl)
> .withTopic(options.logsTopic)
> .withKeyDeserializer(StringDeserializer::class.java)
> .withValueDeserializerAndCoder(
> SpecificAvroDeserializer()::class.java,
> AvroCoder.of(Log::class.java)
> )
> .withReadCommitted()
> .commitOffsetsInFinalize()
> .withTimestampPolicyFactory { _, previousWatermark -> 
> WatermarkPolicy(previousWatermark) }
> .withConsumerConfigUpdates(
> ImmutableMap.of(
> ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
> ConsumerConfig.GROUP_ID_CONFIG, "log-processor-pipeline",
> "schema.registry.url", options.schemaRegistryUrl
> )
> ).withoutMetadata()
> )
> .apply("Log Events", ParDo.of(Logs.log()))
> .apply("Rekey Logs by Tenant", ParDo.of(Logs.key()))
> .apply("Partition Logs by Source", Partition.of(sources.size, 
> Events.partition>(sources)))
>
> dataSources.forEach { dataSource ->
> // Store a reference to the data source name to avoid serialization issues
> val sourceName = dataSource.name
>
> // Apply source-specific windowing strategies
> partitionedLogs[dataSource.partition]
> .apply("Building Windows for $sourceName", 
> SourceSpecificWindow.of>(dataSource))
> .apply("Group Windowed Logs by Key for $sourceName", 
> GroupByKey.create())
> .apply("Log After Windowing for $sourceName", 
> ParDo.of(Logs.logAfterWindowing()))
> .apply(
> "Writing Windowed Logs to Files for $sourceName",
> FileIO.writeDynamic>>()
> .withNumShards(1)
> .by { row -> "${row.key}/${sourceName}" }
> .withDestinationCoder(StringUtf8Coder.of())
> .via(Contextful.fn(SerializableFunction { logs -> 
> Files.stringify(logs.value) }), TextIO.sink())
> .to(options.output)
> .withNaming { partition -> Files.name(partition)}
> )
> }
>
> pipeline.run().waitUntilFinish()
>
> Sorry - I know that's a lot, but in a nutshell I'm attempting to:
>
>- Read from a multi-tenant/source topic (10 partitions)
>- Partition those events by source
>- Window events according to their defined source (according to
>event-time fields within the records)
>- Write out files on windows closing to the appropriate tenant/source
>directory
>
> At present, it seems that because the WatermarkPolicy is only capable of
> keeping a separate watermark per partition and since it is using an
> event-time property to handle that, that multiple tenants/source
> combinations could impact others, cause windows to close unexpected/early,
> data to be missed, etc. This is why I believe that perhaps a SDF that was
> evaluated prior to reading from Kafka could allow me to treat each of these
> tenant-source pairs as separate pipelines without a major architectural
> overhaul.
>
> Is this something that an SDF might excel at or is there some other
> mechanism that I might consider to accomplish this?
>
>
>
> On Mon, Feb 1, 2021 at 1:09 PM Boyuan Zhang  wrote:
>
>> Hi Rion,
>>
>> Let's say that you have topic with 3 partitions and what you want to do
>> is to read from these 3 partitions and each partiti

Re: Beam support Flink Async I/O operator

2021-01-26 Thread Boyuan Zhang
+dev 

On Tue, Jan 26, 2021 at 1:07 PM Eleanore Jin  wrote:

> Hi community,
>
> Does Beam support Flink Async I/O operator? if so, can you please share
> the doc, and if not, is there any workaround to achieve the same in Beam
> semantics?
>
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html
>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65870673
>
> Thanks a lot!
> Eleanore
>


Re: outputReceiver.output() does not emit the result immediately

2021-01-26 Thread Boyuan Zhang
+dev 

Hi Yu,
Which runner are you using for your pipeline? Also it would be helpful to
share your pipeline code as well.

On Mon, Jan 25, 2021 at 10:19 PM  wrote:

> Hi Beam Community,
>
> I have a splittable `DoFn` that reads message from some stream and output
> the result to down stream. The pseudo code looks like:
>
> @DoFn.ProcessElement
> public DoFn.ProcessContinuation processElement(@DoFn.Element SourceDescriptor 
> sourceDescriptor,
>
> RestrictionTracker tracker,
>WatermarkEstimator 
> watermarkEstimator,
>DoFn.OutputReceiver 
> receiver) throws Exception {
> while(true){
> messages = getMessageFromStream();
> if (messages.isEmpty()) {
> return DoFn.ProcessContinuation.resume();
> }
> for(message: messages){
> if (!tracker.tryClaim(message)) {
> return DoFn.ProcessContinuation.stop();
> }
> record = Record(message);
> receiver.outputWithTimestamp(record, message.getTimestamp);
> }
> }
> }
>
>
> I expected to see the output in downstream immediately, but the results
> are grouped into batch (4, 5 output) and emitted to down stream. Is this
> size configurable in `DoFn` or runner?
>
> Thanks for any answer,
> Yu
>
>
>
>


Re: [ANNOUNCE] New PMC Member: Chamikara Jayalath

2021-01-22 Thread Boyuan Zhang
Congrats Cham!

On Fri, Jan 22, 2021 at 9:42 AM Yichi Zhang  wrote:

> Congrats Cham!
>
> On Fri, Jan 22, 2021 at 8:34 AM Alexey Romanenko 
> wrote:
>
>> Congrats, Cham! Thank you for your work!
>>
>> On 22 Jan 2021, at 09:53, Gleb Kanterov  wrote:
>>
>> Congratulations!
>>
>> On Fri, Jan 22, 2021 at 9:29 AM Ismaël Mejía  wrote:
>>
>>> Congrats Cham, well deserved!
>>>
>>>
>>> On Fri, Jan 22, 2021 at 9:02 AM Michał Walenia <
>>> michal.wale...@polidea.com> wrote:
>>>
 Congratulations, Cham! Thanks for your work!


 On Fri, Jan 22, 2021 at 3:13 AM Charles Chen  wrote:

> Congrats Cham!
>
> On Thu, Jan 21, 2021, 5:39 PM Chamikara Jayalath 
> wrote:
>
>> Thanks everybody :)
>>
>> - Cham
>>
>> On Thu, Jan 21, 2021 at 5:22 PM Pablo Estrada 
>> wrote:
>>
>>> Yoohoo Cham : )
>>>
>>> On Thu, Jan 21, 2021 at 5:20 PM Udi Meiri  wrote:
>>>
 Congrats Cham!

 On Thu, Jan 21, 2021 at 4:25 PM Griselda Cuevas 
 wrote:

> Congratulations Cham!!! Well deserved :)
>
> On Thu, 21 Jan 2021 at 15:23, Connell O'Callaghan <
> conne...@google.com> wrote:
>
>> Well done Cham!!! Thank you for all your contributions to date!!!
>>
>>
>> On Thu, Jan 21, 2021 at 3:18 PM Rui Wang 
>> wrote:
>>
>>> Congratulations, Cham!
>>>
>>> -Rui
>>>
>>> On Thu, Jan 21, 2021 at 3:15 PM Robert Bradshaw <
>>> rober...@google.com> wrote:
>>>
 Congratulations, Cham!

 On Thu, Jan 21, 2021 at 3:13 PM Brian Hulette <
 bhule...@google.com> wrote:

> Great news, congratulations Cham!
>
> On Thu, Jan 21, 2021 at 3:08 PM Robin Qiu 
> wrote:
>
>> Congratulations, Cham!
>>
>> On Thu, Jan 21, 2021 at 3:05 PM Tyson Hamilton <
>> tyso...@google.com> wrote:
>>
>>> Woo! Congrats Cham!
>>>
>>> On Thu, Jan 21, 2021 at 3:02 PM Robert Burke <
>>> rob...@frantil.com> wrote:
>>>
 Congratulations! That's fantastic news.

 On Thu, Jan 21, 2021, 2:59 PM Reza Rokni 
 wrote:

> Congratulations!
>
> On Fri, Jan 22, 2021 at 6:58 AM Ankur Goenka <
> goe...@google.com> wrote:
>
>> Congrats Cham!
>>
>> On Thu, Jan 21, 2021 at 2:57 PM Ahmet Altay <
>> al...@google.com> wrote:
>>
>>> Hi all,
>>>
>>> Please join me and the rest of Beam PMC in welcoming
>>> Chamikara Jayalath as our
>>> newest PMC member.
>>>
>>> Cham has been part of the Beam community from its early
>>> days and contributed to the project in significant ways, 
>>> including
>>> contributing new features and improvements especially 
>>> related Beam IOs,
>>> advocating for users, and mentoring new community members.
>>>
>>> Congratulations Cham! And thanks for being a part of
>>> Beam!
>>>
>>> Ahmet
>>>
>>

 --
 Michał Walenia
 Polidea  | Software Engineer
 M: +48 791 432 002 <+48791432002>
 E: michal.wale...@polidea.com
 Unique Tech
 Check out our projects! 

>>>
>>


Re: [ANNOUNCE] New committer: Piotr Szuberski

2021-01-22 Thread Boyuan Zhang
Congrats Piotr!

On Fri, Jan 22, 2021 at 10:47 AM Yichi Zhang  wrote:

> Congrats Piotrek!
>
> On Fri, Jan 22, 2021 at 10:02 AM Robert Burke  wrote:
>
>> Congrats Piotr!
>>
>> On Fri, Jan 22, 2021, 10:00 AM Tobiasz Kędzierski <
>> tobiasz.kedzier...@polidea.com> wrote:
>>
>>> Congrats Piotrek!
>>>
>>> On Fri, Jan 22, 2021 at 6:54 PM Brian Hulette 
>>> wrote:
>>>
 Congratulations Piotr, and thank you for all your contributions!

 On Fri, Jan 22, 2021 at 9:45 AM Robert Bradshaw 
 wrote:

> Thanks, Piotr. Well deserved.
>
> On Fri, Jan 22, 2021 at 9:22 AM Tyson Hamilton 
> wrote:
>
>> Congrats Piotr! Well deserved.
>>
>> On Fri, Jan 22, 2021 at 9:18 AM Ismaël Mejía 
>> wrote:
>>
>>> Congratulations Piotr ! Thanks for all your work !
>>>
>>> On Fri, Jan 22, 2021 at 5:33 PM Alexey Romanenko
>>>  wrote:
>>> >
>>> > Hi everyone,
>>> >
>>> > Please join me and the rest of the Beam PMC in welcoming a new
>>> committer: Piotr Szuberski .
>>> >
>>> > Piotr started to contribute to Beam about one year ago and he did
>>> it very actively since then. He contributed to the different areas, like
>>> adding a cross-language functionality to existing IOs, improving ITs and
>>> performance tests environment/runtime, he actively worked on dependency
>>> updates [1].
>>> >
>>> > In consideration of his contributions, the Beam PMC trusts him
>>> with the responsibilities of a Beam committer [2].
>>> >
>>> > Thank you for your contributions, Piotr!
>>> >
>>> > -Alexey, on behalf of the Apache Beam PMC
>>> >
>>> > [1]
>>> https://github.com/apache/beam/pulls?q=is%3Apr+author%3Apiotr-szuberski
>>> > [2]
>>> https://beam.apache.org/contribute/become-a-committer/#an-apache-beam-committer
>>> >
>>> >
>>>
>>


Re: Support Kafka Commit Transform for Python sdk x-lang

2021-01-20 Thread Boyuan Zhang
I opened https://github.com/apache/beam/pull/13779 for exxposing built-in
timestamp policy and commitOffsetInFinalize to ReadFromKafka.

On Fri, Jan 15, 2021 at 3:09 PM Chamikara Jayalath 
wrote:

>
>
> On Fri, Jan 15, 2021 at 2:55 PM Boyuan Zhang  wrote:
>
>> Re Cham,
>>
>>> Looking at https://github.com/apache/beam/pull/12572 seems like you
>>> just need support for FixedWindows, right ? This should work now I believe
>>> since Dataflow Python multi-language pipelines use portable job submission
>>> by default.
>>
>> The problem part is not the FixedWindows but the Reshuffle. In Java SDK,
>> Reshuffle will be expanded into
>> Window.>into(new
>> IdentityWindowFn<>(originalStrategy.getWindowFn().windowCoder()))
>> which is rejected by the python sdk.
>>
>
> Ah ok. It should not be rejected by Python SDK anymore but  Reshuffle
> ('IdentityWindowFn') indeed will fail when executing the pipeline using
> Dataflow Runner v2 currently.
>
>
>> I can have a simple PR to expose built-in timestamp policies. Brian,
>> would you like to identify how much work would need to output KafkaRecord
>> to python SDK?
>>
>> On Fri, Jan 15, 2021 at 2:44 PM Chamikara Jayalath 
>> wrote:
>>
>>> Thanks for bringing this up Sameer.
>>>
>>> On Fri, Jan 15, 2021 at 1:18 PM Boyuan Zhang  wrote:
>>>
>>>> +Chamikara Jayalath 
>>>>
>>>> Hi Sameer,
>>>>
>>>> Thanks for reaching out!
>>>>
>>>> We will expose *commitOffsetsInFinalize *to py ReadFromKafka transform
>>>> when we have CustomWindow support in python SDK, which should be coming
>>>> soon.
>>>>
>>>
>>> Looking at https://github.com/apache/beam/pull/12572 seems like you
>>> just need support for FixedWindows, right ? This should work now I believe
>>> since Dataflow Python multi-language pipelines use portable job submission
>>> by default.
>>>
>>>
>>>>
>>>> In terms of *TimestampPolicyFactory*,if you are using the built-in
>>>> types, like ProcessingTimePolicy
>>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L58-L60>,
>>>> LogAppendTimePolicy
>>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L68-L70>
>>>> and withCreateTime
>>>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L72-L89>,
>>>> it's not hard to expose them to ReadFromKafka.
>>>>
>>>
>>> Yeah, exposing existing policies should not be too hard but defining new
>>> policies (or any other option that requires second order functions)
>>> requires cross-language UDF support which is not available yet.
>>>
>>>
>>>>
>>>> Another interesting topic you have mentioned in
>>>> https://github.com/apache/beam/pull/12572 is that you also want to
>>>> retrieve KafkaRecord from ReadFromKafka instead of bytes. That requires the
>>>> KafkaRecord has the same coder in Python SDK as the coder in Java SDK. It
>>>> might be a future work for x-lang Kafka.
>>>>
>>>
>>> This is because current ReadFromKafka transform exposes
>>> Read.withoutMetaData() transform [1].
>>> I think current Beam Schema support in Python should be adequate to
>>> expand this and support a PCollection that represents a
>>> PCollectioin [2] in Python. +Brian Hulette
>>>  to confirm.
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/release-2.27.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L595
>>> [2]
>>> https://github.com/apache/beam/blob/release-2.27.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
>>>
>>> Thanks,
>>> Cham
>>>
>>>
>>>>
>>>> On Fri, Jan 15, 2021 at 12:54 PM Sameer Bhadouria <
>>>> sameer.bhadou...@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I am using Beam's Cross language support for reading from Kafka but it
>>>>> is missing some features available in the java sdk that I would like to
>>>>> use. Specifically, I am interested in the Kafka Commit Transform
>>>>&g

Re: Problems with E2E test

2021-01-18 Thread Boyuan Zhang
It does seem like the Dataflow will do some validation around PubSub params
before actually creating the pipeline. That's fair for Dataflow because
Dataflow will swap the PubSubIO from beam implementation into Dataflow
native one.

I think if you really want to run your virtual PubSub with Dataflow, you
need to try out --experiments=enable_custom_pubsub_sink to enforce Dataflow
not to do the override.

Would you like to share your job id thus we can verify the failure. Also
I'm not sure about the motivation to test it against Dataflow. Would you
like to elaborate more on that?


On Mon, Jan 18, 2021 at 3:51 AM Ramazan Yapparov <
ramazan.yappa...@akvelon.com> wrote:

> Hi Beam!
> We've been writing E2E test for KafkaToPubsub example pipeline. Instead of
> depending on some real
> Cloud Pubsub and Kafka instances we decided to use Testcontainers.
> We launch Kafka and PubSub Emulator containers and after that we pass
> containers urls into pipeline options and run the pipeline.
> During PR review we received a request for turning this test into IT so it
> would run in Dataflow Runner
> instead of Direct Runner.
> Trying to do so, we've ran into some troubles with that:
> 1. While running the test all Docker containers start at the machine where
> the test is running,
>so in order for this test to work properly dataflow job should be able
> to reach test-runner machine by a public IP.
>I certainly can't do it on my local machine, not sure how it will
> behave when running in CI environment.
> 2. When we pass our fake PubSub url into the dataflow job we receive
> following error:
> json
> {
>   "code" : 400,
>   "errors" : [ {
> "domain" : "global",
> "message" : "(f214233f9dbe6968): The workflow could not be created.
> Causes: (f214233f9dbe6719): http://localhost:49169 is not a valid Pub/Sub
> URL.",
> "reason" : "badRequest"
>   } ],
>   "message" : "(f214233f9dbe6968): The workflow could not be created.
> Causes: (f214233f9dbe6719): http://localhost:49169 is not a valid Pub/Sub
> URL.",
>   "status" : "INVALID_ARGUMENT"
> }
>
> Not sure how this can be avoided, looks like the job will only accept the
> real Cloud PubSub url.
> It would be great if you share some thoughts or any suggestions how it can
> be solved!
>
>


Re: Support Kafka Commit Transform for Python sdk x-lang

2021-01-15 Thread Boyuan Zhang
Re Cham,

> Looking at https://github.com/apache/beam/pull/12572 seems like you just
> need support for FixedWindows, right ? This should work now I believe since
> Dataflow Python multi-language pipelines use portable job submission by
> default.

The problem part is not the FixedWindows but the Reshuffle. In Java SDK,
Reshuffle will be expanded into
Window.>into(new
IdentityWindowFn<>(originalStrategy.getWindowFn().windowCoder()))
which is rejected by the python sdk.

I can have a simple PR to expose built-in timestamp policies. Brian, would
you like to identify how much work would need to output KafkaRecord to
python SDK?

On Fri, Jan 15, 2021 at 2:44 PM Chamikara Jayalath 
wrote:

> Thanks for bringing this up Sameer.
>
> On Fri, Jan 15, 2021 at 1:18 PM Boyuan Zhang  wrote:
>
>> +Chamikara Jayalath 
>>
>> Hi Sameer,
>>
>> Thanks for reaching out!
>>
>> We will expose *commitOffsetsInFinalize *to py ReadFromKafka transform
>> when we have CustomWindow support in python SDK, which should be coming
>> soon.
>>
>
> Looking at https://github.com/apache/beam/pull/12572 seems like you just
> need support for FixedWindows, right ? This should work now I believe since
> Dataflow Python multi-language pipelines use portable job submission by
> default.
>
>
>>
>> In terms of *TimestampPolicyFactory*,if you are using the built-in
>> types, like ProcessingTimePolicy
>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L58-L60>,
>> LogAppendTimePolicy
>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L68-L70>
>> and withCreateTime
>> <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java#L72-L89>,
>> it's not hard to expose them to ReadFromKafka.
>>
>
> Yeah, exposing existing policies should not be too hard but defining new
> policies (or any other option that requires second order functions)
> requires cross-language UDF support which is not available yet.
>
>
>>
>> Another interesting topic you have mentioned in
>> https://github.com/apache/beam/pull/12572 is that you also want to
>> retrieve KafkaRecord from ReadFromKafka instead of bytes. That requires the
>> KafkaRecord has the same coder in Python SDK as the coder in Java SDK. It
>> might be a future work for x-lang Kafka.
>>
>
> This is because current ReadFromKafka transform exposes
> Read.withoutMetaData() transform [1].
> I think current Beam Schema support in Python should be adequate to expand
> this and support a PCollection that represents a
> PCollectioin [2] in Python. +Brian Hulette
>  to confirm.
>
> [1]
> https://github.com/apache/beam/blob/release-2.27.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L595
> [2]
> https://github.com/apache/beam/blob/release-2.27.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaRecord.java
>
> Thanks,
> Cham
>
>
>>
>> On Fri, Jan 15, 2021 at 12:54 PM Sameer Bhadouria <
>> sameer.bhadou...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I am using Beam's Cross language support for reading from Kafka but it
>>> is missing some features available in the java sdk that I would like to
>>> use. Specifically, I am interested in the Kafka Commit Transform
>>> <https://github.com/apache/beam/pull/12572> feature in java sdk. This
>>> will also require being able to specify if the metadata is needed or not as
>>> part of the KafkaIO transform creation. In addition, the 
>>> `commitOffsetsInFinalize`
>>> and `TimestampPolicyFactory`parameters are also missing from the python
>>> wrapper.
>>>
>>> My use case is as follows:
>>> I have Kafka topics that have data corresponding to the Mongo change
>>> streams produced by the Mongo Source Kafka connector. In my pipeline, I
>>> read these updated mongo documents, apply some transformations and stream
>>> them to BigQuery.
>>>
>>> Currently, I can only use the `auto.commit` from Kafka consumer config
>>> and I believe the message is acked/offset committed after the consumer in
>>> KafkaIO finishes reading them. If there are any errors in later stages of
>>> my pipeline or if the pipeline is restarted and it can't be drained
>>> gracefully, I will lose the already acked messages. Hence, I want to commit
>>> the offsets only after they are succes

Re: Support Kafka Commit Transform for Python sdk x-lang

2021-01-15 Thread Boyuan Zhang
+Chamikara Jayalath 

Hi Sameer,

Thanks for reaching out!

We will expose *commitOffsetsInFinalize *to py ReadFromKafka transform when
we have CustomWindow support in python SDK, which should be coming soon.

In terms of *TimestampPolicyFactory*,if you are using the built-in types,
like ProcessingTimePolicy
,
LogAppendTimePolicy

and withCreateTime
,
it's not hard to expose them to ReadFromKafka.

Another interesting topic you have mentioned in
https://github.com/apache/beam/pull/12572 is that you also want to retrieve
KafkaRecord from ReadFromKafka instead of bytes. That requires the
KafkaRecord has the same coder in Python SDK as the coder in Java SDK. It
might be a future work for x-lang Kafka.

On Fri, Jan 15, 2021 at 12:54 PM Sameer Bhadouria <
sameer.bhadou...@gmail.com> wrote:

> Hello,
>
> I am using Beam's Cross language support for reading from Kafka but it is
> missing some features available in the java sdk that I would like to use.
> Specifically, I am interested in the Kafka Commit Transform
>  feature in java sdk. This
> will also require being able to specify if the metadata is needed or not as
> part of the KafkaIO transform creation. In addition, the 
> `commitOffsetsInFinalize`
> and `TimestampPolicyFactory`parameters are also missing from the python
> wrapper.
>
> My use case is as follows:
> I have Kafka topics that have data corresponding to the Mongo change
> streams produced by the Mongo Source Kafka connector. In my pipeline, I
> read these updated mongo documents, apply some transformations and stream
> them to BigQuery.
>
> Currently, I can only use the `auto.commit` from Kafka consumer config and
> I believe the message is acked/offset committed after the consumer in
> KafkaIO finishes reading them. If there are any errors in later stages of
> my pipeline or if the pipeline is restarted and it can't be drained
> gracefully, I will lose the already acked messages. Hence, I want to commit
> the offsets only after they are successfully written to BigQuery.
>
> Here is a snippet of my pipeline code.
>
>> def run(self, pipeline: Pipeline):
>>consumer_config = {
>>   'bootstrap.servers': self.bootstrap_servers,
>>   'auto.offset.reset': 'latest',
>>   # Ideally we want auto.commit disabled, but we need a way to 
>> acknowledge the messages manually
>>   # 'enable.auto.commit': 'false',  # messages must be acked explicitly 
>> but prevents loss in case of failures
>>   'auto.commit.interval.ms': '6',  # keep a high value since manual 
>> commits are not supported and messages will be lost if there is an error in 
>> the pipeline
>>   'group.id': 'dev_streaming_dr_beam_pipeline'
>>}
>>
>>streamed_dr_kvs_raw = (
>>  pipeline
>>  | 'Read from Kafka Stream' >>
>>  ReadFromKafka(
>> consumer_config=consumer_config,
>> topics=['mongo_kafka_connect.requests'],
>> max_num_records=1,
>>  )
>>)
>>
>>dr_data_stream = streamed_dr_kvs_raw | 'Kafka Message Deserializer' >> 
>> ParDo(MongoKafkaMessageDeserializer())
>>
>>filter_record_fn: Callable[[MongoKafkaMessage], MongoKafkaMessage] = 
>> lambda elem: elem.mongo_document is not None
>>filtered_dr_ds_with_record_ts = (
>>  dr_data_stream
>>  | 'Filter empty values' >> Filter(filter_record_fn)
>>  | 'Extract Timestamp' >> 
>> ParDo(MongoKafkaMessageTimestampExtractor())
>>)
>>
>># The lateness window defines how long the state is kept for older windows
>># and saving state for a longer duration can create memory pressure. 
>> Note, the state is
>># saved in persistent disk but is optimistically fetched in the local 
>> memory.
>>batched_dr_records = filtered_dr_ds_with_record_ts | 'Window Batch' >> 
>> WindowInto(
>>   FixedWindows(30),  # 30 seconds
>>   trigger=AfterWatermark(late=AfterProcessingTime(1 * 60)),  # any time 
>> late data arrives after 1 min
>>   allowed_lateness=24 * 60 * 60, # 24 hours for late data
>>   accumulation_mode=AccumulationMode.DISCARDING
>>)
>>
>>extract_mongo_doc_fn: Callable[[List[MongoKafkaMessage]], dict] = lambda 
>> elems: elems[0].mongo_document
>>de_duped_dr_records = (
>>  batched_dr_records
>>  | 'Group by Message Key' >> GroupBy('message_key')
>>  | 'Select Latest' >> Latest.PerKey()
>>  | 'Values' >> Values()
>>  | 'Extract mongo document' >> Map(extract_mongo_doc_fn)
>>)
>>
>>dr_with_features = 

Re: Huge memory usage increase after 2.26.

2021-01-14 Thread Boyuan Zhang
Thanks for reporting such an issue.

Do you happen to have a heap dump when OOM happens? Than might help us to
identify which part causes huge memory usage/loss.

On Thu, Jan 14, 2021 at 8:35 AM Yuhong Cheng 
wrote:

> Hi Beam,
>
>  We are running a beam pipeline on spark, whose pipeline is mainly like
> this:
>
> pipeline
>
>.apply(AvroIO
>
>.readGenericRecords(schema)
>
>.from(// a direction)
>
>.apply(ParDo.of(
>
>new DoFn() {
>
>  @DoFn.ProcessElement
>
>  public void process(ProcessContext c) {
>
>   // convert to another type
>
>  }
>
>}))
>
>.setCoder(..)
>
>.apply(// write to a file);
>
> It ran well before. But when we upgraded to beam 2.26. , the memory used
> by the job increased a lot and we met the GC limit exception:
>
> Container exited with a non-zero exit code 52. Error file: prelaunch.err.
>
> 13-01-2021 12:25:32 PST  INFO - Last 4096 bytes of prelaunch.err :
>
> 13-01-2021 12:25:32 PST  INFO - Last 4096 bytes of stderr :
>
> 13-01-2021 12:25:32 PST  INFO -
> e.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>
> 13-01-2021 12:25:32 PST  INFO - at
> org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>
> 13-01-2021 12:25:32 PST  INFO - at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>
> 13-01-2021 12:25:32 PST  INFO - at
> org.apache.spark.scheduler.Task.run(Task.scala:109)
>
> 13-01-2021 12:25:32 PST  INFO - at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:429)
>
> 13-01-2021 12:25:32 PST  INFO - at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
> 13-01-2021 12:25:32 PST  INFO - at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>
> 13-01-2021 12:25:32 PST  INFO - at java.lang.Thread.run(Thread.java:748)
>
> 13-01-2021 12:25:32 PST  INFO - Caused by: java.lang.OutOfMemoryError: GC
> overhead limit exceeded
>
> 13-01-2021 12:25:32 PST  INFO - at
> java.util.HashMap.newNode(HashMap.java:1747)
>
> 13-01-2021 12:25:32 PST  INFO - at
> java.util.HashMap.putVal(HashMap.java:642)
>
> 13-01-2021 12:25:32 PST  INFO - at java.util.HashMap.put(HashMap.java:612)
>
> 13-01-2021 12:25:32 PST  INFO - at
> org.apache.avro.generic.GenericDatumReader.addToMap(GenericDatumReader.java:275)
>
> 13-01-2021 12:25:32 PST  INFO - at
> org.apache.avro.generic.GenericDatumReader.readMap(GenericDatumReader.java:256)
>
> 13-01-2021 12:25:32 PST  INFO - at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
>
> 13-01-2021 12:25:32 PST  INFO - at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
>
> 13-01-2021 12:25:32 PST  INFO - at
> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
>
> 13-01-2021 12:25:32 PST  INFO - at
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
>
> 13-01-2021 12:25:32 PST  INFO - at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
>
> 13-01-2021 12:25:32 PST  INFO - at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
>
> 13-01-2021 12:25:32 PST  INFO - at
> org.apache.beam.sdk.io.AvroSource$AvroBlock.readNextRecord(AvroSource.java:647)
>
> 13-01-2021 12:25:32 PST  INFO - at
> org.apache.beam.sdk.io.BlockBasedSource$BlockBasedReader.readNextRecord(BlockBasedSource.java:212)
>
> 13-01-2021 12:25:32 PST  INFO - at
> org.apache.beam.sdk.io.FileBasedSource$FileBasedReader.advanceImpl(FileBasedSource.java:487)
>
> 13-01-2021 12:25:32 PST  INFO - at
> org.apache.beam.sdk.io.OffsetBasedSource$OffsetBasedReader.advance(OffsetBasedSource.java:258)
>
> 13-01-2021 12:25:32 PST  INFO - at
> org.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn$BoundedSourceAsSDFRestrictionTracker.tryClaim(Read.java:347)
>
> 13-01-2021 12:25:32 PST  INFO - at
> org.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn$BoundedSourceAsSDFRestrictionTracker.tryClaim(Read.java:312)
>
> 13-01-2021 12:25:32 PST  INFO - at
> org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers$RestrictionTrackerObserver.tryClaim(RestrictionTrackers.java:59)
>
> 13-01-2021 12:25:32 PST  INFO - at
> org.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn.processElement(Read.java:298)
>
> 13-01-2021 12:25:32 PST  INFO - at
> org.apache.beam.sdk.io.Read$BoundedSourceAsSDFWrapperFn$DoFnInvoker.invokeProcessElement(Unknown
> Source)
>
> 13-01-2021 12:25:32 PST  INFO - at
> org.apache.beam.runners.core.construction.SplittableParDoNaiveBounded$NaiveProcessFn.process(SplittableParDoNaiveBounded.java:309)
>
> 13-01-2021 12:25:32 PST  INFO - at
> org.apache.beam.runners.core.construction.SplittableParDoNaiveBounded$NaiveProcessFn$DoFnInvoker.invokeProcessElement(Unknown
> Source)
>
> 13-01-2021 12:25:32 PST  INFO - at
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:232)
>
> 13-01-2021 12:25:32 PST  INFO - at
> 

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2021-01-12 Thread Boyuan Zhang
Hi,

I proposed to make runner-issue checkpoint frequency configurable for a
pipeline author here:
https://docs.google.com/document/d/18jNLtTyyApx0N2ytp1ytOMmUPLouj2h08N3-4SyWGgQ/edit?usp=sharing.
I believe it will also be helpful for the performance issue. Please feel
free to drop any comments there : )

On Wed, Jan 6, 2021 at 1:14 AM Jan Lukavský  wrote:

> Sorry for the typo in your name. :-)
>
> On 1/6/21 10:11 AM, Jan Lukavský wrote:
> > Hi Antonie,
> >
> > yes, for instance. I'd just like to rule out possibility that a single
> > DoFn processing multiple partitions (restrictions) brings some
> > overhead in your case.
> >
> > Jan
> >
> > On 12/31/20 10:36 PM, Antonio Si wrote:
> >> Hi Jan,
> >>
> >> Sorry for the late reply. My topic has 180 partitions. Do you mean
> >> run with a
> >> parallelism set to 900?
> >>
> >> Thanks.
> >>
> >> Antonio.
> >>
> >> On 2020/12/23 20:30:34, Jan Lukavský  wrote:
> >>> OK,
> >>>
> >>> could you make an experiment and increase the parallelism to something
> >>> significantly higher than the total number of partitions? Say 5 times
> >>> higher? Would that have impact on throughput in your case?
> >>>
> >>> Jan
> >>>
> >>> On 12/23/20 7:03 PM, Antonio Si wrote:
> >>>> Hi Jan,
> >>>>
> >>>> The performance data that I reported was run with parallelism = 8.
> >>>> We also ran with parallelism = 15 and we observed similar behaviors
> >>>> although I don't have the exact numbers. I can get you the numbers
> >>>> if needed.
> >>>>
> >>>> Regarding number of partitions, since we have multiple topics, the
> >>>> number of partitions varies from 180 to 12. The highest TPS topic
> >>>> has 180 partitions, while the lowest TPS topic has 12 partitions.
> >>>>
> >>>> Thanks.
> >>>>
> >>>> Antonio.
> >>>>
> >>>> On 2020/12/23 12:28:42, Jan Lukavský  wrote:
> >>>>> Hi Antonio,
> >>>>>
> >>>>> can you please clarify a few things:
> >>>>>
> >>>>> a) what parallelism you use for your sources
> >>>>>
> >>>>> b) how many partitions there is in your topic(s)
> >>>>>
> >>>>> Thanks,
> >>>>>
> >>>>> Jan
> >>>>>
> >>>>> On 12/22/20 10:07 PM, Antonio Si wrote:
> >>>>>> Hi Boyuan,
> >>>>>>
> >>>>>> Let me clarify, I have tried with and without using
> >>>>>> --experiments=beam_fn_api,use_sdf_kafka_read option:
> >>>>>>
> >>>>>> -  with --experiments=use_deprecated_read --fasterrCopy=true, I
> >>>>>> am able to achieve 13K TPS
> >>>>>> -  with --experiments="beam_fn_api,use_sdf_kafka_read"
> >>>>>> --fasterCopy=true, I am able to achieve 10K
> >>>>>> -  with --fasterCopy=true alone, I am only able to achieve 5K TPS
> >>>>>>
> >>>>>> In our testcase, we have multiple topics, checkpoint intervals is
> >>>>>> 60s. Some topics have a lot higher traffics than others. We look
> >>>>>> at the case with --experiments="beam_fn_api,use_sdf_kafka_read"
> >>>>>> --fasterCopy=true options a little. Based on our observation,
> >>>>>> each consumer poll() in ReadFromKafkaDoFn.processElement() takes
> >>>>>> about 0.8ms. So for topic with high traffics, it will continue in
> >>>>>> the loop because every poll() will return some records. Every
> >>>>>> poll returns about 200 records. So, it takes about 0.8ms for
> >>>>>> every 200 records. I am not sure if that is part of the reason
> >>>>>> for the performance.
> >>>>>>
> >>>>>> Thanks.
> >>>>>>
> >>>>>> Antonio.
> >>>>>>
> >>>>>> On 2020/12/21 19:03:19, Boyuan Zhang  wrote:
> >>>>>>> Hi Antonio,
> >>>>>>>
> >>>>>>> Thanks for the data point. That's very valuable information!
> >>>>>>>
> >>>>>>> I didn't use DirectRunner

Re: Null checking in Beam

2021-01-11 Thread Boyuan Zhang
Yeah it seems like the checker is enabled:
https://issues.apache.org/jira/browse/BEAM-10402. I used
@SuppressWarnings({"nullness" )}) to suppress the error when I think it's
not really a concern.

On Mon, Jan 11, 2021 at 8:28 PM Reuven Lax  wrote:

> Has extra Nullable checking been enabled in the Beam project? I have a PR
> that was on hold for several months, and I'm struggling now with compile
> failing to complaints about assigning something that is nullable to
> something that is not nullable. Even when the immediate control flow makes
> it absolutely impossible for the variable to be null.
>
> Has something changed here?
>
> Reuven
>


Re: Understanding KafkaIO.Read In Batch Mode

2021-01-11 Thread Boyuan Zhang
Would you like to share the whole pipeline code here? With what kind of
configuration you will notice OOM, like how many elements, how many shards
and how many parallelisms you have for your run?

On Mon, Jan 11, 2021 at 8:11 PM shrikant bang 
wrote:

> Hi Boyuan,
>
>Thank you for your response.  If I understood correctly, does each
> shard's output get batched into memory in  DoFnOutputManager [1] before
> passing to downstream operations?
>
>I am trying to understand the root cause of OOM in executors if I
> increase the max number of records to be read without changing the
> executor's memory, numbers or cpu cores assigned.
>
>   Below is a snippet for reading from Kafka:
>
>   PTransform>> kafka =
> KafkaIO.read()
> .withBootstrapServers(options.getBootstrap())
> .withTopic(options.getTopic())
> .withKeyDeserializer(StringDeserializer.class)
> .withValueDeserializer(StringDeserializer.class)
> .withMaxNumRecords(options.getKafkaRecordsToBeRead())
> .commitOffsetsInFinalize()
> .withConsumerConfigUpdates(map)
> .commitOffsetsInFinalize()
> .withoutMetadata();
>
>
> [1]
> https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java#L244
>
>
>  Thank You,
> Shrikant Bang
>
>
> On Mon, Jan 11, 2021 at 11:54 PM Boyuan Zhang  wrote:
>
>> +dev 
>>
>> Hi Shrikant,
>> If you look into the expansion of BoundedReadfromUnboundedSource[1], you
>> will notice that it will expand into Create single shard -> Split into
>> multiple shard -> read from one shard. The number of records from one shard
>> will not be larger than 1 and the number of shards will not be larger
>> than 100[2].
>>
>> Back to your questions,  the OutputReceiver will output the received
>> element to the downstream operation immediately. It will not keep the
>> record in batch in memory. But you are right that
>> BoundedReadFromUnboundedSource is memory-sensitive especially if your
>> records or your downstream operations consume a lot.
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java#L93-L121
>> [2]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java#L156-L161
>>
>>
>> On Mon, Jan 11, 2021 at 5:48 AM shrikant bang <
>> mailtoshrikant.b...@gmail.com> wrote:
>>
>>> Hi Team,
>>>
>>> I have ETL use cases with source as Kafka ( in *Batch* mode)
>>> with SparkRunner. I am trying to understand the internals' of
>>> KafkaIO.Read.
>>>
>>> Can someone please confirm if my understanding is correct?
>>>
>>>- WindowedContextOutputReceiver is getting used for collecting Kafka
>>> records from KafkaIO.Read from BoundedReadFromUnboundedSource [1].
>>>- All read Kafka records get stored in memory and gets spilled to
>>> downstream once the loop ends in Read function [2].
>>>
>>> Ref :
>>> [1] :
>>> https://github.com/apache/beam/blob/3bb232fb098700de408f574585dfe74bbaff7230/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java#L206
>>>
>>> [2] :
>>> https://github.com/apache/beam/blob/3bb232fb098700de408f574585dfe74bbaff7230/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java#L201
>>>
>>>
>>>
>>> Thank You,
>>> Shrikant Bang.
>>>
>>> On Sun, Jan 10, 2021 at 11:43 PM shrikant bang <
>>> mailtoshrikant.b...@gmail.com> wrote:
>>>
>>>> Hi Team,
>>>>
>>>>I have below questions/ understandings for KafkaIO.Read in batch
>>>> mode :
>>>>
>>>>1. I built an understanding on debugging that, KafkaIO converts
>>>>unbounded stream into bounded read and *buffers all records* till
>>>>either of criteria matches - max records/ max time to read.
>>>>If this understanding is correct, then read is memory intensive as
>>>>KafkaIO has to buffer all read records before passing to down-streams. 
>>>> Is
>>>>my understanding correct?
>>>>
>>>>2. If #1 is correct, then is there any way we can keep writing
>>>>records instead of buffering into memory in KafkaIO read operation (in
>>>>batch mode) ?
>>>>
>>>>
>>>> Thank You,
>>>> Shrikant Bang
>>>>
>>>


Re: please add me in beam project

2021-01-11 Thread Boyuan Zhang
Hi Keisuke,

Welcome!
For creating a PR, there is no additional permission required. You may want
to create a JIRA account and ask for permission for task tracking. You may
also want to check this out to get familiar with how to do contribution:
https://beam.apache.org/contribute/

On Mon, Jan 11, 2021 at 10:16 AM 谷口恵輔  wrote:

> Hi my name is Keisuke.
> I am sending a message because I would like to be a beam contributor.
> could you please give me a permission to send a pull request?
>
> this is my github account
> https://github.com/case-k-git
>
>  I read this document and send you a message but this is the first time
> for me to join oss project.
> so if there is not enough information please let me know. thank you
>
> https://github.com/apache/beam
>
>


Re: Understanding KafkaIO.Read In Batch Mode

2021-01-11 Thread Boyuan Zhang
+dev 

Hi Shrikant,
If you look into the expansion of BoundedReadfromUnboundedSource[1], you
will notice that it will expand into Create single shard -> Split into
multiple shard -> read from one shard. The number of records from one shard
will not be larger than 1 and the number of shards will not be larger
than 100[2].

Back to your questions,  the OutputReceiver will output the received
element to the downstream operation immediately. It will not keep the
record in batch in memory. But you are right that
BoundedReadFromUnboundedSource is memory-sensitive especially if your
records or your downstream operations consume a lot.

[1]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java#L93-L121
[2]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java#L156-L161


On Mon, Jan 11, 2021 at 5:48 AM shrikant bang 
wrote:

> Hi Team,
>
> I have ETL use cases with source as Kafka ( in *Batch* mode)
> with SparkRunner. I am trying to understand the internals' of
> KafkaIO.Read.
>
> Can someone please confirm if my understanding is correct?
>
>- WindowedContextOutputReceiver is getting used for collecting Kafka
> records from KafkaIO.Read from BoundedReadFromUnboundedSource [1].
>- All read Kafka records get stored in memory and gets spilled to
> downstream once the loop ends in Read function [2].
>
> Ref :
> [1] :
> https://github.com/apache/beam/blob/3bb232fb098700de408f574585dfe74bbaff7230/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java#L206
>
> [2] :
> https://github.com/apache/beam/blob/3bb232fb098700de408f574585dfe74bbaff7230/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java#L201
>
>
>
> Thank You,
> Shrikant Bang.
>
> On Sun, Jan 10, 2021 at 11:43 PM shrikant bang <
> mailtoshrikant.b...@gmail.com> wrote:
>
>> Hi Team,
>>
>>I have below questions/ understandings for KafkaIO.Read in batch mode :
>>
>>1. I built an understanding on debugging that, KafkaIO converts
>>unbounded stream into bounded read and *buffers all records* till
>>either of criteria matches - max records/ max time to read.
>>If this understanding is correct, then read is memory intensive as
>>KafkaIO has to buffer all read records before passing to down-streams. Is
>>my understanding correct?
>>
>>2. If #1 is correct, then is there any way we can keep writing
>>records instead of buffering into memory in KafkaIO read operation (in
>>batch mode) ?
>>
>>
>> Thank You,
>> Shrikant Bang
>>
>


Support of KafkaIO Dynamic Read

2021-01-07 Thread Boyuan Zhang
Hi team,

I'm working on KafkaIO dynamic read support which is tracked by BEAM-11325
 and I started the
documentation here:
https://docs.google.com/document/d/1FU3GxVRetHPLVizP3Mdv6mP5tpjZ3fd99qNjUI5DT5k/edit?usp=sharing,
which states the problem I want to solve and the proposed solutions.

Please feel free to drop any comments/concerns/suggestions/ideas : ) If the
design looks good in general, I'll start the dev work as soon as possible.

Thanks for your help!


Re: Compatibility between Beam v2.23 and Beam v2.26

2021-01-05 Thread Boyuan Zhang
https://github.com/apache/beam/pull/13240 seems suspicious to me.

 +Maximilian Michels  Any insights here?

On Tue, Jan 5, 2021 at 8:48 AM Antonio Si  wrote:

> Hi,
>
> I would like to followup with this question to see if there is a
> solution/workaround for this issue.
>
> Thanks.
>
> Antonio.
>
> On 2020/12/19 18:33:48, Antonio Si  wrote:
> > Hi,
> >
> > We were using Beam v2.23 and recently, we are testing upgrade to Beam
> v2.26. For Beam v2.26, we are passing --experiments=use_deprecated_read and
> --fasterCopy=true.
> >
> > We run into this exception when we resume our pipeline:
> >
> > Caused by: java.io.InvalidClassException:
> org.apache.beam.runners.flink.translation.types.CoderTypeSerializer; local
> class incompatible: stream classdesc serialVersionUID =
> 5241803328188007316, local class serialVersionUID = 7247319138941746449
> >   at
> java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
> >   at
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1942)
> >   at
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1808)
> >   at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2099)
> >   at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1625)
> >   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:465)
> >   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:423)
> >   at
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:301)
> >   at
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:116)
> >   at
> org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot.readSnapshot(TypeSerializerConfigSnapshot.java:113)
> >   at
> org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:174)
> >   at
> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
> >   at
> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
> >   at
> org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
> >   at
> org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:219)
> >   at
> org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:119)
> >   at
> org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:83)
> >
> > It looks like it is not able to deserialize objects from our existing
> checkpoints. Is there any way we could resume our v2.23 checkpoints by
> v2.26?
> >
> > Thanks for any suggestions.
> >
> > Antonio.
> >
>


Re: Add E2E test for Kafka to Pub/Sub complete example

2020-12-30 Thread Boyuan Zhang
Thanks for your contribution! I can help review PRs related to this topic.

On Wed, Dec 30, 2020 at 7:31 AM Ramazan Yapparov <
ramazan.yappa...@akvelon.com> wrote:

> Hi Beam Community,
>
> Recently our team added Kafka to Pub/Sub example pipeline to Apache Beam
> repository.
> We decided to move E2E tests out of the scope because we needed more time
> for investigation and implementation.
> And now the test is ready, the PR is open.
>
> Here are the jira tickets for this PR:
> https://issues.apache.org/jira/browse/BEAM-11410
> https://issues.apache.org/jira/browse/BEAM-11411
>
> Please share your feedback/comments about this proposal in the thread.
>
>
> Here is the PR, please take a look at it and feel free to share your
> comments:
> https://github.com/apache/beam/pull/13636
>
> Thank you,
> Ramazan
>
>


Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-28 Thread Boyuan Zhang
Hi Steve,

We have one wrapper optimization[1] merged in and it will be released with
2.27.0. Would you like to verify whether it helps improve the
performance on DirectRunner?

[1] https://github.com/apache/beam/pull/13592

On Mon, Dec 28, 2020 at 12:17 PM Boyuan Zhang  wrote:

> Hi Antonio,
>
> Thanks for the data! I want to elaborate more on where the overhead could
> come from when on Flink.
>
> -  with --experiments=use_deprecated_read --fasterrCopy=true, I am able to
>> achieve 13K TPS
>
>
> This execution uses UnboundedSource path, where the checkpoint frequency
> for source reading is configured as the same frequency of flink checkpoint
> interval. In your case, the checkpoint frequency is every 60s.
> Flink reschedule the checkpoint marks to process by reading from states.
> Thue the overhead here could be the time for executing
> source.getCheckpointMark + reading from/writing to state + overhead of
> flink checkpoint execution.
>
>
>> -  with --experiments="beam_fn_api,use_sdf_kafka_read" --fasterCopy=true,
>> I am able to achieve 10K
>>
>
> This execution uses Kafka SDF implementation, where the
> checkpoint frequency is configutred as every 1 elements or every 10
> seconds by the OutputAndTimeBoundedSplittableProcessElementInvoker. As you
> mentioned that every poll takes 0.8s and returns 200 elements. So the
> checkpoint frequency here should be every 4s(hitting the 1 limit).
> The residuals can be from runner-issued checkpoint or SDF self-checkpoint.
> Flink reshedules the residuals by using Timer and State.
> Thus the overhead here could be the time for scheduling timers + reading
> from/writing to states. I would expect to see improvements if we control
> the frequency longer than 60s(for example, every 60s or every 15000
> elements).
>
>
>> -  with --fasterCopy=true alone, I am only able to achieve 5K TPS
>
>
> This execution uses UnboundedSourceAsSDFWrapperFn path, where the
> checkpoint frequency is also every 1 elements or every 10
> seconds. Flink also reshedules the residuals by using Timer and State. So
> the overhead here could be the time for scheduling timers + reading
> from/writing to states +  overhead of the wrapper wrapping unbounded source.
>
>
> On Wed, Dec 23, 2020 at 12:30 PM Jan Lukavský  wrote:
>
>> OK,
>>
>> could you make an experiment and increase the parallelism to something
>> significantly higher than the total number of partitions? Say 5 times
>> higher? Would that have impact on throughput in your case?
>>
>> Jan
>>
>> On 12/23/20 7:03 PM, Antonio Si wrote:
>> > Hi Jan,
>> >
>> > The performance data that I reported was run with parallelism = 8. We
>> also ran with parallelism = 15 and we observed similar behaviors although I
>> don't have the exact numbers. I can get you the numbers if needed.
>> >
>> > Regarding number of partitions, since we have multiple topics, the
>> number of partitions varies from 180 to 12. The highest TPS topic has 180
>> partitions, while the lowest TPS topic has 12 partitions.
>> >
>> > Thanks.
>> >
>> > Antonio.
>> >
>> > On 2020/12/23 12:28:42, Jan Lukavský  wrote:
>> >> Hi Antonio,
>> >>
>> >> can you please clarify a few things:
>> >>
>> >>a) what parallelism you use for your sources
>> >>
>> >>b) how many partitions there is in your topic(s)
>> >>
>> >> Thanks,
>> >>
>> >>Jan
>> >>
>> >> On 12/22/20 10:07 PM, Antonio Si wrote:
>> >>> Hi Boyuan,
>> >>>
>> >>> Let me clarify, I have tried with and without using
>> --experiments=beam_fn_api,use_sdf_kafka_read option:
>> >>>
>> >>> -  with --experiments=use_deprecated_read --fasterrCopy=true, I am
>> able to achieve 13K TPS
>> >>> -  with --experiments="beam_fn_api,use_sdf_kafka_read"
>> --fasterCopy=true, I am able to achieve 10K
>> >>> -  with --fasterCopy=true alone, I am only able to achieve 5K TPS
>> >>>
>> >>> In our testcase, we have multiple topics, checkpoint intervals is
>> 60s. Some topics have a lot higher traffics than others. We look at the
>> case with --experiments="beam_fn_api,use_sdf_kafka_read" --fasterCopy=true
>> options a little. Based on our observation, each consumer poll() in
>> ReadFromKafkaDoFn.processElement() takes about 0.8ms. So for topic with
>> high traffics, it will continue in the loop because every poll() will
>> return some re

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-28 Thread Boyuan Zhang
Hi Antonio,

Thanks for the data! I want to elaborate more on where the overhead could
come from when on Flink.

-  with --experiments=use_deprecated_read --fasterrCopy=true, I am able to
> achieve 13K TPS


This execution uses UnboundedSource path, where the checkpoint frequency
for source reading is configured as the same frequency of flink checkpoint
interval. In your case, the checkpoint frequency is every 60s.
Flink reschedule the checkpoint marks to process by reading from states.
Thue the overhead here could be the time for executing
source.getCheckpointMark + reading from/writing to state + overhead of
flink checkpoint execution.


> -  with --experiments="beam_fn_api,use_sdf_kafka_read" --fasterCopy=true,
> I am able to achieve 10K
>

This execution uses Kafka SDF implementation, where the
checkpoint frequency is configutred as every 1 elements or every 10
seconds by the OutputAndTimeBoundedSplittableProcessElementInvoker. As you
mentioned that every poll takes 0.8s and returns 200 elements. So the
checkpoint frequency here should be every 4s(hitting the 1 limit).
The residuals can be from runner-issued checkpoint or SDF self-checkpoint.
Flink reshedules the residuals by using Timer and State.
Thus the overhead here could be the time for scheduling timers + reading
from/writing to states. I would expect to see improvements if we control
the frequency longer than 60s(for example, every 60s or every 15000
elements).


> -  with --fasterCopy=true alone, I am only able to achieve 5K TPS


This execution uses UnboundedSourceAsSDFWrapperFn path, where the
checkpoint frequency is also every 1 elements or every 10
seconds. Flink also reshedules the residuals by using Timer and State. So
the overhead here could be the time for scheduling timers + reading
from/writing to states +  overhead of the wrapper wrapping unbounded source.


On Wed, Dec 23, 2020 at 12:30 PM Jan Lukavský  wrote:

> OK,
>
> could you make an experiment and increase the parallelism to something
> significantly higher than the total number of partitions? Say 5 times
> higher? Would that have impact on throughput in your case?
>
> Jan
>
> On 12/23/20 7:03 PM, Antonio Si wrote:
> > Hi Jan,
> >
> > The performance data that I reported was run with parallelism = 8. We
> also ran with parallelism = 15 and we observed similar behaviors although I
> don't have the exact numbers. I can get you the numbers if needed.
> >
> > Regarding number of partitions, since we have multiple topics, the
> number of partitions varies from 180 to 12. The highest TPS topic has 180
> partitions, while the lowest TPS topic has 12 partitions.
> >
> > Thanks.
> >
> > Antonio.
> >
> > On 2020/12/23 12:28:42, Jan Lukavský  wrote:
> >> Hi Antonio,
> >>
> >> can you please clarify a few things:
> >>
> >>a) what parallelism you use for your sources
> >>
> >>b) how many partitions there is in your topic(s)
> >>
> >> Thanks,
> >>
> >>Jan
> >>
> >> On 12/22/20 10:07 PM, Antonio Si wrote:
> >>> Hi Boyuan,
> >>>
> >>> Let me clarify, I have tried with and without using
> --experiments=beam_fn_api,use_sdf_kafka_read option:
> >>>
> >>> -  with --experiments=use_deprecated_read --fasterrCopy=true, I am
> able to achieve 13K TPS
> >>> -  with --experiments="beam_fn_api,use_sdf_kafka_read"
> --fasterCopy=true, I am able to achieve 10K
> >>> -  with --fasterCopy=true alone, I am only able to achieve 5K TPS
> >>>
> >>> In our testcase, we have multiple topics, checkpoint intervals is 60s.
> Some topics have a lot higher traffics than others. We look at the case
> with --experiments="beam_fn_api,use_sdf_kafka_read" --fasterCopy=true
> options a little. Based on our observation, each consumer poll() in
> ReadFromKafkaDoFn.processElement() takes about 0.8ms. So for topic with
> high traffics, it will continue in the loop because every poll() will
> return some records. Every poll returns about 200 records. So, it takes
> about 0.8ms for every 200 records. I am not sure if that is part of the
> reason for the performance.
> >>>
> >>> Thanks.
> >>>
> >>> Antonio.
> >>>
> >>> On 2020/12/21 19:03:19, Boyuan Zhang  wrote:
> >>>> Hi Antonio,
> >>>>
> >>>> Thanks for the data point. That's very valuable information!
> >>>>
> >>>> I didn't use DirectRunner. I am using FlinkRunner.
> >>>>> We measured the number of Kafka messages that we can processed per
> second.
> >>>>> With Beam v2

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-21 Thread Boyuan Zhang
Hi Antonio,

I'm getting one more question for your Kafka experiment on FlinkRunner. I'm
wondering what your checkpoint interval is for your flink application.

The reason why I ask is that IIUC, creating connections in Kafka should be
really cheap. So I would imagine the overhead here should be different from
the PubSub case. In Flink, the checkpoint frequency for SDF is configured
as 1 elements or 10 seconds(note that the checkpoint here is not the
same concept of Flink checkpoint). With UnboundedSource implementation, the
frequency of source checkpoint depends on the flink checkpoint frequency.

On Mon, Dec 21, 2020 at 1:16 PM Jan Lukavský  wrote:

> Sure. My ID is je-ik.
>
> Thanks,
>
>  Jan
> On 12/21/20 8:43 PM, Boyuan Zhang wrote:
>
> Thanks for your explanation, Jan. Now I can see what you mean here. I can
> try to have a PR to do such optimization. Would you like to share your
> github ID with me to review the PR later?
>
> On Mon, Dec 21, 2020 at 11:15 AM Robert Bradshaw 
> wrote:
>
>> If readers are expensive to create, this seems like an important (and not
>> too difficult) optimization.
>>
>> On Mon, Dec 21, 2020 at 11:04 AM Jan Lukavský  wrote:
>>
>>> Hi Boyuan,
>>>
>>> I think your analysis is correct - with one exception. It should  be
>>> possible to reuse the reader if and only if the last taken CheckpointMark
>>> equals to the new CheckpointMark the reader would be created from. But -
>>> this equality is on the happy path and should be satisfied for vast
>>> majority of invocations, so it will spare many call to createReader.
>>> Actually, it should be non-equal only after recovery from checkpoint, but
>>> then there should be no reader. So to be technically correct, we should
>>> keep the last CheckpointMark along with the open reader, but that might
>>> turn out to be non-necessary (I'm not sure about that and I would
>>> definitely keep the last CheckpointMark, because it is better safe than
>>> sorry :))
>>>
>>> Jan
>>> On 12/21/20 7:54 PM, Boyuan Zhang wrote:
>>>
>>> Hi Jan,
>>>>
>>>> it seems that what we would want is to couple the lifecycle of the
>>>> Reader not with the restriction but with the particular instance of
>>>> (Un)boundedSource (after being split). That could be done in the processing
>>>> DoFn, if it contained a cache mapping instance of the source to the
>>>> (possibly null - i.e. not yet open) reader. In @NewTracker we could assign
>>>> (or create) the reader to the tracker, as the tracker is created for each
>>>> restriction.
>>>>
>>>> WDYT?
>>>>
>>> I was thinking about this but it seems like it is not applicable to the
>>> way how UnboundedSource and UnboundedReader work together.
>>> Please correct me if I'm wrong. The UnboundedReader is created from
>>> UnboundedSource per CheckpointMark[1], which means for certain sources, the
>>> CheckpointMark could affect some attributes like start position of the
>>> reader when resuming. So a single UnboundedSource could be mapped to
>>> multiple readers because of different instances of CheckpointMarl. That's
>>> also the reason why we use CheckpointMark as the restriction.
>>>
>>> Please let me know if I misunderstand your suggestion.
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L73-L78
>>>
>>> On Mon, Dec 21, 2020 at 9:18 AM Antonio Si  wrote:
>>>
>>>> Hi Boyuan,
>>>>
>>>> Sorry for my late reply. I was off for a few days.
>>>>
>>>> I didn't use DirectRunner. I am using FlinkRunner.
>>>>
>>>> We measured the number of Kafka messages that we can processed per
>>>> second.
>>>> With Beam v2.26 with --experiments=use_deprecated_read and
>>>> --fasterCopy=true,
>>>> we are able to consume 13K messages per second, but with Beam v2.26
>>>> without the use_deprecated_read option, we are only able to process 10K
>>>> messages
>>>> per second for the same pipeline.
>>>>
>>>> Thanks and regards,
>>>>
>>>> Antonio.
>>>>
>>>> On 2020/12/11 22:19:40, Boyuan Zhang  wrote:
>>>> > Hi Antonio,
>>>> >
>>>> > Thanks for the details! Which version of Beam SDK are you using? And
>>>> are
>>>> > you using --experiments=beam_fn_api 

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-21 Thread Boyuan Zhang
Thanks for your explanation, Jan. Now I can see what you mean here. I can
try to have a PR to do such optimization. Would you like to share your
github ID with me to review the PR later?

On Mon, Dec 21, 2020 at 11:15 AM Robert Bradshaw 
wrote:

> If readers are expensive to create, this seems like an important (and not
> too difficult) optimization.
>
> On Mon, Dec 21, 2020 at 11:04 AM Jan Lukavský  wrote:
>
>> Hi Boyuan,
>>
>> I think your analysis is correct - with one exception. It should  be
>> possible to reuse the reader if and only if the last taken CheckpointMark
>> equals to the new CheckpointMark the reader would be created from. But -
>> this equality is on the happy path and should be satisfied for vast
>> majority of invocations, so it will spare many call to createReader.
>> Actually, it should be non-equal only after recovery from checkpoint, but
>> then there should be no reader. So to be technically correct, we should
>> keep the last CheckpointMark along with the open reader, but that might
>> turn out to be non-necessary (I'm not sure about that and I would
>> definitely keep the last CheckpointMark, because it is better safe than
>> sorry :))
>>
>> Jan
>> On 12/21/20 7:54 PM, Boyuan Zhang wrote:
>>
>> Hi Jan,
>>>
>>> it seems that what we would want is to couple the lifecycle of the
>>> Reader not with the restriction but with the particular instance of
>>> (Un)boundedSource (after being split). That could be done in the processing
>>> DoFn, if it contained a cache mapping instance of the source to the
>>> (possibly null - i.e. not yet open) reader. In @NewTracker we could assign
>>> (or create) the reader to the tracker, as the tracker is created for each
>>> restriction.
>>>
>>> WDYT?
>>>
>> I was thinking about this but it seems like it is not applicable to the
>> way how UnboundedSource and UnboundedReader work together.
>> Please correct me if I'm wrong. The UnboundedReader is created from
>> UnboundedSource per CheckpointMark[1], which means for certain sources, the
>> CheckpointMark could affect some attributes like start position of the
>> reader when resuming. So a single UnboundedSource could be mapped to
>> multiple readers because of different instances of CheckpointMarl. That's
>> also the reason why we use CheckpointMark as the restriction.
>>
>> Please let me know if I misunderstand your suggestion.
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L73-L78
>>
>> On Mon, Dec 21, 2020 at 9:18 AM Antonio Si  wrote:
>>
>>> Hi Boyuan,
>>>
>>> Sorry for my late reply. I was off for a few days.
>>>
>>> I didn't use DirectRunner. I am using FlinkRunner.
>>>
>>> We measured the number of Kafka messages that we can processed per
>>> second.
>>> With Beam v2.26 with --experiments=use_deprecated_read and
>>> --fasterCopy=true,
>>> we are able to consume 13K messages per second, but with Beam v2.26
>>> without the use_deprecated_read option, we are only able to process 10K
>>> messages
>>> per second for the same pipeline.
>>>
>>> Thanks and regards,
>>>
>>> Antonio.
>>>
>>> On 2020/12/11 22:19:40, Boyuan Zhang  wrote:
>>> > Hi Antonio,
>>> >
>>> > Thanks for the details! Which version of Beam SDK are you using? And
>>> are
>>> > you using --experiments=beam_fn_api with DirectRunner to launch your
>>> > pipeline?
>>> >
>>> > For ReadFromKafkaDoFn.processElement(), it will take a Kafka
>>> > topic+partition as input element and a KafkaConsumer will be assigned
>>> to
>>> > this topic+partition then poll records continuously. The Kafka consumer
>>> > will resume reading and return from the process fn when
>>> >
>>> >- There are no available records currently(this is a feature of SDF
>>> >which calls SDF self-initiated checkpoint)
>>> >- The OutputAndTimeBoundedSplittableProcessElementInvoker issues
>>> >checkpoint request to ReadFromKafkaDoFn for getting partial
>>> results. The
>>> >checkpoint frequency for DirectRunner is every 100 output records
>>> or every
>>> >1 seconds.
>>> >
>>> > It seems like either the self-initiated checkpoint or DirectRunner
>>> issued
>>> > checkpoint gives you the performance regr

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-17 Thread Boyuan Zhang
Hi Jan,

I'm not saying that it's completely impossible to do so but I want to
explain why it's hard to apply these changes to existing SDF wrapper.

In the current SDF UnboundedSource wrapper[1], the restriction is the
. The reader is binded to the
UnboundedSourceAsSDFRestrictionTracker[2]. The reader is created from
CheckpointMark and is started when it's the first time to call
tracker.tryClaim(). The reader is closed when trySplit() happens
successfully. The DoFn only has access to the RestrictionTracker in
the @ProcessElement function, which means the SDF UnboundedSource wrapper
DoFn is not able to manage the reader directly though it's lifecycle. In
terms of the lifecycle of one RestrictionTracker instance, it is managed by
the invoker(or in portable execution, it's managed by the FnApiDoFnRunner).
The RestrictionTracker is created before @ProcessElement function
is invoked by calling @NewTracker, and it will be deconstructed when the
process function finishes.

It also makes sense to have CheckpointMark as the restriction because we
want to perform checkpoint on UnboundedSource.

[1]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L436
[2]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L750

On Thu, Dec 17, 2020 at 2:42 PM Jan Lukavský  wrote:

> Hi Boyuan,
>
> > Several changes could be made into PubSub SDF implementation specially.
> For example, the PuSub SDF can choose not respond to the checkpoint request
> when it thinks it's not a good time to do so. Besides, if the expensive
> connection can be binded to the lifecycle of the SDF instance instead of
> per restriction, the PubSub SDF implementation can choose to start the
> connection when the DoFn is started and close the connection when tearDown
> is called.
>
> Why the same cannot be applied to the general case? If we think about the
> "connection" and the "reader" as two abstract objects, it has the same
> methods - namely open and close, which is what defines the scope of the
> object. I think still think it should be possible to implement that
> generally.
>
> Jan
> On 12/17/20 11:19 PM, Boyuan Zhang wrote:
>
> Hi Jan, thanks for the quick followup!
>
> I'm not sure if I see the difference between writing a "native" SDF for
>> PubSub source and the UnboundedSource wrapper. With regards to the relation
>> between reader and checkpoint, wouldn't the native implementation be at the
>> same position?
>
> Several changes could be made into PubSub SDF implementation specially.
> For example, the PuSub SDF can choose not respond to the checkpoint request
> when it thinks it's not a good time to do so. Besides, if the expensive
> connection can be binded to the lifecycle of the SDF instance instead of
> per restriction, the PubSub SDF implementation can choose to start the
> connection when the DoFn is started and close the connection when tearDown
> is called.
>
> We might not be able to do so on SDF wrapper since it's a kind of general
> solution for all sources and not all sources don't have the connection
> binded to the restriction.
>
> Another workaround for using PubSub on DirectRunner might be using
> --experiments=enable_custom_pubsub_source, This flag will make the pipeline
> to use a DoFn to read from PubSub instead of using UnboundedSource. Hope it
> will be helpful as well.
>
>
> On Thu, Dec 17, 2020 at 1:09 PM Jan Lukavský  wrote:
>
>> Hi Boyuan,
>>
>> I'm not sure if I see the difference between writing a "native" SDF for
>> PubSub source and the UnboundedSource wrapper. With regards to the relation
>> between reader and checkpoint, wouldn't the native implementation be at the
>> same position?
>>
>> In my point of view, the decision to close the reader is simply a matter
>> of lifecycle of the reader. Currently, it is tightly bound to the
>> restriction being processed, but that could be relaxed, so that instead of
>> immediately closing the reader, it could be only _scheduled for closing in
>> future_ (using processing time timer for instance) provided it is not
>> reused in the remaining restriction after split (by the same instance of
>> DoFn). That is an optimization that could really make sense outside
>> DirectRunner, because for instance Flink has use cases, where user really
>> *wants* to configure quite often checkpoints (has relation to how Flink
>> implements @RequiresStableInput).
>>
>> Jan
>> On 12/17/20 9:04 PM, Boyuan Zhang wrote:
>>
>> Sorry for the confusion.
>>
>>
>>>  Are you saying it *is* necessary to close the reader on checkpoint, so
>>> 

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-17 Thread Boyuan Zhang
Hi Jan, thanks for the quick followup!

I'm not sure if I see the difference between writing a "native" SDF for
> PubSub source and the UnboundedSource wrapper. With regards to the relation
> between reader and checkpoint, wouldn't the native implementation be at the
> same position?

Several changes could be made into PubSub SDF implementation specially. For
example, the PuSub SDF can choose not respond to the checkpoint request
when it thinks it's not a good time to do so. Besides, if the expensive
connection can be binded to the lifecycle of the SDF instance instead of
per restriction, the PubSub SDF implementation can choose to start the
connection when the DoFn is started and close the connection when tearDown
is called.

We might not be able to do so on SDF wrapper since it's a kind of general
solution for all sources and not all sources don't have the connection
binded to the restriction.

Another workaround for using PubSub on DirectRunner might be using
--experiments=enable_custom_pubsub_source, This flag will make the pipeline
to use a DoFn to read from PubSub instead of using UnboundedSource. Hope it
will be helpful as well.


On Thu, Dec 17, 2020 at 1:09 PM Jan Lukavský  wrote:

> Hi Boyuan,
>
> I'm not sure if I see the difference between writing a "native" SDF for
> PubSub source and the UnboundedSource wrapper. With regards to the relation
> between reader and checkpoint, wouldn't the native implementation be at the
> same position?
>
> In my point of view, the decision to close the reader is simply a matter
> of lifecycle of the reader. Currently, it is tightly bound to the
> restriction being processed, but that could be relaxed, so that instead of
> immediately closing the reader, it could be only _scheduled for closing in
> future_ (using processing time timer for instance) provided it is not
> reused in the remaining restriction after split (by the same instance of
> DoFn). That is an optimization that could really make sense outside
> DirectRunner, because for instance Flink has use cases, where user really
> *wants* to configure quite often checkpoints (has relation to how Flink
> implements @RequiresStableInput).
>
> Jan
> On 12/17/20 9:04 PM, Boyuan Zhang wrote:
>
> Sorry for the confusion.
>
>
>>  Are you saying it *is* necessary to close the reader on checkpoint, so
>> the only solution is to reduce checkpoint frequency?
>
> In the PubSub on DirectRunner with SDF wrapper case, my answer is yes
> based on my understanding.
> Closing the reader during checkpoint is the implementation details of how
> the SDF wrapper wraps the Unbounded/Bounded source. It's not controlled by
> the DirectRunner and the only thing DirectRunner can control is the
> frequency of checkpoint, which is hardcoded now. And closing the reader is
> the right behavior since the work could be distributed to another instance
> in the real world.
>
> The ideal solution would be to offer a way to make the frequency
> configurable, most possibly via PipelineOptions. Or we turn the current
> PubSub UnboundedSource(and other source) implementation into SDF. IIUC, the
> SDF wrapper is a migration phase of Unbounded/Bounded source to SDF.
> Eventually we should have every source in SDF.
>
> On Thu, Dec 17, 2020 at 11:49 AM Brian Hulette 
> wrote:
>
>> Boyuan your suggestion seems at odds with Jan's. Are you saying it *is*
>> necessary to close the reader on checkpoint, so the only solution is to
>> reduce checkpoint frequency?
>>
>> On Thu, Dec 17, 2020 at 10:39 AM Boyuan Zhang  wrote:
>>
>>> Thanks for your investigation, Steve! It seems like preventing the
>>> checkpoint from happening so frequently would be one workaround for you.
>>> Making the checkpoint frequency configurable from pipeline option seems
>>> like the way to go.
>>>
>>> On Thu, Dec 17, 2020 at 7:35 AM Jan Lukavský  wrote:
>>>
>>>> Hi Steve,
>>>>
>>>> I didn't mean we should deliberately make DirectRunner slow, or we
>>>> should not fix performance issues, if can be fixed. What I meant was that
>>>> if we are to choose between short checkpoint time (and few elements
>>>> processed before checkpoint is taken) or performance, we should prefer
>>>> better tested code, in this particular case.
>>>>
>>>> > After a bunch of debugging, I think I finally figured out what the
>>>> problem is though.  During a checkpoint (in trySplit), the
>>>> UnboundedSourceViaSDF wrapper will close the current source reader and
>>>> create a new one.
>>>>
>>>> That is actually a great example. The problem should be fixed there
>>&g

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-17 Thread Boyuan Zhang
Sorry for the confusion.


>  Are you saying it *is* necessary to close the reader on checkpoint, so
> the only solution is to reduce checkpoint frequency?

In the PubSub on DirectRunner with SDF wrapper case, my answer is yes based
on my understanding.
Closing the reader during checkpoint is the implementation details of how
the SDF wrapper wraps the Unbounded/Bounded source. It's not controlled by
the DirectRunner and the only thing DirectRunner can control is the
frequency of checkpoint, which is hardcoded now. And closing the reader is
the right behavior since the work could be distributed to another instance
in the real world.

The ideal solution would be to offer a way to make the frequency
configurable, most possibly via PipelineOptions. Or we turn the current
PubSub UnboundedSource(and other source) implementation into SDF. IIUC, the
SDF wrapper is a migration phase of Unbounded/Bounded source to SDF.
Eventually we should have every source in SDF.

On Thu, Dec 17, 2020 at 11:49 AM Brian Hulette  wrote:

> Boyuan your suggestion seems at odds with Jan's. Are you saying it *is*
> necessary to close the reader on checkpoint, so the only solution is to
> reduce checkpoint frequency?
>
> On Thu, Dec 17, 2020 at 10:39 AM Boyuan Zhang  wrote:
>
>> Thanks for your investigation, Steve! It seems like preventing the
>> checkpoint from happening so frequently would be one workaround for you.
>> Making the checkpoint frequency configurable from pipeline option seems
>> like the way to go.
>>
>> On Thu, Dec 17, 2020 at 7:35 AM Jan Lukavský  wrote:
>>
>>> Hi Steve,
>>>
>>> I didn't mean we should deliberately make DirectRunner slow, or we
>>> should not fix performance issues, if can be fixed. What I meant was that
>>> if we are to choose between short checkpoint time (and few elements
>>> processed before checkpoint is taken) or performance, we should prefer
>>> better tested code, in this particular case.
>>>
>>> > After a bunch of debugging, I think I finally figured out what the
>>> problem is though.  During a checkpoint (in trySplit), the
>>> UnboundedSourceViaSDF wrapper will close the current source reader and
>>> create a new one.
>>>
>>> That is actually a great example. The problem should be fixed there (the
>>> reader probably need not to be closed on checkpoint). And it is
>>> DirectRunner that manifested this, due to short checkpointing.
>>>
>>> Jan
>>> On 12/17/20 4:14 PM, Steve Niemitz wrote:
>>>
>>> > Primary purpose of DirectRunner is testing, not performance
>>>
>>> That's one argument, but it's very difficult to effectively test a
>>> pipeline when I need to wait 15+ minutes for the first element to go
>>> through it.  I also, disagree in general that we shouldn't care about the
>>> performance of the DirectRunner.  It's likely the first runner new users of
>>> beam try (I know it was for us), and if it doesn't provide enough
>>> performance to actually run a representative pipeline, users may
>>> extrapolate that performance onto other runners (I know we did).
>>> Anecdotally, the fact that the DirectRunner didn't work for some of our
>>> initial test pipelines (because of performance problems) probably delayed
>>> our adoption of beam by at least 6 months.
>>>
>>> > Steve, based on your findings, it seems like it takes more time for
>>> the SDF pipeline to actually start to read from PubSub and more time to
>>> output records.
>>>
>>> Pubsub reads start ~instantly. but I'm not able to see any elements
>>> actually output from it for a LONG time, sometimes 30+ minutes.  I see the
>>> reader acking back to pubsub, so it IS committing, but no elements output.
>>>
>>> After a bunch of debugging, I think I finally figured out what the
>>> problem is though.  During a checkpoint (in trySplit), the
>>> UnboundedSourceViaSDF wrapper will close the current source reader and
>>> create a new one.  The problem is, the pubsub reader needs some time to
>>> correctly estimate it's watermark [1], and because it gets closed and
>>> recreated so frequently due to checkpointing (either number of elements, or
>>> duration), it can never actually provide accurate estimates, and always
>>> returns the min watermark.  This seems like it prevents some internal
>>> timers from ever firing, effectively holding all elements in the pipeline
>>> state.  I can confirm this also by looking at WatermarkManager, where I see
>>> all the bundles pending.
>>>
>>>

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-17 Thread Boyuan Zhang
y every Beam user relies on the direct
>> > runners so every regression or improvement on it affects everyone, but
>> > well that's a subject worth its own thread.
>> >
>> > On Thu, Dec 17, 2020 at 10:55 AM Jan Lukavský  wrote:
>> >> Hi,
>> >>
>> >> from my point of view the number in DirectRunner are set correctly.
>> Primary purpose of DirectRunner is testing, not performance, so
>> DirectRunner makes intentionally frequent checkpoints to easily exercise
>> potential bugs in user code. It might be possible to make the frequency
>> configurable, though.
>> >>
>> >> Jan
>> >>
>> >> On 12/17/20 12:20 AM, Boyuan Zhang wrote:
>> >>
>> >> It's not a portable execution on DirectRunner so I would expect that
>> outputs from OutputAndTimeBoundedSplittableProcessElementInvoker should be
>> emitted immediately. For SDF execution on DirectRunner, the overhead could
>> come from the SDF expansion, SDF wrapper and the invoker.
>> >>
>> >> Steve, based on your findings, it seems like it takes more time for
>> the SDF pipeline to actually start to read from PubSub and more time to
>> output records. Are you able to tell how much time each part is taking?
>> >>
>> >> On Wed, Dec 16, 2020 at 1:53 PM Robert Bradshaw 
>> wrote:
>> >>> If all it takes is bumping these numbers up a bit, that seems like a
>> reasonable thing to do ASAP. (I would argue that perhaps they shouldn't be
>> static, e.g. it might be preferable to start emitting results right away,
>> but use larger batches for the steady state if there are performance
>> benefits.)
>> >>>
>> >>> That being said, it sounds like there's something deeper going on
>> here. We should also verify that this performance impact is limited to the
>> direct runner.
>> >>>
>> >>> On Wed, Dec 16, 2020 at 1:36 PM Steve Niemitz 
>> wrote:
>> >>>> I tried changing my build locally to 10 seconds and 10,000 elements
>> but it didn't seem to make much of a difference, it still takes a few
>> minutes for elements to begin actually showing up to downstream stages from
>> the Pubsub read.  I can see elements being emitted from
>> OutputAndTimeBoundedSplittableProcessElementInvoker, and bundles being
>> committed by ParDoEvaluator.finishBundle, but after that, they seem to just
>> kind of disappear somewhere.
>> >>>>
>> >>>> On Wed, Dec 16, 2020 at 4:18 PM Boyuan Zhang 
>> wrote:
>> >>>>> Making it as the PipelineOptions was my another proposal but it
>> might take some time to do so. On the other hand, tuning the number into
>> something acceptable is low-hanging fruit.
>> >>>>>
>> >>>>> On Wed, Dec 16, 2020 at 12:48 PM Ismaël Mejía 
>> wrote:
>> >>>>>> It sounds reasonable. I am wondering also on the consequence of
>> these
>> >>>>>> parameters for other runners (where it is every 10 seconds or 1
>> >>>>>> elements) + their own configuration e.g. checkpointInterval,
>> >>>>>> checkpointTimeoutMillis and minPauseBetweenCheckpoints for Flink.
>> It
>> >>>>>> is not clear for me what would be chosen now in this case.
>> >>>>>>
>> >>>>>> I know we are a bit anti knobs but maybe it makes sense to make
>> this
>> >>>>>> configurable via PipelineOptions at least for Direct runner.
>> >>>>>>
>> >>>>>> On Wed, Dec 16, 2020 at 7:29 PM Boyuan Zhang 
>> wrote:
>> >>>>>>> I agree, Ismael.
>> >>>>>>>
>> >>>>>>>  From my current investigation, the performance overhead should
>> majorly come from the frequency of checkpoint in
>> OutputAndTimeBoundedSplittableProcessElementinvoker[1], which is hardcoded
>> in the DirectRunner(every 1 seconds or 100 elements)[2]. I believe
>> configuring these numbers on DirectRunner should improve reported cases so
>> far. My last proposal was to change the number to every 5 seconds or 1
>> elements. What do you think?
>> >>>>>>>
>> >>>>>>> [1]
>> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
>> >>>>>>> [2]
>> https://github.com/apache/beam/blob/3b

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-16 Thread Boyuan Zhang
It's not a portable execution on DirectRunner so I would expect that
outputs from OutputAndTimeBoundedSplittableProcessElementInvoker should be
emitted immediately. For SDF execution on DirectRunner, the overhead could
come from the SDF expansion, SDF wrapper and the invoker.

Steve, based on your findings, it seems like it takes more time for the SDF
pipeline to actually start to read from PubSub and more time to output
records. Are you able to tell how much time each part is taking?

On Wed, Dec 16, 2020 at 1:53 PM Robert Bradshaw  wrote:

> If all it takes is bumping these numbers up a bit, that seems like a
> reasonable thing to do ASAP. (I would argue that perhaps they shouldn't be
> static, e.g. it might be preferable to start emitting results right away,
> but use larger batches for the steady state if there are performance
> benefits.)
>
> That being said, it sounds like there's something deeper going on here. We
> should also verify that this performance impact is limited to the direct
> runner.
>
> On Wed, Dec 16, 2020 at 1:36 PM Steve Niemitz  wrote:
>
>> I tried changing my build locally to 10 seconds and 10,000 elements but
>> it didn't seem to make much of a difference, it still takes a few minutes
>> for elements to begin actually showing up to downstream stages from the
>> Pubsub read.  I can see elements being emitted
>> from OutputAndTimeBoundedSplittableProcessElementInvoker, and bundles being
>> committed by ParDoEvaluator.finishBundle, but after that, they seem to just
>> kind of disappear somewhere.
>>
>> On Wed, Dec 16, 2020 at 4:18 PM Boyuan Zhang  wrote:
>>
>>> Making it as the PipelineOptions was my another proposal but it might
>>> take some time to do so. On the other hand, tuning the number into
>>> something acceptable is low-hanging fruit.
>>>
>>> On Wed, Dec 16, 2020 at 12:48 PM Ismaël Mejía  wrote:
>>>
>>>> It sounds reasonable. I am wondering also on the consequence of these
>>>> parameters for other runners (where it is every 10 seconds or 1
>>>> elements) + their own configuration e.g. checkpointInterval,
>>>> checkpointTimeoutMillis and minPauseBetweenCheckpoints for Flink. It
>>>> is not clear for me what would be chosen now in this case.
>>>>
>>>> I know we are a bit anti knobs but maybe it makes sense to make this
>>>> configurable via PipelineOptions at least for Direct runner.
>>>>
>>>> On Wed, Dec 16, 2020 at 7:29 PM Boyuan Zhang 
>>>> wrote:
>>>> >
>>>> > I agree, Ismael.
>>>> >
>>>> > From my current investigation, the performance overhead should
>>>> majorly come from the frequency of checkpoint in
>>>> OutputAndTimeBoundedSplittableProcessElementinvoker[1], which is hardcoded
>>>> in the DirectRunner(every 1 seconds or 100 elements)[2]. I believe
>>>> configuring these numbers on DirectRunner should improve reported cases so
>>>> far. My last proposal was to change the number to every 5 seconds or 1
>>>> elements. What do you think?
>>>> >
>>>> > [1]
>>>> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
>>>> > [2]
>>>> https://github.com/apache/beam/blob/3bb232fb098700de408f574585dfe74bbaff7230/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java#L178-L181
>>>> >
>>>> > On Wed, Dec 16, 2020 at 9:02 AM Ismaël Mejía 
>>>> wrote:
>>>> >>
>>>> >> I can guess that the same issues mentioned here probably will affect
>>>> >> the usability for people trying Beam's interactive SQL on Unbounded
>>>> IO
>>>> >> too.
>>>> >>
>>>> >> We should really take into account that the performance of the SDF
>>>> >> based path should be as good or better than the previous version
>>>> >> before considering its removal (--experiments=use_deprecated_read)
>>>> and
>>>> >> probably have consensus when this happens.
>>>> >>
>>>> >>
>>>> >> On Fri, Dec 11, 2020 at 11:33 PM Boyuan Zhang 
>>>> wrote:
>>>> >> >
>>>> >> > > From what I've seen, the direct runner initiates a checkpoint
>>>> after every element output.
>>>> >> > That seems like the 1 second 

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-16 Thread Boyuan Zhang
Making it as the PipelineOptions was my another proposal but it might take
some time to do so. On the other hand, tuning the number into
something acceptable is low-hanging fruit.

On Wed, Dec 16, 2020 at 12:48 PM Ismaël Mejía  wrote:

> It sounds reasonable. I am wondering also on the consequence of these
> parameters for other runners (where it is every 10 seconds or 1
> elements) + their own configuration e.g. checkpointInterval,
> checkpointTimeoutMillis and minPauseBetweenCheckpoints for Flink. It
> is not clear for me what would be chosen now in this case.
>
> I know we are a bit anti knobs but maybe it makes sense to make this
> configurable via PipelineOptions at least for Direct runner.
>
> On Wed, Dec 16, 2020 at 7:29 PM Boyuan Zhang  wrote:
> >
> > I agree, Ismael.
> >
> > From my current investigation, the performance overhead should majorly
> come from the frequency of checkpoint in
> OutputAndTimeBoundedSplittableProcessElementinvoker[1], which is hardcoded
> in the DirectRunner(every 1 seconds or 100 elements)[2]. I believe
> configuring these numbers on DirectRunner should improve reported cases so
> far. My last proposal was to change the number to every 5 seconds or 1
> elements. What do you think?
> >
> > [1]
> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
> > [2]
> https://github.com/apache/beam/blob/3bb232fb098700de408f574585dfe74bbaff7230/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java#L178-L181
> >
> > On Wed, Dec 16, 2020 at 9:02 AM Ismaël Mejía  wrote:
> >>
> >> I can guess that the same issues mentioned here probably will affect
> >> the usability for people trying Beam's interactive SQL on Unbounded IO
> >> too.
> >>
> >> We should really take into account that the performance of the SDF
> >> based path should be as good or better than the previous version
> >> before considering its removal (--experiments=use_deprecated_read) and
> >> probably have consensus when this happens.
> >>
> >>
> >> On Fri, Dec 11, 2020 at 11:33 PM Boyuan Zhang 
> wrote:
> >> >
> >> > > From what I've seen, the direct runner initiates a checkpoint after
> every element output.
> >> > That seems like the 1 second limit kicks in before the output reaches
> 100 elements.
> >> >
> >> > I think the original purpose for DirectRunner to use a small limit on
> issuing checkpoint requests is for exercising SDF better in a small data
> set. But it brings overhead on a larger set owing to too many checkpoints.
> It would be ideal to make this limit configurable from pipeline but the
> easiest approach is that we figure out a number for most common cases. Do
> you think we raise the limit to 1000 elements or every 5 seconds will help?
> >> >
> >> > On Fri, Dec 11, 2020 at 2:22 PM Steve Niemitz 
> wrote:
> >> >>
> >> >> From what I've seen, the direct runner initiates a checkpoint after
> every element output.
> >> >>
> >> >> On Fri, Dec 11, 2020 at 5:19 PM Boyuan Zhang 
> wrote:
> >> >>>
> >> >>> Hi Antonio,
> >> >>>
> >> >>> Thanks for the details! Which version of Beam SDK are you using?
> And are you using --experiments=beam_fn_api with DirectRunner to launch
> your pipeline?
> >> >>>
> >> >>> For ReadFromKafkaDoFn.processElement(), it will take a Kafka
> topic+partition as input element and a KafkaConsumer will be assigned to
> this topic+partition then poll records continuously. The Kafka consumer
> will resume reading and return from the process fn when
> >> >>>
> >> >>> There are no available records currently(this is a feature of SDF
> which calls SDF self-initiated checkpoint)
> >> >>> The OutputAndTimeBoundedSplittableProcessElementInvoker issues
> checkpoint request to ReadFromKafkaDoFn for getting partial results. The
> checkpoint frequency for DirectRunner is every 100 output records or every
> 1 seconds.
> >> >>>
> >> >>> It seems like either the self-initiated checkpoint or DirectRunner
> issued checkpoint gives you the performance regression since there is
> overhead when rescheduling residuals. In your case, it's more like that the
> checkpoint behavior of OutputAndTimeBoundedSplittableProcessElementInvoker
> gives you 200 elements a batch. I want to understand what kind of
> performanc

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-16 Thread Boyuan Zhang
I agree, Ismael.

>From my current investigation, the performance overhead should majorly come
from the frequency of checkpoint in
OutputAndTimeBoundedSplittableProcessElementinvoker[1], which is hardcoded
in the DirectRunner(every 1 seconds or 100 elements)[2]. I believe
configuring these numbers on DirectRunner should improve reported cases so
far. My last proposal was to change the number to every 5 seconds or 1
elements. What do you think?

[1]
https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
[2]
https://github.com/apache/beam/blob/3bb232fb098700de408f574585dfe74bbaff7230/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java#L178-L181

On Wed, Dec 16, 2020 at 9:02 AM Ismaël Mejía  wrote:

> I can guess that the same issues mentioned here probably will affect
> the usability for people trying Beam's interactive SQL on Unbounded IO
> too.
>
> We should really take into account that the performance of the SDF
> based path should be as good or better than the previous version
> before considering its removal (--experiments=use_deprecated_read) and
> probably have consensus when this happens.
>
>
> On Fri, Dec 11, 2020 at 11:33 PM Boyuan Zhang  wrote:
> >
> > > From what I've seen, the direct runner initiates a checkpoint after
> every element output.
> > That seems like the 1 second limit kicks in before the output reaches
> 100 elements.
> >
> > I think the original purpose for DirectRunner to use a small limit on
> issuing checkpoint requests is for exercising SDF better in a small data
> set. But it brings overhead on a larger set owing to too many checkpoints.
> It would be ideal to make this limit configurable from pipeline but the
> easiest approach is that we figure out a number for most common cases. Do
> you think we raise the limit to 1000 elements or every 5 seconds will help?
> >
> > On Fri, Dec 11, 2020 at 2:22 PM Steve Niemitz 
> wrote:
> >>
> >> From what I've seen, the direct runner initiates a checkpoint after
> every element output.
> >>
> >> On Fri, Dec 11, 2020 at 5:19 PM Boyuan Zhang 
> wrote:
> >>>
> >>> Hi Antonio,
> >>>
> >>> Thanks for the details! Which version of Beam SDK are you using? And
> are you using --experiments=beam_fn_api with DirectRunner to launch your
> pipeline?
> >>>
> >>> For ReadFromKafkaDoFn.processElement(), it will take a Kafka
> topic+partition as input element and a KafkaConsumer will be assigned to
> this topic+partition then poll records continuously. The Kafka consumer
> will resume reading and return from the process fn when
> >>>
> >>> There are no available records currently(this is a feature of SDF
> which calls SDF self-initiated checkpoint)
> >>> The OutputAndTimeBoundedSplittableProcessElementInvoker issues
> checkpoint request to ReadFromKafkaDoFn for getting partial results. The
> checkpoint frequency for DirectRunner is every 100 output records or every
> 1 seconds.
> >>>
> >>> It seems like either the self-initiated checkpoint or DirectRunner
> issued checkpoint gives you the performance regression since there is
> overhead when rescheduling residuals. In your case, it's more like that the
> checkpoint behavior of OutputAndTimeBoundedSplittableProcessElementInvoker
> gives you 200 elements a batch. I want to understand what kind of
> performance regression you are noticing? Is it slower to output the same
> amount of records?
> >>>
> >>> On Fri, Dec 11, 2020 at 1:31 PM Antonio Si 
> wrote:
> >>>>
> >>>> Hi Boyuan,
> >>>>
> >>>> This is Antonio. I reported the KafkaIO.read() performance issue on
> the slack channel a few days ago.
> >>>>
> >>>> I am not sure if this is helpful, but I have been doing some
> debugging on the SDK KafkaIO performance issue for our pipeline and I would
> like to provide some observations.
> >>>>
> >>>> It looks like in my case the ReadFromKafkaDoFn.processElement()  was
> invoked within the same thread and every time kafaconsumer.poll() is
> called, it returns some records, from 1 up to 200 records. So, it will
> proceed to run the pipeline steps. Each kafkaconsumer.poll() takes about
> 0.8ms. So, in this case, the polling and running of the pipeline are
> executed sequentially within a single thread. So, after processing a batch
> of records, it will need to wait for 0.8ms before it can process the next
> batch of records again.
> >>>

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-11 Thread Boyuan Zhang
> From what I've seen, the direct runner initiates a checkpoint after every
element output.
That seems like the 1 second limit kicks in before the output reaches 100
elements.

I think the original purpose for DirectRunner to use a small limit on
issuing checkpoint requests is for exercising SDF better in a small data
set. But it brings overhead on a larger set owing to too many checkpoints.
It would be ideal to make this limit configurable from pipeline but the
easiest approach is that we figure out a number for most common cases. Do
you think we raise the limit to 1000 elements or every 5 seconds will help?

On Fri, Dec 11, 2020 at 2:22 PM Steve Niemitz  wrote:

> From what I've seen, the direct runner initiates a checkpoint after every
> element output.
>
> On Fri, Dec 11, 2020 at 5:19 PM Boyuan Zhang  wrote:
>
>> Hi Antonio,
>>
>> Thanks for the details! Which version of Beam SDK are you using? And are
>> you using --experiments=beam_fn_api with DirectRunner to launch your
>> pipeline?
>>
>> For ReadFromKafkaDoFn.processElement(), it will take a Kafka
>> topic+partition as input element and a KafkaConsumer will be assigned to
>> this topic+partition then poll records continuously. The Kafka consumer
>> will resume reading and return from the process fn when
>>
>>- There are no available records currently(this is a feature of SDF
>>which calls SDF self-initiated checkpoint)
>>- The OutputAndTimeBoundedSplittableProcessElementInvoker issues
>>checkpoint request to ReadFromKafkaDoFn for getting partial results. The
>>checkpoint frequency for DirectRunner is every 100 output records or every
>>1 seconds.
>>
>> It seems like either the self-initiated checkpoint or DirectRunner issued
>> checkpoint gives you the performance regression since there is overhead
>> when rescheduling residuals. In your case, it's more like that the
>> checkpoint behavior of OutputAndTimeBoundedSplittableProcessElementInvoker
>> gives you 200 elements a batch. I want to understand what kind of
>> performance regression you are noticing? Is it slower to output the same
>> amount of records?
>>
>> On Fri, Dec 11, 2020 at 1:31 PM Antonio Si  wrote:
>>
>>> Hi Boyuan,
>>>
>>> This is Antonio. I reported the KafkaIO.read() performance issue on the
>>> slack channel a few days ago.
>>>
>>> I am not sure if this is helpful, but I have been doing some debugging
>>> on the SDK KafkaIO performance issue for our pipeline and I would like to
>>> provide some observations.
>>>
>>> It looks like in my case the ReadFromKafkaDoFn.processElement()  was
>>> invoked within the same thread and every time kafaconsumer.poll() is
>>> called, it returns some records, from 1 up to 200 records. So, it will
>>> proceed to run the pipeline steps. Each kafkaconsumer.poll() takes about
>>> 0.8ms. So, in this case, the polling and running of the pipeline are
>>> executed sequentially within a single thread. So, after processing a batch
>>> of records, it will need to wait for 0.8ms before it can process the next
>>> batch of records again.
>>>
>>> Any suggestions would be appreciated.
>>>
>>> Hope that helps.
>>>
>>> Thanks and regards,
>>>
>>> Antonio.
>>>
>>> On 2020/12/04 19:17:46, Boyuan Zhang  wrote:
>>> > Opened https://issues.apache.org/jira/browse/BEAM-11403 for tracking.
>>> >
>>> > On Fri, Dec 4, 2020 at 10:52 AM Boyuan Zhang 
>>> wrote:
>>> >
>>> > > Thanks for the pointer, Steve! I'll check it out. The execution
>>> paths for
>>> > > UnboundedSource and SDF wrapper are different. It's highly possible
>>> that
>>> > > the regression either comes from the invocation path for SDF
>>> wrapper, or
>>> > > the implementation of SDF wrapper itself.
>>> > >
>>> > > On Fri, Dec 4, 2020 at 6:33 AM Steve Niemitz 
>>> wrote:
>>> > >
>>> > >> Coincidentally, someone else in the ASF slack mentioned [1]
>>> yesterday
>>> > >> that they were seeing significantly reduced performance using
>>> KafkaIO.Read
>>> > >> w/ the SDF wrapper vs the unbounded source.  They mentioned they
>>> were using
>>> > >> flink 1.9.
>>> > >>
>>> > >> https://the-asf.slack.com/archives/C9H0YNP3P/p1607057900393900
>>> > >>
>>> > >> On Thu, Dec 3, 2020 at 1:56 P

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-11 Thread Boyuan Zhang
Hi Antonio,

Thanks for the details! Which version of Beam SDK are you using? And are
you using --experiments=beam_fn_api with DirectRunner to launch your
pipeline?

For ReadFromKafkaDoFn.processElement(), it will take a Kafka
topic+partition as input element and a KafkaConsumer will be assigned to
this topic+partition then poll records continuously. The Kafka consumer
will resume reading and return from the process fn when

   - There are no available records currently(this is a feature of SDF
   which calls SDF self-initiated checkpoint)
   - The OutputAndTimeBoundedSplittableProcessElementInvoker issues
   checkpoint request to ReadFromKafkaDoFn for getting partial results. The
   checkpoint frequency for DirectRunner is every 100 output records or every
   1 seconds.

It seems like either the self-initiated checkpoint or DirectRunner issued
checkpoint gives you the performance regression since there is overhead
when rescheduling residuals. In your case, it's more like that the
checkpoint behavior of OutputAndTimeBoundedSplittableProcessElementInvoker
gives you 200 elements a batch. I want to understand what kind of
performance regression you are noticing? Is it slower to output the same
amount of records?

On Fri, Dec 11, 2020 at 1:31 PM Antonio Si  wrote:

> Hi Boyuan,
>
> This is Antonio. I reported the KafkaIO.read() performance issue on the
> slack channel a few days ago.
>
> I am not sure if this is helpful, but I have been doing some debugging on
> the SDK KafkaIO performance issue for our pipeline and I would like to
> provide some observations.
>
> It looks like in my case the ReadFromKafkaDoFn.processElement()  was
> invoked within the same thread and every time kafaconsumer.poll() is
> called, it returns some records, from 1 up to 200 records. So, it will
> proceed to run the pipeline steps. Each kafkaconsumer.poll() takes about
> 0.8ms. So, in this case, the polling and running of the pipeline are
> executed sequentially within a single thread. So, after processing a batch
> of records, it will need to wait for 0.8ms before it can process the next
> batch of records again.
>
> Any suggestions would be appreciated.
>
> Hope that helps.
>
> Thanks and regards,
>
> Antonio.
>
> On 2020/12/04 19:17:46, Boyuan Zhang  wrote:
> > Opened https://issues.apache.org/jira/browse/BEAM-11403 for tracking.
> >
> > On Fri, Dec 4, 2020 at 10:52 AM Boyuan Zhang  wrote:
> >
> > > Thanks for the pointer, Steve! I'll check it out. The execution paths
> for
> > > UnboundedSource and SDF wrapper are different. It's highly possible
> that
> > > the regression either comes from the invocation path for SDF wrapper,
> or
> > > the implementation of SDF wrapper itself.
> > >
> > > On Fri, Dec 4, 2020 at 6:33 AM Steve Niemitz 
> wrote:
> > >
> > >> Coincidentally, someone else in the ASF slack mentioned [1] yesterday
> > >> that they were seeing significantly reduced performance using
> KafkaIO.Read
> > >> w/ the SDF wrapper vs the unbounded source.  They mentioned they were
> using
> > >> flink 1.9.
> > >>
> > >> https://the-asf.slack.com/archives/C9H0YNP3P/p1607057900393900
> > >>
> > >> On Thu, Dec 3, 2020 at 1:56 PM Boyuan Zhang 
> wrote:
> > >>
> > >>> Hi Steve,
> > >>>
> > >>> I think the major performance regression comes from
> > >>> OutputAndTimeBoundedSplittableProcessElementInvoker[1], which will
> > >>> checkpoint the DoFn based on time/output limit and use timers/state
> to
> > >>> reschedule works.
> > >>>
> > >>> [1]
> > >>>
> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
> > >>>
> > >>> On Thu, Dec 3, 2020 at 9:40 AM Steve Niemitz 
> > >>> wrote:
> > >>>
> > >>>> I have a pipeline that reads from pubsub, does some aggregation, and
> > >>>> writes to various places.  Previously, in older versions of beam,
> when
> > >>>> running this in the DirectRunner, messages would go through the
> pipeline
> > >>>> almost instantly, making it very easy to debug locally, etc.
> > >>>>
> > >>>> However, after upgrading to beam 2.25, I noticed that it could take
> on
> > >>>> the order of 5-10 minutes for messages to get from the pubsub read
> step to
> > >>>> the next step in the pipeline (deserializing them, etc).  The
> subscription
> > >>>> being read from has on the order of 100,000 elements/sec arriving
> in it.
> > >>>>
> > >>>> Setting --experiments=use_deprecated_read fixes it, and makes the
> > >>>> pipeline behave as it did before.
> > >>>>
> > >>>> It seems like the SDF implementation in the DirectRunner here is
> > >>>> causing some kind of issue, either buffering a very large amount of
> data
> > >>>> before emitting it in a bundle, or something else.  Has anyone else
> run
> > >>>> into this?
> > >>>>
> > >>>
> >
>


Re: Dynamic timers in python sdk.

2020-12-10 Thread Boyuan Zhang
Thanks for the explanation! That makes sense. We may also want to update
pydoc to state the usage explicitly since it's quite different from how
Java SDK does it.

On Thu, Dec 10, 2020 at 11:13 AM Robert Bradshaw 
wrote:

> On Thu, Dec 10, 2020 at 11:05 AM Boyuan Zhang  wrote:
>
>> We should also consider whether the default used in set() should be the
>>> empty string, or some value completely disjoint from any other string. (I'd
>>> lean towards the latter.)
>>
>> From API layer, the default value could be None for non-dynamic timer.
>> When translating to timer data message, None will be translated into empty
>> string.
>>
>
> Yes, but to keep them disjoint we'd want to translate the empty string
> into something else.
>
>
>> It seems like either timer.set(timestamp, dynamic_timer_tag=a_tag) or
>> set_dynamic means we will allow users to use one TimerSpec instance for
>> both static timer and dynamic timer.
>>
>
> Sure.
>
>
>> I'm wondering for registering on_timer callback, are we going to allow
>> users to register 2 callbacks(one for dynamic timer, one for static timer)
>> for single one TimerSpec instance, or just having one callback with
>> DynamicTagParam, where when DynamicTagParam is None means current firing
>> timer is a static one(otherwise, it's a dynamic one).
>>
>
> No, there'll still be a single callback. You can inspect the tag if you
> want.
>
> The view is that there aren't two types of timers--"static" timers are
> just dynamic timers with a fixed (aka static) tag (that we provide for you
> as a convenience).
>
> On Thu, Dec 10, 2020 at 10:43 AM Robert Bradshaw 
>> wrote:
>>
>>> Yep.
>>>
>>> A slight variant on this is to add separate set_dynamic and
>>> clear_dynamic methods, rather than letting set and clear take an optional
>>> argument, but I'm not sure I like that as much as the simple extension you
>>> proposed.
>>>
>>> We should also consider whether the default used in set() should be the
>>> empty string, or some value completely disjoint from any other string. (I'd
>>> lean towards the latter.)
>>>
>>> On Thu, Dec 10, 2020 at 10:23 AM Yichi Zhang  wrote:
>>>
>>>> Fair enough, I think we can also just extend existing timer API to
>>>> allow setting a dynamic timer tag field:
>>>>
>>>> timer.set(timestamp) -> timer.set(timestamp, dynamic_timer_tag=a_tag)
>>>> timer.clear() -> timer.clear(dynamic_timer_tag=a_tag)
>>>>
>>>> and have the default value of dynamic_timer_tag to be empty (the
>>>> special case)
>>>>
>>>> On Wed, Dec 9, 2020 at 5:12 PM Robert Bradshaw 
>>>> wrote:
>>>>
>>>>> On Wed, Dec 9, 2020 at 3:48 PM Kyle Weaver 
>>>>> wrote:
>>>>>
>>>>>> Possibly a dumb question, but: if "the static timer is just a special
>>>>>> case of the dynamic timer," why do we need to use different classes at 
>>>>>> all?
>>>>>>
>>>>>
>>>>> I agree, I would argue that there is little if any value to the user
>>>>> to distinguish between these two "types" of timers at all.
>>>>>
>>>>>
>>>>>> On Wed, Dec 9, 2020 at 2:30 PM Yichi Zhang  wrote:
>>>>>>
>>>>>>> Hi, Beam community,
>>>>>>>
>>>>>>> I'm trying to add the dynamic timer
>>>>>>> <https://issues.apache.org/jira/browse/BEAM-6857> support to the
>>>>>>> python sdk. In java sdk, a dynamic timer is specified through declaring 
>>>>>>> a
>>>>>>> TimerSpec of timerMap type and annotating it with @TimerFamily in
>>>>>>> process method parameter:
>>>>>>>
>>>>>>>   @TimerFamily("timers") private final TimerSpec timer =
>>>>>>> TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>>>>>>>
>>>>>>>   @ProcessElement public void process(
>>>>>>>   @Element KV element,
>>>>>>>   @Timestamp Instant elementTs,
>>>>>>>   @TimerFamily("timers") TimerMap timers) {
>>>>>>>  timers.set(element.getValue().getActionType(), elementTs);
>>>>>>>   }
>>>>>>>
>>>>>>>
>>>>>>&g

Re: Dynamic timers in python sdk.

2020-12-10 Thread Boyuan Zhang
>
> We should also consider whether the default used in set() should be the
> empty string, or some value completely disjoint from any other string. (I'd
> lean towards the latter.)

>From API layer, the default value could be None for non-dynamic timer. When
translating to timer data message, None will be translated into empty
string.

It seems like either timer.set(timestamp, dynamic_timer_tag=a_tag) or
set_dynamic means we will allow users to use one TimerSpec instance for
both static timer and dynamic timer. I'm wondering for registering on_timer
callback, are we going to allow users to register 2 callbacks(one for
dynamic timer, one for static timer) for single one TimerSpec instance, or
just having one callback with DynamicTagParam, where when DynamicTagParam
is None means current firing timer is a static one(otherwise, it's a
dynamic one).

On Thu, Dec 10, 2020 at 10:43 AM Robert Bradshaw 
wrote:

> Yep.
>
> A slight variant on this is to add separate set_dynamic and clear_dynamic
> methods, rather than letting set and clear take an optional argument, but
> I'm not sure I like that as much as the simple extension you proposed.
>
> We should also consider whether the default used in set() should be the
> empty string, or some value completely disjoint from any other string. (I'd
> lean towards the latter.)
>
> On Thu, Dec 10, 2020 at 10:23 AM Yichi Zhang  wrote:
>
>> Fair enough, I think we can also just extend existing timer API to allow
>> setting a dynamic timer tag field:
>>
>> timer.set(timestamp) -> timer.set(timestamp, dynamic_timer_tag=a_tag)
>> timer.clear() -> timer.clear(dynamic_timer_tag=a_tag)
>>
>> and have the default value of dynamic_timer_tag to be empty (the special
>> case)
>>
>> On Wed, Dec 9, 2020 at 5:12 PM Robert Bradshaw 
>> wrote:
>>
>>> On Wed, Dec 9, 2020 at 3:48 PM Kyle Weaver  wrote:
>>>
 Possibly a dumb question, but: if "the static timer is just a special
 case of the dynamic timer," why do we need to use different classes at all?

>>>
>>> I agree, I would argue that there is little if any value to the user
>>> to distinguish between these two "types" of timers at all.
>>>
>>>
 On Wed, Dec 9, 2020 at 2:30 PM Yichi Zhang  wrote:

> Hi, Beam community,
>
> I'm trying to add the dynamic timer
>  support to the
> python sdk. In java sdk, a dynamic timer is specified through declaring a
> TimerSpec of timerMap type and annotating it with @TimerFamily in
> process method parameter:
>
>   @TimerFamily("timers") private final TimerSpec timer =
> TimerSpecs.timerMap(TimeDomain.EVENT_TIME);
>
>   @ProcessElement public void process(
>   @Element KV element,
>   @Timestamp Instant elementTs,
>   @TimerFamily("timers") TimerMap timers) {
>  timers.set(element.getValue().getActionType(), elementTs);
>   }
>
>
> In python, I'm trying to figure out how we should differentiate a
> dynamic timer to the conventional static timer, at the timer spec
> declaration or process parameters annotation, or both:
>
> Approach 1: if we differentiate them only at timer spec declaration it
> looks like:
>
> class TimerDoFn(beam.DoFn):
>   timer_spec = userstate.TimerSpec('static_timer', 
> userstate.TimeDomain.WATERMARK)
>   timer_family_spec = userstate.TimerFamilySpec('dynamic_timer', 
> userstate.TimeDomain.WATERMARK)
>   def process(self,
>   element,
>   timer=beam.DoFn.TimerParam(timer_spec),
>   timer_map=beam.DoFn.TimerParam(timer_family_spec)):
>
>   @userstate.on_timer(timer_spec)
>   def process_timer(
>   self,
>   ts=beam.DoFn.TimestampParam,
>   timer=beam.DoFn.TimerParam(timer_spec)):
>
>   @userstate.on_timer(timer_family_spec)
>   def process_timer_map(
>   self,
>   ts=beam.DoFn.TimestampParam,
>   dynamic_tag=DoFn.DynamicTagParam,
>   timer_map=beam.DoFn.TimerParam(timer_family_spec))
>
> Approach 2: if only at parameter annotation:
>
> class TimerDoFn(beam.DoFn):
>   timer_spec = userstate.TimerSpec('static_timer', 
> userstate.TimeDomain.WATERMARK)
>   timer_family_spec = userstate.TimerSpec('dynamic_timer', 
> userstate.TimeDomain.WATERMARK)
>   def process(self,
>   element,
>   timer=beam.DoFn.TimerParam(timer_spec),
>   
> timer_map=beam.DoFn.TimerFamilyParam(timer_family_spec)):
>
>   @userstate.on_timer(timer_spec)
>   def process_timer(
>   self,
>   ts=beam.DoFn.TimestampParam,
>   timer=beam.DoFn.TimerParam(timer_spec)):
>
>   @userstate.on_timer(timer_family_spec)
>   def 

Re: Throttling stream outputs per trigger?

2020-12-08 Thread Boyuan Zhang
I think your understanding is correct. Does the CommitOffset transform have
side-effects on your pipeline?

On Tue, Dec 8, 2020 at 3:35 PM Vincent Marquez 
wrote:

>
> *~Vincent*
>
>
> On Tue, Dec 8, 2020 at 3:13 PM Boyuan Zhang  wrote:
>
>> Please note that each record output from ReadFromKafkaDoFn is in a
>> GlobalWindow. The workflow is:
>> ReadFromKafkaDoFn -> Reshuffle -> Window.into(FixedWindows) ->
>> Max.longsPerKey -> CommitDoFn
>>|
>>---> downstream consumers
>>
>> but won't there still be 5 commits that happen as fast as possible for
>>> each of the windows that were constructed from the initial fetch?
>>
>> I'm not sure what you mean here. Would you like to elaborate more on your
>> questions?
>>
>
> Sure, I'll try to explain, it's very possible I just am misunderstanding
> Windowing here.
>
> Assumption 1:  Windowing works on the output timestamp.
> Assumption 2:  Max.longsPerKey will fire as fast as it can, in other
> words, there is no throttling.
>
> So, if we have a topic that has the following msgs:
> msg | timestamp (mm,ss)
> ---
>A  |  01:00
>B  |  01:01
>D  |  06:00
>E  |  06:04
>F  |  12:02
>
> and we read them all at once, we will have one window that contains [A,B]
> and another one that has [D,E], and a third that has [F].  Once we get the
> max offset for all three, won't they fire back to back without delay? So F
> will fire as soon as E is finished committing, which fires immediately
> after B is committed?
>
>


Re: Throttling stream outputs per trigger?

2020-12-08 Thread Boyuan Zhang
Please note that each record output from ReadFromKafkaDoFn is in a
GlobalWindow. The workflow is:
ReadFromKafkaDoFn -> Reshuffle -> Window.into(FixedWindows) ->
Max.longsPerKey -> CommitDoFn
   |
   ---> downstream consumers

but won't there still be 5 commits that happen as fast as possible for each
> of the windows that were constructed from the initial fetch?

I'm not sure what you mean here. Would you like to elaborate more on your
questions?

On Tue, Dec 8, 2020 at 1:46 PM Vincent Marquez 
wrote:

>
> *~Vincent*
>
>
> On Tue, Dec 8, 2020 at 1:34 PM Boyuan Zhang  wrote:
>
>> Hi Vicent,
>>
>> Window.into(FixedWindows.of(Duration.standardMinutes(5))) operation just
>> applies the window information to each element, not really does the
>> grouping operation. And in the commit transform, there is a combine
>> transform applied(Max.longsPerKey()).
>> Window.into(FixedWindows.of(Duration.standardMinutes(5))) + Max.longsPerKey()
>> means to output 1 element per 5 mins. This is different from your case
>> since the trigger in the CommitTransform is for the combine purpose.
>> And in order to prevent the data loss error you mentioned, there is a
>> persistent layer(Reshuffle) between Kafka read and any downstream
>> transform.
>>
>>
> Apologies, I don't understand how the delay would work here though.  If we
> have a kafka topic that has 100 messages in it, each with a timestamp one
> minute apart, that means we have 20 windows that will be generated from one
> possible fetch, outputted by the ReadFromKafkaDoFn.   I understand the
> Max.longsPerKey() will take the max per window, but won't there still be 5
> commits that happen as fast as possible for each of the windows that were
> constructed from the initial fetch?
>
>
>
>
>
>
>
>
>> For your case, will the pipeline like KafkaRead -> Reshuffle ->
>> GroupIntoBatches -> downstream help you?
>>
>> On Tue, Dec 8, 2020 at 1:19 PM Vincent Marquez 
>> wrote:
>>
>>> If this is the case that the pipeline has no way of enforcing fixed time
>>> windows, how does this work:
>>>
>>>
>>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java#L126
>>>
>>> Isn't this supposed to only trigger every five minutes, regardless of
>>> how much data can immediately be grouped together in five minute windows?
>>> If there is a way to mark that the fixed window should only trigger every
>>> so many minutes, that would solve my use case.  If there isn't a way to do
>>> this, the Kafka offset code seems broken and could result in 'data loss' by
>>> improperly committing offsets before they are run through the rest of the
>>> pipeline?
>>>
>>> *~Vincent*
>>>
>>>
>>> On Fri, Oct 16, 2020 at 4:17 AM Maximilian Michels 
>>> wrote:
>>>
>>>> > the downstream consumer has these requirements.
>>>>
>>>> Blocking should normally be avoided at all cost, but if the downstream
>>>> operator has the requirement to only emit a fixed number of messages
>>>> per
>>>> second, it should enforce this, i.e. block once the maximum number of
>>>> messages for a time period have been reached. This will automatically
>>>> lead to backpressure in Runners like Flink or Dataflow.
>>>>
>>>> -Max
>>>>
>>>> On 07.10.20 18:30, Luke Cwik wrote:
>>>> > SplittableDoFns apply to both batch and streaming pipelines. They are
>>>> > allowed to produce an unbounded amount of data and can either self
>>>> > checkpoint saying they want to resume later or the runner will ask
>>>> them
>>>> > to checkpoint via a split call.
>>>> >
>>>> > There hasn't been anything concrete on backpressure, there has been
>>>> work
>>>> > done about exposing signals[1] related to IO that a runner can then
>>>> use
>>>> > intelligently but throttling isn't one of them yet.
>>>> >
>>>> > 1:
>>>> >
>>>> https://lists.apache.org/thread.html/r7c1bf68bd126f3421019e238363415604505f82aeb28ccaf8b834d0d%40%3Cdev.beam.apache.org%3E
>>>> > <
>>>> https://lists.apache.org/thread.html/r7c1bf68bd126f3421019e238363415604505f82aeb28ccaf8b834d0d%40%3Cdev.beam.apache.org%3E
>>>> >
>>>> >
>>>> >

Re: Throttling stream outputs per trigger?

2020-12-08 Thread Boyuan Zhang
Hi Vicent,

Window.into(FixedWindows.of(Duration.standardMinutes(5))) operation just
applies the window information to each element, not really does the
grouping operation. And in the commit transform, there is a combine
transform applied(Max.longsPerKey()).
Window.into(FixedWindows.of(Duration.standardMinutes(5))) + Max.longsPerKey()
means to output 1 element per 5 mins. This is different from your case
since the trigger in the CommitTransform is for the combine purpose.
And in order to prevent the data loss error you mentioned, there is a
persistent layer(Reshuffle) between Kafka read and any downstream
transform.

For your case, will the pipeline like KafkaRead -> Reshuffle ->
GroupIntoBatches -> downstream help you?

On Tue, Dec 8, 2020 at 1:19 PM Vincent Marquez 
wrote:

> If this is the case that the pipeline has no way of enforcing fixed time
> windows, how does this work:
>
>
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java#L126
>
> Isn't this supposed to only trigger every five minutes, regardless of how
> much data can immediately be grouped together in five minute windows?  If
> there is a way to mark that the fixed window should only trigger every so
> many minutes, that would solve my use case.  If there isn't a way to do
> this, the Kafka offset code seems broken and could result in 'data loss' by
> improperly committing offsets before they are run through the rest of the
> pipeline?
>
> *~Vincent*
>
>
> On Fri, Oct 16, 2020 at 4:17 AM Maximilian Michels  wrote:
>
>> > the downstream consumer has these requirements.
>>
>> Blocking should normally be avoided at all cost, but if the downstream
>> operator has the requirement to only emit a fixed number of messages per
>> second, it should enforce this, i.e. block once the maximum number of
>> messages for a time period have been reached. This will automatically
>> lead to backpressure in Runners like Flink or Dataflow.
>>
>> -Max
>>
>> On 07.10.20 18:30, Luke Cwik wrote:
>> > SplittableDoFns apply to both batch and streaming pipelines. They are
>> > allowed to produce an unbounded amount of data and can either self
>> > checkpoint saying they want to resume later or the runner will ask them
>> > to checkpoint via a split call.
>> >
>> > There hasn't been anything concrete on backpressure, there has been
>> work
>> > done about exposing signals[1] related to IO that a runner can then use
>> > intelligently but throttling isn't one of them yet.
>> >
>> > 1:
>> >
>> https://lists.apache.org/thread.html/r7c1bf68bd126f3421019e238363415604505f82aeb28ccaf8b834d0d%40%3Cdev.beam.apache.org%3E
>> > <
>> https://lists.apache.org/thread.html/r7c1bf68bd126f3421019e238363415604505f82aeb28ccaf8b834d0d%40%3Cdev.beam.apache.org%3E
>> >
>> >
>> > On Tue, Oct 6, 2020 at 3:51 PM Vincent Marquez
>> > mailto:vincent.marq...@gmail.com>> wrote:
>> >
>> > Thanks for the response.  Is my understanding correct that
>> > SplittableDoFns are only applicable to Batch pipelines?  I'm
>> > wondering if there's any proposals to address backpressure needs?
>> > /~Vincent/
>> >
>> >
>> > On Tue, Oct 6, 2020 at 1:37 PM Luke Cwik > > > wrote:
>> >
>> > There is no general back pressure mechanism within Apache Beam
>> > (runners should be intelligent about this but there is currently
>> > no way to say I'm being throttled so runners don't know that
>> > throwing more CPUs at a problem won't make it go faster). Y
>> >
>> > You can control how quickly you ingest data for runners that
>> > support splittable DoFns with SDK initiated checkpoints with
>> > resume delays. A splittable DoFn is able to return
>> > resume().withDelay(Duration.seconds(10)) from
>> > the @ProcessElement method. See Watch[1] for an example.
>> >
>> > The 2.25.0 release enables more splittable DoFn features on more
>> > runners. I'm working on a blog (initial draft[2], still mostly
>> > empty) to update the old blog from 2017.
>> >
>> > 1:
>> >
>> https://github.com/apache/beam/blob/9c239ac93b40e911f03bec5da3c58a07fdceb245/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L908
>> > <
>> https://github.com/apache/beam/blob/9c239ac93b40e911f03bec5da3c58a07fdceb245/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L908
>> >
>> > 2:
>> >
>> https://docs.google.com/document/d/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE/edit#
>> > <
>> https://docs.google.com/document/d/1kpn0RxqZaoacUPVSMYhhnfmlo8fGT-p50fEblaFr2HE/edit#
>> >
>> >
>> >
>> > On Tue, Oct 6, 2020 at 10:39 AM Vincent Marquez
>> > mailto:vincent.marq...@gmail.com>>
>> > wrote:
>> >
>> > Hmm, I'm not sure how that will help, I understand how to
>> > batch up the data, but it is the triggering part that I
>> >  

Re: Proposal: Redis Stream Connector

2020-12-04 Thread Boyuan Zhang
Hi Vincent,

1. Just to be clear, for a streaming pipeline (say, on the dataflow
> runner)it  will use the 'residual' result of the SplitRestriction
> (retrieved from trySplit) as the checkpoint, so if the pipeline is stopped
> due to an error, then restarted with the same checkpoint it would resume
> off from the last written Residual checkpoint position?


Checkpoint is *only* persisted while the pipeline is running. So when the
pipeline is stopped and restarted, there is no way for the pipeline to
restart from the last checkpoint that is produced by the trySplit.
Alternatively, you can use bundle finalization to commit what you have
read, or create a CommitTransform. For example, in Kafka read, we create a
commit transform[1] which commits the read offset every 5 mins. When the
pipeline is stopped and restarted, the Kafka read will read from the last
committed offset.


> 2. Another question I have for writing an unbounded Splittable DoFn is how
> to write a test. With UnboundedSource, there was a way to wrap it to make
> it bounded so it would stop after processing N number of elements.  I would
> like to do the same, is there a uniform way of doing this currently?  If
> not I will just write a something custom for the loop in the processElement
> to check if it should continue opposed to 'while (true)'. Let me know if
> there is a uniform way, or you have a better idea on how to write a test
> for my PTransform that uses a Splittable DoFn.
>

 What I do for Kafka test[2][3] is to start a streaming pipeline and wait
for a certain time to cancel it.

[1]
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCommitOffset.java
[2]
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOIT.java#L137
[3]
https://github.com/apache/beam/blob/master/.test-infra/jenkins/job_PerformanceTests_KafkaIO_IT.groovy

On Fri, Dec 4, 2020 at 11:57 AM Vincent Marquez 
wrote:

> Thank you for your help Boyuan.
>
> 1. Just to be clear, for a streaming pipeline (say, on the dataflow
> runner)it  will use the 'residual' result of the SplitRestriction
> (retrieved from trySplit) as the checkpoint, so if the pipeline is stopped
> due to an error, then restarted with the same checkpoint it would resume
> off from the last written Residual checkpoint position?
>
> 2. Another question I have for writing an unbounded Splittable DoFn is how
> to write a test. With UnboundedSource, there was a way to wrap it to make
> it bounded so it would stop after processing N number of elements.  I would
> like to do the same, is there a uniform way of doing this currently?  If
> not I will just write a something custom for the loop in the processElement
> to check if it should continue opposed to 'while (true)'. Let me know if
> there is a uniform way, or you have a better idea on how to write a test
> for my PTransform that uses a Splittable DoFn.
>
> Thanks again.
>
>
> *~Vincent*
>
>
> On Mon, Nov 30, 2020 at 10:25 AM Boyuan Zhang  wrote:
>
>> In Splittable DoFn, the trySplit[1] API in RestrictionTracker is for
>> performing checkpointing, keeping primary as current restriction and
>> returning residuals. In the DoFn, you can do Splittable DoFn initiated
>> checkpoint by returning ProcessContinuation.resume()[2]. Beam programming
>> guide[3] also talks about Splittable DoFn initiated checkpoint and runner
>> initiated checkpoint.
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java#L72-L108
>> [2]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L1297-L1333
>> [3]
>> https://beam.apache.org/documentation/programming-guide/#splittable-dofns
>>
>> On Sun, Nov 29, 2020 at 10:28 PM Vincent Marquez <
>> vincent.marq...@gmail.com> wrote:
>>
>>> Regarding checkpointing:
>>>
>>> I'm confused how the Splittable DoFn can make use of checkpoints to
>>> resume and not have data loss.  Unlike the old API that had a very easy to
>>> understand method called 'getCheckpointMark' that allows me to return the
>>> completed work, I don't see where that is done with the current API.
>>>
>>> I tried looking at the OffsetRangeTracker and how it is used by Kafka
>>> but I'm failing to understand it.  The process method takes the
>>> RestrictionTracker, but there isn't a way I see for the OffsetRangeTracker
>>> to represent half completed work (in the event of an exception/crash during
>>> a previous 'process' method run.   Is there some documentation that co

Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-04 Thread Boyuan Zhang
Opened https://issues.apache.org/jira/browse/BEAM-11403 for tracking.

On Fri, Dec 4, 2020 at 10:52 AM Boyuan Zhang  wrote:

> Thanks for the pointer, Steve! I'll check it out. The execution paths for
> UnboundedSource and SDF wrapper are different. It's highly possible that
> the regression either comes from the invocation path for SDF wrapper, or
> the implementation of SDF wrapper itself.
>
> On Fri, Dec 4, 2020 at 6:33 AM Steve Niemitz  wrote:
>
>> Coincidentally, someone else in the ASF slack mentioned [1] yesterday
>> that they were seeing significantly reduced performance using KafkaIO.Read
>> w/ the SDF wrapper vs the unbounded source.  They mentioned they were using
>> flink 1.9.
>>
>> https://the-asf.slack.com/archives/C9H0YNP3P/p1607057900393900
>>
>> On Thu, Dec 3, 2020 at 1:56 PM Boyuan Zhang  wrote:
>>
>>> Hi Steve,
>>>
>>> I think the major performance regression comes from
>>> OutputAndTimeBoundedSplittableProcessElementInvoker[1], which will
>>> checkpoint the DoFn based on time/output limit and use timers/state to
>>> reschedule works.
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
>>>
>>> On Thu, Dec 3, 2020 at 9:40 AM Steve Niemitz 
>>> wrote:
>>>
>>>> I have a pipeline that reads from pubsub, does some aggregation, and
>>>> writes to various places.  Previously, in older versions of beam, when
>>>> running this in the DirectRunner, messages would go through the pipeline
>>>> almost instantly, making it very easy to debug locally, etc.
>>>>
>>>> However, after upgrading to beam 2.25, I noticed that it could take on
>>>> the order of 5-10 minutes for messages to get from the pubsub read step to
>>>> the next step in the pipeline (deserializing them, etc).  The subscription
>>>> being read from has on the order of 100,000 elements/sec arriving in it.
>>>>
>>>> Setting --experiments=use_deprecated_read fixes it, and makes the
>>>> pipeline behave as it did before.
>>>>
>>>> It seems like the SDF implementation in the DirectRunner here is
>>>> causing some kind of issue, either buffering a very large amount of data
>>>> before emitting it in a bundle, or something else.  Has anyone else run
>>>> into this?
>>>>
>>>


Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-04 Thread Boyuan Zhang
Thanks for the pointer, Steve! I'll check it out. The execution paths for
UnboundedSource and SDF wrapper are different. It's highly possible that
the regression either comes from the invocation path for SDF wrapper, or
the implementation of SDF wrapper itself.

On Fri, Dec 4, 2020 at 6:33 AM Steve Niemitz  wrote:

> Coincidentally, someone else in the ASF slack mentioned [1] yesterday that
> they were seeing significantly reduced performance using KafkaIO.Read w/
> the SDF wrapper vs the unbounded source.  They mentioned they were using
> flink 1.9.
>
> https://the-asf.slack.com/archives/C9H0YNP3P/p1607057900393900
>
> On Thu, Dec 3, 2020 at 1:56 PM Boyuan Zhang  wrote:
>
>> Hi Steve,
>>
>> I think the major performance regression comes from
>> OutputAndTimeBoundedSplittableProcessElementInvoker[1], which will
>> checkpoint the DoFn based on time/output limit and use timers/state to
>> reschedule works.
>>
>> [1]
>> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
>>
>> On Thu, Dec 3, 2020 at 9:40 AM Steve Niemitz  wrote:
>>
>>> I have a pipeline that reads from pubsub, does some aggregation, and
>>> writes to various places.  Previously, in older versions of beam, when
>>> running this in the DirectRunner, messages would go through the pipeline
>>> almost instantly, making it very easy to debug locally, etc.
>>>
>>> However, after upgrading to beam 2.25, I noticed that it could take on
>>> the order of 5-10 minutes for messages to get from the pubsub read step to
>>> the next step in the pipeline (deserializing them, etc).  The subscription
>>> being read from has on the order of 100,000 elements/sec arriving in it.
>>>
>>> Setting --experiments=use_deprecated_read fixes it, and makes the
>>> pipeline behave as it did before.
>>>
>>> It seems like the SDF implementation in the DirectRunner here is causing
>>> some kind of issue, either buffering a very large amount of data before
>>> emitting it in a bundle, or something else.  Has anyone else run into this?
>>>
>>


[Proposal] Remove @Experimental from Splittable DoFn APIs

2020-12-03 Thread Boyuan Zhang
Hi folks,

As we are reaching a stable state on Splittable DoFn APIs both in Java and
Python SDK, I'm proposing to remove Experimental annotations from these
APIs.

I have opened one PR[1] to do so. Please feel free to drop any comments on
that PR.

Thanks for your help!

[1] https://github.com/apache/beam/pull/13199


Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner

2020-12-03 Thread Boyuan Zhang
Hi Steve,

I think the major performance regression comes from
OutputAndTimeBoundedSplittableProcessElementInvoker[1], which will
checkpoint the DoFn based on time/output limit and use timers/state to
reschedule works.

[1]
https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java

On Thu, Dec 3, 2020 at 9:40 AM Steve Niemitz  wrote:

> I have a pipeline that reads from pubsub, does some aggregation, and
> writes to various places.  Previously, in older versions of beam, when
> running this in the DirectRunner, messages would go through the pipeline
> almost instantly, making it very easy to debug locally, etc.
>
> However, after upgrading to beam 2.25, I noticed that it could take on the
> order of 5-10 minutes for messages to get from the pubsub read step to the
> next step in the pipeline (deserializing them, etc).  The subscription
> being read from has on the order of 100,000 elements/sec arriving in it.
>
> Setting --experiments=use_deprecated_read fixes it, and makes the pipeline
> behave as it did before.
>
> It seems like the SDF implementation in the DirectRunner here is causing
> some kind of issue, either buffering a very large amount of data before
> emitting it in a bundle, or something else.  Has anyone else run into this?
>


Re: Proposal: Redis Stream Connector

2020-11-30 Thread Boyuan Zhang
In Splittable DoFn, the trySplit[1] API in RestrictionTracker is for
performing checkpointing, keeping primary as current restriction and
returning residuals. In the DoFn, you can do Splittable DoFn initiated
checkpoint by returning ProcessContinuation.resume()[2]. Beam programming
guide[3] also talks about Splittable DoFn initiated checkpoint and runner
initiated checkpoint.

[1]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java#L72-L108
[2]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L1297-L1333
[3]
https://beam.apache.org/documentation/programming-guide/#splittable-dofns

On Sun, Nov 29, 2020 at 10:28 PM Vincent Marquez 
wrote:

> Regarding checkpointing:
>
> I'm confused how the Splittable DoFn can make use of checkpoints to resume
> and not have data loss.  Unlike the old API that had a very easy to
> understand method called 'getCheckpointMark' that allows me to return the
> completed work, I don't see where that is done with the current API.
>
> I tried looking at the OffsetRangeTracker and how it is used by Kafka but
> I'm failing to understand it.  The process method takes the
> RestrictionTracker, but there isn't a way I see for the OffsetRangeTracker
> to represent half completed work (in the event of an exception/crash during
> a previous 'process' method run.   Is there some documentation that could
> help me understand this part?  Thanks in advance.
>
> *~Vincent*
>
>
> On Thu, Nov 26, 2020 at 2:01 PM Ismaël Mejía  wrote:
>
>> Just want to mention that we have been working with Vincent in the
>> ReadAll implementation for Cassandra based on normal DoFn, and we
>> expect to get it merged for the next release of Beam. Vincent is
>> familiarized now with DoFn based IO composition, a first step towards
>> SDF understanding. Vincent you can think of our Cassandra RingRange as
>> a Restriction in the context of SDF. Just for reference it would be
>> good to read in advance these two:
>>
>> https://beam.apache.org/blog/splittable-do-fn/
>> https://beam.apache.org/documentation/programming-guide/#sdf-basics
>>
>> Thanks Boyuan for offering your help I think it is really needed
>> considering that we don't have many Unbounded SDF connectors to use as
>> reference.
>>
>> On Thu, Nov 19, 2020 at 11:16 PM Boyuan Zhang  wrote:
>> >
>> >
>> >
>> > On Thu, Nov 19, 2020 at 1:29 PM Vincent Marquez <
>> vincent.marq...@gmail.com> wrote:
>> >>
>> >>
>> >>
>> >>
>> >> On Thu, Nov 19, 2020 at 10:38 AM Boyuan Zhang 
>> wrote:
>> >>>
>> >>> Hi Vincent,
>> >>>
>> >>> Thanks for your contribution! I'm happy to work with you on this when
>> you contribute the code into Beam.
>> >>
>> >>
>> >> Should I write up a JIRA to start?  I have access, I've already been
>> in the process of contributing some big changes to the CassandraIO
>> connector.
>> >
>> >
>> > Yes, please create a JIRA and assign it to yourself.
>> >
>> >>
>> >>
>> >>>
>> >>>
>> >>> Another thing is that it would be preferable to use Splittable DoFn
>> instead of using UnboundedSource to write a new IO.
>> >>
>> >>
>> >> I would prefer to use the UnboundedSource connector, I've already
>> written most of it, but also, I see some challenges using Splittable DoFn
>> for Redis streams.
>> >>
>> >> Unlike Kafka and Kinesis, Redis Streams offsets are not simply
>> monotonically increasing counters, so there is not a way  to just claim a
>> chunk of work and know that the chunk has any actual data in it.
>> >>
>> >> Since UnboundedSource is not yet deprecated, could I contribute that
>> after finishing up some test aspects, and then perhaps we can implement a
>> Splittable DoFn version?
>> >
>> >
>> > It would be nice not to build new IOs on top of UnboundedSource.
>> Currently we already have the wrapper class which translates the existing
>> UnboundedSource into Unbounded Splittable DoFn and executes the
>> UnboundedSource as the Splittable DoFn. How about you open a WIP PR and we
>> go through the UnboundedSource implementation together to figure out a
>> design for using Splittable DoFn?
>> >
>> >
>> >>
>> >>
>> >>
>> >>>
>> >>>
>> >>> On Thu, Nov 19, 2020 at 10:18 AM Vincent Marquez <
>> vincent.marq...@gmail.com> wrote:
>> >>>>
>> >>>> Currently, Redis offers a streaming queue functionality similar to
>> Kafka/Kinesis/Pubsub, but Beam does not currently have a connector for it.
>> >>>>
>> >>>> I've written an UnboundedSource connector that makes use of Redis
>> Streams as a POC and it seems to work well.
>> >>>>
>> >>>> If someone is willing to work with me, I could write up a JIRA
>> and/or open up a WIP pull request if there is interest in getting this as
>> an official connector.  I would mostly need guidance on naming/testing
>> aspects.
>> >>>>
>> >>>> https://redis.io/topics/streams-intro
>> >>>>
>> >>>> ~Vincent
>> >>
>> >>
>> >> ~Vincent
>>
>


Re: Create External Transform with WindowFn

2020-11-30 Thread Boyuan Zhang
Hi Steve,

Unfortunately I don't think there is a workaround before we have the change
that Cham mentions.

On Mon, Nov 30, 2020 at 8:16 AM Steve Niemitz  wrote:

> I'm trying to write an xlang transform that uses Reshuffle internally, and
> ran into this as well.  Is there any workaround to this for now (other than
> removing the reshuffle), or do I just need to wait for what Chamikara
> mentioned?  I noticed the same issue was mentioned in the SnowflakeIO.Read
> PR as well [1].
>
> https://github.com/apache/beam/pull/12149#discussion_r463710165
>
> On Wed, Aug 26, 2020 at 10:55 PM Boyuan Zhang  wrote:
>
>> That explains a lot. Thanks, Cham!
>>
>> On Wed, Aug 26, 2020 at 7:44 PM Chamikara Jayalath 
>> wrote:
>>
>>> Due to the proto -> object -> proto conversion we do today, Python needs
>>> to parse the full sub-graph from Java. We have hooks for PTransforms and
>>> Coders but not for windowing operations. This limitation will go away after
>>> we have direct Beam proto to Dataflow proto conversion in place.
>>>
>>> On Wed, Aug 26, 2020 at 7:03 PM Robert Burke  wrote:
>>>
>>>> Coders should only be checked over the language boundaries.
>>>>
>>>> On Wed, Aug 26, 2020, 6:24 PM Boyuan Zhang  wrote:
>>>>
>>>>> Thanks Cham!
>>>>>
>>>>>  I just realized that the *beam:window_fn:serialized_**java:v1 *is
>>>>> introduced by Java *Reshuffle.viaRandomKey()*. But
>>>>> *Reshuffle.viaRandomKey()* does rewindowed into original window
>>>>> strategy(which is *GlobalWindows *in my case). Is it expected that we
>>>>> also check intermediate PCollection rather than only the PCollection that
>>>>> across the language boundary?
>>>>>
>>>>> More about my Ptransform:
>>>>> MyExternalPTransform  -- expand to --  ParDo() ->
>>>>> Reshuffle.viaRandomKey() -> ParDo() -> WindowInto(FixWindow) -> ParDo() ->
>>>>> output void
>>>>>
>>>>>|
>>>>>
>>>>> -> ParDo()
>>>>> -> output PCollection to Python SDK
>>>>>
>>>>> On Tue, Aug 25, 2020 at 6:29 PM Chamikara Jayalath <
>>>>> chamik...@google.com> wrote:
>>>>>
>>>>>> Also it's strange that Java used (beam:window_fn:serialized_java:v1)
>>>>>> for the URN here instead of "beam:window_fn:fixed_windows:v1" [1]
>>>>>> which is what is being registered by Python [2]. This seems to be the
>>>>>> immediate issue. Tracking bug for supporting custom windows is
>>>>>> https://issues.apache.org/jira/browse/BEAM-10507.
>>>>>>
>>>>>> [1]
>>>>>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/standard_window_fns.proto#L55
>>>>>> [2]
>>>>>> https://github.com/apache/beam/blob/bd4df94ae10a7e7b0763c1917746d2faf5aeed6c/sdks/python/apache_beam/transforms/window.py#L449
>>>>>>
>>>>>> On Tue, Aug 25, 2020 at 6:07 PM Chamikara Jayalath <
>>>>>> chamik...@google.com> wrote:
>>>>>>
>>>>>>> Pipelines that use external WindowingStrategies might be failing
>>>>>>> during proto -> object -> proto conversion we do today. This limitation
>>>>>>> will go away once Dataflow directly starts reading Beam protos. We are
>>>>>>> working on this now.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Cham
>>>>>>>
>>>>>>> On Tue, Aug 25, 2020 at 5:38 PM Boyuan Zhang 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks, Robert! I want to add more details on my External
>>>>>>>> PTransform:
>>>>>>>>
>>>>>>>> MyExternalPTransform  -- expand to --  ParDo() ->
>>>>>>>> WindowInto(FixWindow) -> ParDo() -> output void
>>>>>>>>
>>>>>>>> |
>>>>>>>>
>>>>>>>> -> ParDo() -> output PCollection to Python SDK
>>>>>>>> The full stacktrace:
>>>>>>>>
>>>>>>>> INFO:root:Using Java

Re: CheckDone in unbounded SDF?

2020-11-28 Thread Boyuan Zhang
You need to update your current restriction when you do trySplit. That
means your current restriction should be the primary of the split result.
Please refer to existing OffsetRangeTracker implementation[1] for more
details.

[1]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java#L81

On Fri, Nov 27, 2020 at 11:48 PM Daniel Collins 
wrote:

> Ah, I appear to have missed this line, which indicates that the current
> RestrictionTracker must be updated.
>
> This invocation updates the {@link
>* #currentRestriction()} to be the primary restriction effectively
> having the current {@link
>* DoFn.ProcessElement} execution responsible for performing the work
> that the primary restriction
>* represents.
>
> On Sat, Nov 28, 2020 at 2:45 AM Daniel Collins 
> wrote:
>
>> This does not appear to work: The CheckDone call, as far as I can tell,
>> is made on the existing range not the split range based on the following
>> error:
>>
>> Error message from worker: java.lang.IllegalStateException: Last
>> attempted offset was 4601978 in range [2975759, 9223372036854775807),
>> claiming work in [4601979, 9223372036854775807) was not attempted
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:862)
>>
>> On Sat, Nov 28, 2020 at 2:18 AM Daniel Collins 
>> wrote:
>>
>>> Can you confirm that the following implementation of trySplit will work
>>> as intended (from an OffsetRangeTracker subclass)?
>>>
>>> @Override
>>>   public SplitResult trySplit(double fractionOfRemainder) {
>>> return SplitResult.of(
>>> new OffsetRange(currentRestriction().getFrom(),
>>> lastClaimedOffset + 1),
>>> new OffsetRange(lastClaimedOffset + 1, Long.MAX_VALUE));
>>>   }
>>>
>>> > It would nice to update the documentation if that's confusing.
>>>
>>> If you could please update this (if this is indeed the case) to confirm
>>> that it ensures that there will never be two unbounded restrictions sent to
>>> DoFns running at the same time using this pattern, that would be great.
>>>
>>> In addition, I'm not quite sure how this works? When the 'trySplit' call
>>> occurs, it returns two OffsetRanges, which don't yet include information
>>> about claimed offsets. How is the first half of this converted to my
>>> OffsetRangeTracker subclass with offsets already claimed? Does the runtime
>>> call (and importantly, is it required to call):
>>>
>>> MyRestrictionTracker tracker =
>>> MyDoFn.newTracker(splitResult.getPrimary());
>>> tracker.tryClaim(previousClaimed);
>>> tracker.checkDone();
>>>
>>> On Sat, Nov 28, 2020 at 2:08 AM Boyuan Zhang  wrote:
>>>
>>>> It would nice to update the documentation if that's confusing.
>>>>
>>>> On Fri, Nov 27, 2020 at 11:05 PM Daniel Collins 
>>>> wrote:
>>>>
>>>>> I think the documentation for trySplit() doesn't make it clear that it
>>>>> supports this use case. In particular this section:
>>>>>
>>>>> > This invocation updates the {@link #currentRestriction()} to be the
>>>>> primary restriction effectively having the current {@link
>>>>> DoFn.ProcessElement} execution responsible for performing the work that 
>>>>> the
>>>>> primary restriction represents. The residual restriction will be executed
>>>>> in a separate {@link DoFn.ProcessElement} invocation (likely in a 
>>>>> different
>>>>> process). The work performed by executing the primary and residual
>>>>> restrictions as separate {@link DoFn.ProcessElement} invocations MUST be
>>>>> equivalent to the work performed as if this split never occurred.
>>>>>
>>>>> Implies that the runner will try to run both restrictions again on
>>>>> separate workers. This is not the behavior I am looking for, hence my
>>>>> confusion. Can we change the documentation here to make clear that
>>>>> checkDone will be called on the primary restriction in the output to 
>>>>> ensure
>>>>> that it is actually completed if the trySplit call was triggered by a call
>>>>> to resume()?
>>>>>
>>>>> On Sat, Nov 28, 2020 at 1:58 AM Boyuan Zhang 
>>>>> wrote:
>>>>>
>>>>>> And if you look into the 

Re: CheckDone in unbounded SDF?

2020-11-27 Thread Boyuan Zhang
It would nice to update the documentation if that's confusing.

On Fri, Nov 27, 2020 at 11:05 PM Daniel Collins 
wrote:

> I think the documentation for trySplit() doesn't make it clear that it
> supports this use case. In particular this section:
>
> > This invocation updates the {@link #currentRestriction()} to be the
> primary restriction effectively having the current {@link
> DoFn.ProcessElement} execution responsible for performing the work that the
> primary restriction represents. The residual restriction will be executed
> in a separate {@link DoFn.ProcessElement} invocation (likely in a different
> process). The work performed by executing the primary and residual
> restrictions as separate {@link DoFn.ProcessElement} invocations MUST be
> equivalent to the work performed as if this split never occurred.
>
> Implies that the runner will try to run both restrictions again on
> separate workers. This is not the behavior I am looking for, hence my
> confusion. Can we change the documentation here to make clear that
> checkDone will be called on the primary restriction in the output to ensure
> that it is actually completed if the trySplit call was triggered by a call
> to resume()?
>
> On Sat, Nov 28, 2020 at 1:58 AM Boyuan Zhang  wrote:
>
>> And if you look into the RestrictionTracker javadoc[1], it mentions that
>> what means when you return null from trySplit.
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java#L104-L107
>>
>> On Fri, Nov 27, 2020 at 10:54 PM Boyuan Zhang  wrote:
>>
>>> To the extent I can see, this never mentions the restriction that you
>>>>  to implement a split() that returns a bounded restriction if
>>>> returning resume() from an SDF. Nor does this restriction particularly make
>>>> sense if the range being processed is itself unbounded? Perhaps you would
>>>> consider not calling checkDone() on resume() if the restriction provided to
>>>> the runner is unbounded since it would be unreasonable to complete an
>>>> unbounded restriction?
>>>
>>>
>>> It seems like you are not familiar with how beam deals with resume() so
>>> let's start from this part. Let's say that your SDF is processing a
>>> restriction of [0, MAX) and so far you have done tryClaim(5), and you
>>> want to return resume() at this point. When you return resume() from here,
>>> the beam Java DoFn invoker will know you want to resume and call your
>>> restrictionTracker.trySplit(fractionOfRemainder=0.0) to get the residual
>>> from [0, Max). Ideally, your RestrictionTracker should return [0, 6) as
>>> your current restriction and [6, MAX) as the residual. Then beam Java DoFn
>>> invoker will call checkDone on the restriction [0, 6) to double check your
>>> SDF has called tryClaim(5) to ensure there is no data loss. Then the SDK
>>> will return the [6, Max) restriction back to runner(in your case that's
>>> Dataflow) and the runner will reschedule [6, MAX) based on its scheduling
>>> strategy. That's why if you want to use resume(), you need to implement
>>> trySplit. That's also why trySplit and checkDone also make sense on
>>> Unbounded restriction.
>>>
>>> Perhaps it would be better to explain in terms of why I'm trying to do
>>>> this. If the subscription has not received any data in a while, or is
>>>> receiving data infrequently, I want to enable dataflow to scale down to 1
>>>> worker, but there will be no need to call "tryClaim" if there is no new
>>>> data from Pub/Sub Lite. All I want to do is, if data is arriving
>>>> infrequently, give dataflow the opportunity to scale my job down. I'm not
>>>> actually done with the data because, as new data can always be published in
>>>> the future, we can't know that, and I'm trying to avoid needing to
>>>> implement bounded reads to artificially produce sub-windows when an
>>>> unbounded output is much more natural.
>>>
>>>
>>> So you are referring to the resume use case. Please note that even
>>> though you are returning resume() from your SDF, that doesn't means
>>> Dataflow will guarantee that the worker will be downscaled to 1. But
>>> resume() indeed can help you free some workers to process other work,
>>> compared to having your SDF doing busy wait.
>>>
>>> This is good to know. So to rephrase, could I periodically call
>>>> tryClaim() to yield control back
>>>> to the 

Re: CheckDone in unbounded SDF?

2020-11-27 Thread Boyuan Zhang
And if you look into the RestrictionTracker javadoc[1], it mentions that
what means when you return null from trySplit.

[1]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java#L104-L107

On Fri, Nov 27, 2020 at 10:54 PM Boyuan Zhang  wrote:

> To the extent I can see, this never mentions the restriction that you
>>  to implement a split() that returns a bounded restriction if
>> returning resume() from an SDF. Nor does this restriction particularly make
>> sense if the range being processed is itself unbounded? Perhaps you would
>> consider not calling checkDone() on resume() if the restriction provided to
>> the runner is unbounded since it would be unreasonable to complete an
>> unbounded restriction?
>
>
> It seems like you are not familiar with how beam deals with resume() so
> let's start from this part. Let's say that your SDF is processing a
> restriction of [0, MAX) and so far you have done tryClaim(5), and you
> want to return resume() at this point. When you return resume() from here,
> the beam Java DoFn invoker will know you want to resume and call your
> restrictionTracker.trySplit(fractionOfRemainder=0.0) to get the residual
> from [0, Max). Ideally, your RestrictionTracker should return [0, 6) as
> your current restriction and [6, MAX) as the residual. Then beam Java DoFn
> invoker will call checkDone on the restriction [0, 6) to double check your
> SDF has called tryClaim(5) to ensure there is no data loss. Then the SDK
> will return the [6, Max) restriction back to runner(in your case that's
> Dataflow) and the runner will reschedule [6, MAX) based on its scheduling
> strategy. That's why if you want to use resume(), you need to implement
> trySplit. That's also why trySplit and checkDone also make sense on
> Unbounded restriction.
>
> Perhaps it would be better to explain in terms of why I'm trying to do
>> this. If the subscription has not received any data in a while, or is
>> receiving data infrequently, I want to enable dataflow to scale down to 1
>> worker, but there will be no need to call "tryClaim" if there is no new
>> data from Pub/Sub Lite. All I want to do is, if data is arriving
>> infrequently, give dataflow the opportunity to scale my job down. I'm not
>> actually done with the data because, as new data can always be published in
>> the future, we can't know that, and I'm trying to avoid needing to
>> implement bounded reads to artificially produce sub-windows when an
>> unbounded output is much more natural.
>
>
> So you are referring to the resume use case. Please note that even though
> you are returning resume() from your SDF, that doesn't means Dataflow will
> guarantee that the worker will be downscaled to 1. But resume() indeed can
> help you free some workers to process other work, compared to having your
> SDF doing busy wait.
>
> This is good to know. So to rephrase, could I periodically call
>> tryClaim() to yield control back
>> to the runtime?
>
>
> You can do so by implementing RestrictionTracker.trySplit() and using
> resume().
>
> You may also want to take a look at Kafka example[1]. Hope that is helpful.
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java
>
>
>
> On Fri, Nov 27, 2020 at 10:34 PM Daniel Collins 
> wrote:
>
>> > There is an update-to-date sdf programming guide[1] and typically [2]
>> is talking about SDF initiated-checkpointing
>>
>> To the extent I can see, this never mentions the restriction that you
>>  to implement a split() that returns a bounded restriction if
>> returning resume() from an SDF. Nor does this restriction particularly make
>> sense if the range being processed is itself unbounded? Perhaps you would
>> consider not calling checkDone() on resume() if the restriction provided to
>> the runner is unbounded since it would be unreasonable to complete an
>> unbounded restriction?
>>
>> > It depends on the definition of no new data to enable rescheduling.
>>
>> Perhaps it would be better to explain in terms of why I'm trying to do
>> this. If the subscription has not received any data in a while, or is
>> receiving data infrequently, I want to enable dataflow to scale down to 1
>> worker, but there will be no need to call "tryClaim" if there is no new
>> data from Pub/Sub Lite. All I want to do is, if data is arriving
>> infrequently, give dataflow the opportunity to scale my job down. I'm not
>> actually done with the data because, as new data can always be published in
>> 

Re: CheckDone in unbounded SDF?

2020-11-27 Thread Boyuan Zhang
>
> To the extent I can see, this never mentions the restriction that you
>  to implement a split() that returns a bounded restriction if
> returning resume() from an SDF. Nor does this restriction particularly make
> sense if the range being processed is itself unbounded? Perhaps you would
> consider not calling checkDone() on resume() if the restriction provided to
> the runner is unbounded since it would be unreasonable to complete an
> unbounded restriction?


It seems like you are not familiar with how beam deals with resume() so
let's start from this part. Let's say that your SDF is processing a
restriction of [0, MAX) and so far you have done tryClaim(5), and you
want to return resume() at this point. When you return resume() from here,
the beam Java DoFn invoker will know you want to resume and call your
restrictionTracker.trySplit(fractionOfRemainder=0.0) to get the residual
from [0, Max). Ideally, your RestrictionTracker should return [0, 6) as
your current restriction and [6, MAX) as the residual. Then beam Java DoFn
invoker will call checkDone on the restriction [0, 6) to double check your
SDF has called tryClaim(5) to ensure there is no data loss. Then the SDK
will return the [6, Max) restriction back to runner(in your case that's
Dataflow) and the runner will reschedule [6, MAX) based on its scheduling
strategy. That's why if you want to use resume(), you need to implement
trySplit. That's also why trySplit and checkDone also make sense on
Unbounded restriction.

Perhaps it would be better to explain in terms of why I'm trying to do
> this. If the subscription has not received any data in a while, or is
> receiving data infrequently, I want to enable dataflow to scale down to 1
> worker, but there will be no need to call "tryClaim" if there is no new
> data from Pub/Sub Lite. All I want to do is, if data is arriving
> infrequently, give dataflow the opportunity to scale my job down. I'm not
> actually done with the data because, as new data can always be published in
> the future, we can't know that, and I'm trying to avoid needing to
> implement bounded reads to artificially produce sub-windows when an
> unbounded output is much more natural.


So you are referring to the resume use case. Please note that even though
you are returning resume() from your SDF, that doesn't means Dataflow will
guarantee that the worker will be downscaled to 1. But resume() indeed can
help you free some workers to process other work, compared to having your
SDF doing busy wait.

This is good to know. So to rephrase, could I periodically call
> tryClaim() to yield control back
> to the runtime?


You can do so by implementing RestrictionTracker.trySplit() and using
resume().

You may also want to take a look at Kafka example[1]. Hope that is helpful.

[1]
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java



On Fri, Nov 27, 2020 at 10:34 PM Daniel Collins 
wrote:

> > There is an update-to-date sdf programming guide[1] and typically [2] is
> talking about SDF initiated-checkpointing
>
> To the extent I can see, this never mentions the restriction that you
>  to implement a split() that returns a bounded restriction if
> returning resume() from an SDF. Nor does this restriction particularly make
> sense if the range being processed is itself unbounded? Perhaps you would
> consider not calling checkDone() on resume() if the restriction provided to
> the runner is unbounded since it would be unreasonable to complete an
> unbounded restriction?
>
> > It depends on the definition of no new data to enable rescheduling.
>
> Perhaps it would be better to explain in terms of why I'm trying to do
> this. If the subscription has not received any data in a while, or is
> receiving data infrequently, I want to enable dataflow to scale down to 1
> worker, but there will be no need to call "tryClaim" if there is no new
> data from Pub/Sub Lite. All I want to do is, if data is arriving
> infrequently, give dataflow the opportunity to scale my job down. I'm not
> actually done with the data because, as new data can always be published in
> the future, we can't know that, and I'm trying to avoid needing to
> implement bounded reads to artificially produce sub-windows when an
> unbounded output is much more natural.
>
> > Please note that the when an SDF is processing one element restriction
> pair, the start of the restriction is never changed. You will always get
> the same offset when you call currentRestriction().getFrom().
>
> This is good to know. So to rephrase, could I periodically call
> tryClaim() to yield control back
> to the runtime?
>
> -Dan
>
>
>
> On Sat, Nov 28, 2020 at 1:21 AM Boyuan Zhang  wrote:
>
>> > Okay, is this documented a

Re: CheckDone in unbounded SDF?

2020-11-27 Thread Boyuan Zhang
> Okay, is this documented anywhere? In particular,
https://s.apache.org/splittable-do-fn seems out of date, since it
implies resume() should be returned when tryClaim returns false.

There is an update-to-date sdf programming guide[1] and typically [2] is
talking about SDF initiated-checkpointing. And stop() should be returned
when tryClaim returns false where resume() is expected to return when the
restriction is not fully processed and you want to defer processing in the
future.

> If this is the case, is there any way I can yield control to the runtime
if I have no new data to enable rescheduling? For example, can I call
tracker.tryClaim(tracker.currentRestriction().getFrom()) ?

It depends on the definition of no new data to enable rescheduling.
If you believe that you are done with current restriction even though you
are not reaching to the end of restriction, you can specially say I'm done
with current one by calling restrictionTracker.tryClaim(MAX_LONG)(or
tryClaim(restriction.getTo) if you are sure your end of restriction is not
changed by any splitting).
If you just want to re-process the rest of the restriction after a certain
time, e.g, 5 mins, 30mins and so on, you need to implement the trySplit and
return resume(duration) when you want to resume.

Please note that the when an SDF is processing one element restriction
pair, the start of the restriction is never changed. You will always get
the same offset when you call currentRestriction().getFrom().


[1]
https://beam.apache.org/documentation/programming-guide/#splittable-dofns
[2]
https://beam.apache.org/documentation/programming-guide/#user-initiated-checkpoint

On Fri, Nov 27, 2020 at 10:07 PM Daniel Collins 
wrote:

> > you can either never return resume() from your SDF or implement suitable
> trySplit() logic for your RestrictionTracker
>
> Okay, is this documented anywhere? In particular,
> https://s.apache.org/splittable-do-fn seems out of date, since it
> implies resume() should be returned when tryClaim returns false.
>
> If this is the case, is there any way I can yield control to the runtime
> if I have no new data to enable rescheduling? For example, can I call
> tracker.tryClaim(tracker.currentRestriction().getFrom()) ?
>
> On Sat, Nov 28, 2020 at 12:57 AM Boyuan Zhang  wrote:
>
>> > IIUC, this should never happen as long as I return null to trySplit. Is
>> this not the case? (trySplit implementation below)
>>
>> I noticed that in your implementation you return null for your
>> RestrictionTracker.trySplit. That means you cannot return resume() from
>> your SplittableDoFn.process() body since resume() means
>> performing SplittableDoFn self-initiated checkpointing and
>> deferring processing residuals.
>>
>> In your case, you can either never return resume() from your SDF or
>> implement suitable trySplit() logic for your RestrictionTracker. For
>> example, KafkaIO is using GrowableOffsetRangeTracker[1] to track an
>> infinite restriction.
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java
>>
>> On Fri, Nov 27, 2020 at 9:50 PM Daniel Collins 
>> wrote:
>>
>>> > Please note that your current restriction might be changed to a finite
>>> restriction during processing one bundle if you do SplittableDoFn
>>> self-initiated checkpointing or any runner issued splits
>>>
>>> IIUC, this should never happen as long as I return null to trySplit. Is
>>> this not the case? (trySplit implementation below)
>>>
>>> @Override
>>>   public SplitResult trySplit(double fractionOfRemainder)
>>> {  return null; }
>>>
>>> > what will you do if we reach the finally block?
>>>
>>> At that point an exception is being thrown out of the processElement
>>> function. The answer to that would be "what will the runtime do if an
>>> exception is thrown out of the processElement function"
>>>
>>> > Open a WIP PR
>>>
>>> I have, but I'm staging changes in a separate repo. See
>>> https://github.com/googleapis/java-pubsublite/pull/390 (although this
>>> incorporates other changes, see PubsubLitePartitionSdf.java
>>> <https://github.com/googleapis/java-pubsublite/pull/390/files#diff-850ceb4efa83df7d14b259e2c672682d227473634f4b524650449775db14b8a1>
>>>  and PubsubLiteOffsetRangeTracker.java
>>> <https://github.com/googleapis/java-pubsublite/pull/390/files#diff-972f8d0dd1db4c4ce38e702e4e9f6a88ef69c2f1eab9870d9512cbe48e22ce67>
>>>  for
>>> the sdf and restriction tracker implementations)
>>>
>>> -Dan
&

Re: CheckDone in unbounded SDF?

2020-11-27 Thread Boyuan Zhang
> IIUC, this should never happen as long as I return null to trySplit. Is
this not the case? (trySplit implementation below)

I noticed that in your implementation you return null for your
RestrictionTracker.trySplit. That means you cannot return resume() from
your SplittableDoFn.process() body since resume() means
performing SplittableDoFn self-initiated checkpointing and
deferring processing residuals.

In your case, you can either never return resume() from your SDF or
implement suitable trySplit() logic for your RestrictionTracker. For
example, KafkaIO is using GrowableOffsetRangeTracker[1] to track an
infinite restriction.

[1]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java

On Fri, Nov 27, 2020 at 9:50 PM Daniel Collins  wrote:

> > Please note that your current restriction might be changed to a finite
> restriction during processing one bundle if you do SplittableDoFn
> self-initiated checkpointing or any runner issued splits
>
> IIUC, this should never happen as long as I return null to trySplit. Is
> this not the case? (trySplit implementation below)
>
> @Override
>   public SplitResult trySplit(double fractionOfRemainder)
> {  return null; }
>
> > what will you do if we reach the finally block?
>
> At that point an exception is being thrown out of the processElement
> function. The answer to that would be "what will the runtime do if an
> exception is thrown out of the processElement function"
>
> > Open a WIP PR
>
> I have, but I'm staging changes in a separate repo. See
> https://github.com/googleapis/java-pubsublite/pull/390 (although this
> incorporates other changes, see PubsubLitePartitionSdf.java
> <https://github.com/googleapis/java-pubsublite/pull/390/files#diff-850ceb4efa83df7d14b259e2c672682d227473634f4b524650449775db14b8a1>
>  and PubsubLiteOffsetRangeTracker.java
> <https://github.com/googleapis/java-pubsublite/pull/390/files#diff-972f8d0dd1db4c4ce38e702e4e9f6a88ef69c2f1eab9870d9512cbe48e22ce67>
>  for
> the sdf and restriction tracker implementations)
>
> -Dan
>
> On Sat, Nov 28, 2020 at 12:42 AM Boyuan Zhang  wrote:
>
>>
>>
>> On Fri, Nov 27, 2020 at 9:34 PM Daniel Collins 
>> wrote:
>>
>>> Hello Boyuan,
>>>
>>> Responses inline.
>>>
>>> > The checkDone is invoked by the SDK harness to guarantee that when you
>>> exit you SplittableDoFn.process you must have completed all the work in the
>>> current restriction
>>>
>>> This is impossible with unbounded restrictions since, by definition, all
>>> work cannot be completed.
>>>
>> Please note that your current restriction might be changed to a finite
>> restriction during processing one bundle if you do SplittableDoFn
>> self-initiated checkpointing or any runner issued splits.
>>
>>
>>>
>>> > In your case, it seems like after you do tryClaim(3188439), you return
>>> stop() directly from your SplittableDoFn.process function
>>>
>>> This is not true. The code in question is below. stop() is only returned
>>> if tryClaim returns false.
>>>
>>> -Dan
>>>
>>> ```
>>> if (!tracker.tryClaim(tracker.currentRestriction().getFrom())) {
>>>
>>   logger.atWarning().log("Failed to claim initial restriction for
>>> partition " + partition);
>>>   return ProcessContinuation.stop();
>>> }
>>> sleepTimeRemaining = maxSleepTime;
>>> Committer committer = committerSupplier.apply(partition);
>>> committer.startAsync().awaitRunning();
>>> try (PullSubscriber subscriber =
>>> subscriberFactory.apply(partition,
>>> Offset.of(tracker.currentRestriction().getFrom( {
>>>   while (true) {
>>> List messages = doPoll(subscriber);
>>> // We polled for as long as possible, yield to the runtime to
>>> allow it to reschedule us on
>>> // a new task.
>>> if (messages.isEmpty()) {
>>>   logger.atWarning().log("Yielding due to timeout on partition "
>>> + partition);
>>>   return ProcessContinuation.resume();
>>> }
>>> long lastOffset = Iterables.getLast(messages).offset().value();
>>> if (tracker.tryClaim(lastOffset)) {
>>>   messages.forEach(
>>>   message ->
>>>       receiver.outputWithTimestamp(
>>>   message, new
>>> Instant(Timestamps.toMillis(

Re: CheckDone in unbounded SDF?

2020-11-27 Thread Boyuan Zhang
On Fri, Nov 27, 2020 at 9:34 PM Daniel Collins  wrote:

> Hello Boyuan,
>
> Responses inline.
>
> > The checkDone is invoked by the SDK harness to guarantee that when you
> exit you SplittableDoFn.process you must have completed all the work in the
> current restriction
>
> This is impossible with unbounded restrictions since, by definition, all
> work cannot be completed.
>
Please note that your current restriction might be changed to a finite
restriction during processing one bundle if you do SplittableDoFn
self-initiated checkpointing or any runner issued splits.


>
> > In your case, it seems like after you do tryClaim(3188439), you return
> stop() directly from your SplittableDoFn.process function
>
> This is not true. The code in question is below. stop() is only returned
> if tryClaim returns false.
>
> -Dan
>
> ```
> if (!tracker.tryClaim(tracker.currentRestriction().getFrom())) {
>
  logger.atWarning().log("Failed to claim initial restriction for
> partition " + partition);
>   return ProcessContinuation.stop();
> }
> sleepTimeRemaining = maxSleepTime;
> Committer committer = committerSupplier.apply(partition);
> committer.startAsync().awaitRunning();
> try (PullSubscriber subscriber =
> subscriberFactory.apply(partition,
> Offset.of(tracker.currentRestriction().getFrom( {
>   while (true) {
> List messages = doPoll(subscriber);
> // We polled for as long as possible, yield to the runtime to
> allow it to reschedule us on
> // a new task.
> if (messages.isEmpty()) {
>   logger.atWarning().log("Yielding due to timeout on partition " +
> partition);
>   return ProcessContinuation.resume();
> }
> long lastOffset = Iterables.getLast(messages).offset().value();
> if (tracker.tryClaim(lastOffset)) {
>   messages.forEach(
>   message ->
>   receiver.outputWithTimestamp(
>   message, new
> Instant(Timestamps.toMillis(message.publishTime();
>   committer.commitOffset(Offset.of(lastOffset + 1)).get();
> } else {
>   logger.atWarning().log("Stopping partition " + partition);
>   return ProcessContinuation.stop();
> }
>   }
> } finally {
>   committer.stopAsync().awaitTerminated();
> }
> ```
>
>From your code, what will you do if we reach the finally block? Would you
like to open a WIP PR to show more details?


>
> On Sat, Nov 28, 2020 at 12:22 AM Boyuan Zhang  wrote:
>
>> Hi Daniel,
>>
>> The checkDone is invoked by the SDK harness to guarantee that when you
>> exit you SplittableDoFn.process(either you return stop() or resume()), you
>> must have completed all the work in the current restriction. This is one of
>> major ways for SplittableDoFn to prevent data loss.
>>
>> In your case, it seems like after you do tryClaim(3188439), you return
>> stop() directly from your SplittableDoFn.process function. That's not a
>> correct way when working with restriction and restriction tracker. You
>> should either return resume() to perform SplittableDoFn initiated
>> checkpoint to defer processing restriction [3188439, 9223372036854775807),
>> or you should return stop() only when you have tryClaim() return False.
>>
>>
>> On Fri, Nov 27, 2020 at 9:07 PM Daniel Collins 
>> wrote:
>>
>>> Hello all,
>>>
>>> I'm trying to convert PubSubLiteIO into an SDF. I'm running into the
>>> following error on dataflow with a RestrictionTracker returning UNBOUNDED
>>> to isBounded. It looks like calls are being made to `checkDone`, but
>>> looking at the documentation of `checkDone`, I don't think there's any
>>> rational thing I can do in this case. Does anyone know what should be done
>>> for this method?
>>>
>>> The following exist in the RestrictionTracker javadoc:
>>> -Must throw an exception with an informative error message, if there is
>>> still any unclaimed work remaining in the restriction. (there is, the
>>> restriction is unbounded)
>>> -{@link RestrictionTracker#checkDone} MUST succeed
>>>
>>> Thanks,
>>>
>>> Dan
>>>
>>> "Error message from worker: java.lang.IllegalStateException: Last
>>> attempted offset was 3188438 in range [1998348, 9223372036854775807),
>>> claiming work in [3188439, 9223372036854775807) was no

Re: CheckDone in unbounded SDF?

2020-11-27 Thread Boyuan Zhang
Hi Daniel,

The checkDone is invoked by the SDK harness to guarantee that when you exit
you SplittableDoFn.process(either you return stop() or resume()), you must
have completed all the work in the current restriction. This is one of
major ways for SplittableDoFn to prevent data loss.

In your case, it seems like after you do tryClaim(3188439), you return
stop() directly from your SplittableDoFn.process function. That's not a
correct way when working with restriction and restriction tracker. You
should either return resume() to perform SplittableDoFn initiated
checkpoint to defer processing restriction [3188439, 9223372036854775807),
or you should return stop() only when you have tryClaim() return False.


On Fri, Nov 27, 2020 at 9:07 PM Daniel Collins  wrote:

> Hello all,
>
> I'm trying to convert PubSubLiteIO into an SDF. I'm running into the
> following error on dataflow with a RestrictionTracker returning UNBOUNDED
> to isBounded. It looks like calls are being made to `checkDone`, but
> looking at the documentation of `checkDone`, I don't think there's any
> rational thing I can do in this case. Does anyone know what should be done
> for this method?
>
> The following exist in the RestrictionTracker javadoc:
> -Must throw an exception with an informative error message, if there is
> still any unclaimed work remaining in the restriction. (there is, the
> restriction is unbounded)
> -{@link RestrictionTracker#checkDone} MUST succeed
>
> Thanks,
>
> Dan
>
> "Error message from worker: java.lang.IllegalStateException: Last
> attempted offset was 3188438 in range [1998348, 9223372036854775807),
> claiming work in [3188439, 9223372036854775807) was not attempted
>
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:862)
>
> org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker.checkDone(OffsetRangeTracker.java:117)
>
> com.google.cloud.pubsublite.beam.PubsubLiteOffsetRangeTracker.checkDone(PubsubLiteOffsetRangeTracker.java:60)
>
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers$RestrictionTrackerObserver.checkDone(RestrictionTrackers.java:77)
>
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker.invokeProcessElement(OutputAndTimeBoundedSplittableProcessElementInvoker.java:226)
>
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems$ProcessFn.processElement(SplittableParDoViaKeyedWorkItems.java:524)"
>


Re: Implementing an IO Connector for Debezium

2020-11-27 Thread Boyuan Zhang
Yes, I'm having a PR[1] under reviewing to update the IO development guide.

[1] https://github.com/apache/beam/pull/13227
<https://github.com/apache/beam/pull/13227>

On Fri, Nov 27, 2020 at 8:00 AM Alexey Romanenko 
wrote:

> Seems like the documentation about creating new IO connectors [1] is out
> of date and it makes people get confused about the recommended way of
> developing :
>
> *“Splittable DoFn is a new sources framework that is under development and
> will replace the other options for developing bounded and unbounded
> sources.*"
>
> Do you think we need rewrite this section completely according to a large
> progress with moving to SDF-based connectors in last time? Though, it would
> be useful to keep an old (current) one since Source API is still used.
>
>
> [1] https://beam.apache.org/documentation/io/developing-io-overview/
>
> On 25 Nov 2020, at 19:37, Boyuan Zhang  wrote:
>
> +dev 
>
> Hi Bashir,
>
> Most recently we are recommending to use Splittable DoFn[1] to build new
> IO connectors. We have several examples for that in our codebase:
> Java examples:
>
>- Kafka
>
> <https://github.com/apache/beam/blob/571338b0cc96e2e80f23620fe86de5c92dffaccc/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java#L118>
>- An I/O connector for Apache Kafka <https://kafka.apache.org/> (an
>open-source distributed event streaming platform).
>- Watch
>
> <https://github.com/apache/beam/blob/571338b0cc96e2e80f23620fe86de5c92dffaccc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java#L787>
>- Uses a polling function producing a growing set of outputs for each input
>until a per-input termination condition is met.
>- Parquet
>
> <https://github.com/apache/beam/blob/571338b0cc96e2e80f23620fe86de5c92dffaccc/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java#L365>
>- An I/O connector for Apache Parquet <https://parquet.apache.org/>
>(an open-source columnar storage format).
>- HL7v2
>
> <https://github.com/apache/beam/blob/6fdde4f4eab72b49b10a8bb1cb3be263c5c416b5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java#L493>
>- An I/O connector for HL7v2 messages (a clinical messaging format that
>provides data about events that occur inside an organization) part of 
> Google’s
>Cloud Healthcare API <https://cloud.google.com/healthcare>.
>- BoundedSource wrapper
>
> <https://github.com/apache/beam/blob/571338b0cc96e2e80f23620fe86de5c92dffaccc/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L248>
>- A wrapper which converts an existing BoundedSource implementation to a
>splittable DoFn.
>- UnboundedSource wrapper
>
> <https://github.com/apache/beam/blob/571338b0cc96e2e80f23620fe86de5c92dffaccc/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java#L432>
>- A wrapper which converts an existing UnboundedSource implementation to a
>splittable DoFn.
>
>
> Python examples:
>
>- BoundedSourceWrapper
>
> <https://github.com/apache/beam/blob/571338b0cc96e2e80f23620fe86de5c92dffaccc/sdks/python/apache_beam/io/iobase.py#L1375>
>- A wrapper which converts an existing BoundedSource implementation to a
>splittable DoFn.
>
>
> [1]
> https://beam.apache.org/documentation/programming-guide/#splittable-dofns
>
> On Wed, Nov 25, 2020 at 8:19 AM Bashir Sadjad  wrote:
>
>> Hi,
>>
>> I have a scenario in which a streaming pipeline should read update
>> messages from MySQL binlog (through Debezium). To implement this pipeline
>> using Beam, I understand there is a KafkaIO which I can use. But I also
>> want to support a local mode in which there is no Kafka and the messages
>> are directly consumed using embedded Debezium because this is a much
>> simpler architecture (no Kafka, ZooKeeper, and Kafka Connect).
>>
>> I did a little bit of search and it seems there is no IO connector for
>> Debezim, hence I have to implement one following this guide
>> <https://beam.apache.org/documentation/io/developing-io-java/>. I wonder:
>>
>> 1) Does this approach make sense or is it better to rely on Kafka even
>> for the local single machine use case?
>>
>> 2) Beside the above guide, is there any simple example IO that I can
>> follow to implement the UnboundedSource/Reader? I have looked at some
>> examples here <https://github.com/apache/beam/tree/master/sdks/java/io> but
>> was wondering if there is a recommended/simple one as a tutorial.
>>
>> Thanks
>>
>> -B
>> P.S. If this is better suited for dev@, please feel free to move it to
>> that list.
>>
>
>


Re: Implementing an IO Connector for Debezium

2020-11-25 Thread Boyuan Zhang
+dev 

Hi Bashir,

Most recently we are recommending to use Splittable DoFn[1] to build new IO
connectors. We have several examples for that in our codebase:
Java examples:

   -

   Kafka
   

   - An I/O connector for Apache Kafka  (an
   open-source distributed event streaming platform).
   -

   Watch
   

   - Uses a polling function producing a growing set of outputs for each input
   until a per-input termination condition is met.
   -

   Parquet
   

   - An I/O connector for Apache Parquet  (an
   open-source columnar storage format).
   -

   HL7v2
   

   - An I/O connector for HL7v2 messages (a clinical messaging format that
   provides data about events that occur inside an organization) part
of Google’s
   Cloud Healthcare API .
   -

   BoundedSource wrapper
   

   - A wrapper which converts an existing BoundedSource implementation to a
   splittable DoFn.
   -

   UnboundedSource wrapper
   

   - A wrapper which converts an existing UnboundedSource implementation to a
   splittable DoFn.


Python examples:

   - BoundedSourceWrapper
   

   - A wrapper which converts an existing BoundedSource implementation to a
   splittable DoFn.


[1]
https://beam.apache.org/documentation/programming-guide/#splittable-dofns

On Wed, Nov 25, 2020 at 8:19 AM Bashir Sadjad  wrote:

> Hi,
>
> I have a scenario in which a streaming pipeline should read update
> messages from MySQL binlog (through Debezium). To implement this pipeline
> using Beam, I understand there is a KafkaIO which I can use. But I also
> want to support a local mode in which there is no Kafka and the messages
> are directly consumed using embedded Debezium because this is a much
> simpler architecture (no Kafka, ZooKeeper, and Kafka Connect).
>
> I did a little bit of search and it seems there is no IO connector for
> Debezim, hence I have to implement one following this guide
> . I wonder:
>
> 1) Does this approach make sense or is it better to rely on Kafka even for
> the local single machine use case?
>
> 2) Beside the above guide, is there any simple example IO that I can
> follow to implement the UnboundedSource/Reader? I have looked at some
> examples here  but
> was wondering if there is a recommended/simple one as a tutorial.
>
> Thanks
>
> -B
> P.S. If this is better suited for dev@, please feel free to move it to
> that list.
>


Re: RabbitMq Read - fail ack message

2020-11-23 Thread Boyuan Zhang
Thanks for updating the stackoverflow thread.

I just filed the jira here: https://issues.apache.org/jira/browse/BEAM-11328
for tracking progress.

On Mon, Nov 23, 2020 at 4:10 PM Rafael Ribeiro  wrote:

> @Boyuan Zhang you make my day happy.
>
> The workaround worked as a charm
>
> Could we raise an issue on JIRA to fix that?
> I also put the answer on stackoverflow for someone else that could have
> the same problem
>
> https://stackoverflow.com/questions/64947718/apache-beam-rabbitmq-read-fail-ack-message-and-exception-raised/64978419#64978419
>
> thanks a lot for your help
>
>
> Em seg., 23 de nov. de 2020 às 19:55, Boyuan Zhang 
> escreveu:
>
>> As a workaround, you can add --experiments=use_deprecated_read when
>> launching your pipeline to bypass the sdf unbounded source wrapper here.
>>
>> On Mon, Nov 23, 2020 at 2:52 PM Boyuan Zhang  wrote:
>>
>>> Hi Rafael,
>>>
>>> As you mentioned, within withMaxNumRecords, the unbounded source will be
>>> executed as the bounded one. It may not be ideal for you.
>>>
>>> It seems like a bug for direct runner and sdf unbounded source wrapper
>>> when doing finalizeCheckpoint. Do you want to file a JIRA on this problem?
>>>
>>> On Mon, Nov 23, 2020 at 2:28 PM Rafael Ribeiro 
>>> wrote:
>>>
>>>>
>>>> Hi,
>>>>>
>>>>> I'm implementing a pipeline to read RabbitMq queue.
>>>>>
>>>>> I'm having problems when I read it at unbound stream
>>>>> it is saying that channel is already closed and ack is not sent to
>>>>> rabbitmq and message still on the queue:
>>>>> 
>>>>> WARNING: Failed to finalize
>>>>> Finalization{expiryTime=2020-11-21T19:33:14.909Z,
>>>>> callback=org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$$Lambda$378/0x0001007ee440@4ae82af9}
>>>>> for completed bundle CommittedImmutableListBundle{PCollection=Read 
>>>>> RabbitMQ
>>>>> queue/Read(RabbitMQSource)/ParDo(UnboundedSourceAsSDFWrapper)/ParMultiDo(UnboundedSourceAsSDFWrapper)/ProcessKeyedElements/SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems.out
>>>>> [PCollection],
>>>>> key=org.apache.beam.repackaged.direct_java.runners.local.StructuralKey$CoderStructuralKey@3607f949,
>>>>> elements=[ValueInGlobalWindow{value=ComposedKeyedWorkItem{key=[-55, 41,
>>>>> -123, 97, 13, 104, 92, 61, 92, 122, -19, 112, -90, 16, 7, -97, 89, 107,
>>>>> -80, 12, 9, 120, 10, -97, 72, 114, -62, -105, 101, -34, 96, 48, 30, -96, 
>>>>> 8,
>>>>> -19, 23, -115, -9, 87, 1, -58, -127, 70, -59, -24, -40, -111, -63, -119,
>>>>> 51, -108, 126, 64, -4, -120, -41, 9, 56, -63, -18, -18, -1, 17, -82, 90,
>>>>> -32, 110, 67, -12, -97, 10, -107, -110, 13, -74, -47, -113, 122, 27, 52,
>>>>> 46, -111, -118, -8, 118, -3, 20, 71, -109, 65, -87, -94, 107, 114, 116,
>>>>> -110, -126, -79, -123, -67, 18, -33, 70, -100, 9, -81, -65, -2, 98, 33,
>>>>> -122, -46, 23, -103, -70, 79, -23, 74, 9, 5, -9, 65, -33, -52, 5, 9, 101],
>>>>> elements=[], timers=[TimerData{timerId=1:1605986594072, timerFamilyId=,
>>>>> namespace=Window(org.apache.beam.sdk.transforms.windowing.GlobalWindow@4958d651),
>>>>> timestamp=2020-11-21T19:23:14.072Z,
>>>>> outputTimestamp=2020-11-21T19:23:14.072Z, domain=PROCESSING_TIME}]},
>>>>> pane=PaneInfo.NO_FIRING}], minimumTimestamp=-290308-12-21T19:59:05.225Z,
>>>>> synchronizedProcessingOutputWatermark=2020-11-21T19:23:14.757Z}
>>>>> com.rabbitmq.client.AlreadyClosedException: channel is already closed
>>>>> due to clean channel shutdown; protocol method:
>>>>> #method(reply-code=200, reply-text=OK, class-id=0,
>>>>> method-id=0)
>>>>> at
>>>>> com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:258)
>>>>> at
>>>>> com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:427)
>>>>> at
>>>>> com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:421)
>>>>> at
>>>>> com.rabbitmq.client.impl.recovery.RecoveryAwareChannelN.basicAck(RecoveryAwareChannelN.java:93)
>>>>> at
>>>>> com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicAck(AutorecoveringChannel.java:428)
>>>>> at
>>>>> org

Re: RabbitMq Read - fail ack message

2020-11-23 Thread Boyuan Zhang
As a workaround, you can add --experiments=use_deprecated_read when
launching your pipeline to bypass the sdf unbounded source wrapper here.

On Mon, Nov 23, 2020 at 2:52 PM Boyuan Zhang  wrote:

> Hi Rafael,
>
> As you mentioned, within withMaxNumRecords, the unbounded source will be
> executed as the bounded one. It may not be ideal for you.
>
> It seems like a bug for direct runner and sdf unbounded source wrapper
> when doing finalizeCheckpoint. Do you want to file a JIRA on this problem?
>
> On Mon, Nov 23, 2020 at 2:28 PM Rafael Ribeiro 
> wrote:
>
>>
>> Hi,
>>>
>>> I'm implementing a pipeline to read RabbitMq queue.
>>>
>>> I'm having problems when I read it at unbound stream
>>> it is saying that channel is already closed and ack is not sent to
>>> rabbitmq and message still on the queue:
>>> 
>>> WARNING: Failed to finalize
>>> Finalization{expiryTime=2020-11-21T19:33:14.909Z,
>>> callback=org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$$Lambda$378/0x0001007ee440@4ae82af9}
>>> for completed bundle CommittedImmutableListBundle{PCollection=Read RabbitMQ
>>> queue/Read(RabbitMQSource)/ParDo(UnboundedSourceAsSDFWrapper)/ParMultiDo(UnboundedSourceAsSDFWrapper)/ProcessKeyedElements/SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems.out
>>> [PCollection],
>>> key=org.apache.beam.repackaged.direct_java.runners.local.StructuralKey$CoderStructuralKey@3607f949,
>>> elements=[ValueInGlobalWindow{value=ComposedKeyedWorkItem{key=[-55, 41,
>>> -123, 97, 13, 104, 92, 61, 92, 122, -19, 112, -90, 16, 7, -97, 89, 107,
>>> -80, 12, 9, 120, 10, -97, 72, 114, -62, -105, 101, -34, 96, 48, 30, -96, 8,
>>> -19, 23, -115, -9, 87, 1, -58, -127, 70, -59, -24, -40, -111, -63, -119,
>>> 51, -108, 126, 64, -4, -120, -41, 9, 56, -63, -18, -18, -1, 17, -82, 90,
>>> -32, 110, 67, -12, -97, 10, -107, -110, 13, -74, -47, -113, 122, 27, 52,
>>> 46, -111, -118, -8, 118, -3, 20, 71, -109, 65, -87, -94, 107, 114, 116,
>>> -110, -126, -79, -123, -67, 18, -33, 70, -100, 9, -81, -65, -2, 98, 33,
>>> -122, -46, 23, -103, -70, 79, -23, 74, 9, 5, -9, 65, -33, -52, 5, 9, 101],
>>> elements=[], timers=[TimerData{timerId=1:1605986594072, timerFamilyId=,
>>> namespace=Window(org.apache.beam.sdk.transforms.windowing.GlobalWindow@4958d651),
>>> timestamp=2020-11-21T19:23:14.072Z,
>>> outputTimestamp=2020-11-21T19:23:14.072Z, domain=PROCESSING_TIME}]},
>>> pane=PaneInfo.NO_FIRING}], minimumTimestamp=-290308-12-21T19:59:05.225Z,
>>> synchronizedProcessingOutputWatermark=2020-11-21T19:23:14.757Z}
>>> com.rabbitmq.client.AlreadyClosedException: channel is already closed
>>> due to clean channel shutdown; protocol method:
>>> #method(reply-code=200, reply-text=OK, class-id=0,
>>> method-id=0)
>>> at
>>> com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:258)
>>> at
>>> com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:427)
>>> at
>>> com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:421)
>>> at
>>> com.rabbitmq.client.impl.recovery.RecoveryAwareChannelN.basicAck(RecoveryAwareChannelN.java:93)
>>> at
>>> com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicAck(AutorecoveringChannel.java:428)
>>> at
>>> org.apache.beam.sdk.io.rabbitmq.RabbitMqIO$RabbitMQCheckpointMark.finalizeCheckpoint(RabbitMqIO.java:433)
>>> at
>>> org.apache.beam.runners.direct.EvaluationContext.handleResult(EvaluationContext.java:195)
>>> at
>>> org.apache.beam.runners.direct.QuiescenceDriver$TimerIterableCompletionCallback.handleResult(QuiescenceDriver.java:287)
>>> at
>>> org.apache.beam.runners.direct.DirectTransformExecutor.finishBundle(DirectTransformExecutor.java:189)
>>> at
>>> org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:126)
>>> at
>>> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>>> at
>>> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>>> at
>>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>> at
>>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>> at java.base/java.lang.Thread.run(Thread.java:834)
>>>   

Re: RabbitMq Read - fail ack message

2020-11-23 Thread Boyuan Zhang
Hi Rafael,

As you mentioned, within withMaxNumRecords, the unbounded source will be
executed as the bounded one. It may not be ideal for you.

It seems like a bug for direct runner and sdf unbounded source wrapper when
doing finalizeCheckpoint. Do you want to file a JIRA on this problem?

On Mon, Nov 23, 2020 at 2:28 PM Rafael Ribeiro  wrote:

>
> Hi,
>>
>> I'm implementing a pipeline to read RabbitMq queue.
>>
>> I'm having problems when I read it at unbound stream
>> it is saying that channel is already closed and ack is not sent to
>> rabbitmq and message still on the queue:
>> 
>> WARNING: Failed to finalize
>> Finalization{expiryTime=2020-11-21T19:33:14.909Z,
>> callback=org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$$Lambda$378/0x0001007ee440@4ae82af9}
>> for completed bundle CommittedImmutableListBundle{PCollection=Read RabbitMQ
>> queue/Read(RabbitMQSource)/ParDo(UnboundedSourceAsSDFWrapper)/ParMultiDo(UnboundedSourceAsSDFWrapper)/ProcessKeyedElements/SplittableParDoViaKeyedWorkItems.GBKIntoKeyedWorkItems.out
>> [PCollection],
>> key=org.apache.beam.repackaged.direct_java.runners.local.StructuralKey$CoderStructuralKey@3607f949,
>> elements=[ValueInGlobalWindow{value=ComposedKeyedWorkItem{key=[-55, 41,
>> -123, 97, 13, 104, 92, 61, 92, 122, -19, 112, -90, 16, 7, -97, 89, 107,
>> -80, 12, 9, 120, 10, -97, 72, 114, -62, -105, 101, -34, 96, 48, 30, -96, 8,
>> -19, 23, -115, -9, 87, 1, -58, -127, 70, -59, -24, -40, -111, -63, -119,
>> 51, -108, 126, 64, -4, -120, -41, 9, 56, -63, -18, -18, -1, 17, -82, 90,
>> -32, 110, 67, -12, -97, 10, -107, -110, 13, -74, -47, -113, 122, 27, 52,
>> 46, -111, -118, -8, 118, -3, 20, 71, -109, 65, -87, -94, 107, 114, 116,
>> -110, -126, -79, -123, -67, 18, -33, 70, -100, 9, -81, -65, -2, 98, 33,
>> -122, -46, 23, -103, -70, 79, -23, 74, 9, 5, -9, 65, -33, -52, 5, 9, 101],
>> elements=[], timers=[TimerData{timerId=1:1605986594072, timerFamilyId=,
>> namespace=Window(org.apache.beam.sdk.transforms.windowing.GlobalWindow@4958d651),
>> timestamp=2020-11-21T19:23:14.072Z,
>> outputTimestamp=2020-11-21T19:23:14.072Z, domain=PROCESSING_TIME}]},
>> pane=PaneInfo.NO_FIRING}], minimumTimestamp=-290308-12-21T19:59:05.225Z,
>> synchronizedProcessingOutputWatermark=2020-11-21T19:23:14.757Z}
>> com.rabbitmq.client.AlreadyClosedException: channel is already closed due
>> to clean channel shutdown; protocol method:
>> #method(reply-code=200, reply-text=OK, class-id=0,
>> method-id=0)
>> at
>> com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:258)
>> at
>> com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:427)
>> at
>> com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:421)
>> at
>> com.rabbitmq.client.impl.recovery.RecoveryAwareChannelN.basicAck(RecoveryAwareChannelN.java:93)
>> at
>> com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicAck(AutorecoveringChannel.java:428)
>> at
>> org.apache.beam.sdk.io.rabbitmq.RabbitMqIO$RabbitMQCheckpointMark.finalizeCheckpoint(RabbitMqIO.java:433)
>> at
>> org.apache.beam.runners.direct.EvaluationContext.handleResult(EvaluationContext.java:195)
>> at
>> org.apache.beam.runners.direct.QuiescenceDriver$TimerIterableCompletionCallback.handleResult(QuiescenceDriver.java:287)
>> at
>> org.apache.beam.runners.direct.DirectTransformExecutor.finishBundle(DirectTransformExecutor.java:189)
>> at
>> org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:126)
>> at
>> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>> at
>> java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>> at
>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>> at
>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>> at java.base/java.lang.Thread.run(Thread.java:834)
>>   
>> BUT
>> if I include  withMaxNumRecords
>> I receive the message and ack is sent to rabbitmq queue
>> but it works as bound data
>>
>>   
>> CODE
>> my code is like below:
>> Pipeline p = Pipeline.create(options);
>>
>>PCollection messages = p.apply("Read RabbitMQ queue",
>> RabbitMqIO.read()
>> .withUri("amqp://guest:guest@localhost:5672")
>> .withQueue("cart.idle.process")
>> //.withMaxNumRecords(1)  // TRANFORM BOUND
>> );
>>
>> PCollection rows = messages.apply("Transform Json to
>> TableRow",
>> ParDo.of(new DoFn() {
>>
>> @ProcessElement
>> public void processElement(ProcessContext c) {
>>
>> ObjectMapper objectMapper = new ObjectMapper();
>> String jsonInString = new 

Re: Proposal: Redis Stream Connector

2020-11-19 Thread Boyuan Zhang
On Thu, Nov 19, 2020 at 1:29 PM Vincent Marquez 
wrote:

>
>
>
> On Thu, Nov 19, 2020 at 10:38 AM Boyuan Zhang  wrote:
>
>> Hi Vincent,
>>
>> Thanks for your contribution! I'm happy to work with you on this when you
>> contribute the code into Beam.
>>
>
> Should I write up a JIRA to start?  I have access, I've already been in
> the process of contributing some big changes to the CassandraIO connector.
>

Yes, please create a JIRA and assign it to yourself.


>
>
>>
>> Another thing is that it would be preferable to use Splittable DoFn
>> <https://beam.apache.org/documentation/programming-guide/#splittable-dofns> 
>> instead
>> of using UnboundedSource to write a new IO.
>>
>
> I would prefer to use the UnboundedSource connector, I've already written
> most of it, but also, I see some challenges using Splittable DoFn for Redis
> streams.
>
> Unlike Kafka and Kinesis, Redis Streams offsets are *not* simply
> monotonically increasing counters, so there is not a way  to just claim a
> chunk of work and know that the chunk has any actual data in it.
>
> Since UnboundedSource is not yet deprecated, could I contribute that after
> finishing up some test aspects, and then perhaps we can implement a
> Splittable DoFn version?
>

It would be nice *not* to build new IOs on top of UnboundedSource.
Currently we already have the wrapper class which translates the existing
UnboundedSource into Unbounded Splittable DoFn and executes the
UnboundedSource as the Splittable DoFn. How about you open a WIP PR and we
go through the UnboundedSource implementation together to figure out a
design for using Splittable DoFn?



>
>
>
>>
>> On Thu, Nov 19, 2020 at 10:18 AM Vincent Marquez <
>> vincent.marq...@gmail.com> wrote:
>>
>>> Currently, Redis offers a streaming queue functionality similar to
>>> Kafka/Kinesis/Pubsub, but Beam does not currently have a connector for it.
>>>
>>> I've written an UnboundedSource connector that makes use of Redis
>>> Streams as a POC and it seems to work well.
>>>
>>> If someone is willing to work with me, I could write up a JIRA and/or
>>> open up a WIP pull request if there is interest in getting this as an
>>> official connector.  I would mostly need guidance on naming/testing aspects.
>>>
>>> https://redis.io/topics/streams-intro
>>>
>>> *~Vincent*
>>>
>>
> ~Vincent
>


Re: Proposal: Redis Stream Connector

2020-11-19 Thread Boyuan Zhang
Hi Vincent,

Thanks for your contribution! I'm happy to work with you on this when you
contribute the code into Beam.

Another thing is that it would be preferable to use Splittable DoFn

instead
of using UnboundedSource to write a new IO.

On Thu, Nov 19, 2020 at 10:18 AM Vincent Marquez 
wrote:

> Currently, Redis offers a streaming queue functionality similar to
> Kafka/Kinesis/Pubsub, but Beam does not currently have a connector for it.
>
> I've written an UnboundedSource connector that makes use of Redis Streams
> as a POC and it seems to work well.
>
> If someone is willing to work with me, I could write up a JIRA and/or open
> up a WIP pull request if there is interest in getting this as an official
> connector.  I would mostly need guidance on naming/testing aspects.
>
> https://redis.io/topics/streams-intro
>
> *~Vincent*
>


Re: Cross language pipeline example

2020-11-11 Thread Boyuan Zhang
I was taking KafkaIO as an example of x-lang when I was doing something
related:

   - python transform:
   
https://github.com/apache/beam/blob/e1fdb9884898a673285b3b932fa99ae019b9c7b5/sdks/python/apache_beam/io/kafka.py#L110
   - java transform:
   
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L559
   - python pipeline:
   
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/xlang_kafkaio_it_test.py#L87


On Wed, Nov 11, 2020 at 11:07 AM Ke Wu  wrote:

> Hello,
>
> Is there an example demonstrating how a cross language pipeline look like?
> e.g. a pipeline where it is composes of Java and Python code/transforms.
>
> Best,
> Ke


Re: Self-checkpoint Support on Portable Flink

2020-10-23 Thread Boyuan Zhang
Hi there,

I just updated the doc
<https://docs.google.com/document/d/1372B7HYxtcUYjZOnOM7OBTfSJ4CyFg_gaPD_NUxWClo/edit?usp=sharing>
with
implementation details and opened PR13105
<https://github.com/apache/beam/pull/13105> for review.

Thanks for your help!

On Wed, Oct 14, 2020 at 3:40 AM Maximilian Michels  wrote:

> Duplicates cannot happen because the state of all operators will be
> rolled back to the latest checkpoint, in case of failures.
>
> On 14.10.20 06:31, Reuven Lax wrote:
> > Does this mean that we have to deal with duplicate messages over the
> > back edge? Or will that not happen, since duplicates mean that we rolled
> > back a checkpoint.
> >
> > On Tue, Oct 13, 2020 at 2:59 AM Maximilian Michels  > <mailto:m...@apache.org>> wrote:
> >
> > There would be ways around the lack of checkpointing in cycles, e.g.
> > buffer and backloop only after checkpointing is complete, similarly
> how
> > we implement @RequiresStableInput in the Flink Runner.
> >
> > -Max
> >
> > On 07.10.20 04:05, Reuven Lax wrote:
> >  > It appears that there's a proposal
> >  >
> > (
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-16%3A+Loop+Fault+Tolerance
> > <
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-16%3A+Loop+Fault+Tolerance
> >
> >
> >  >
> > <
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-16%3A+Loop+Fault+Tolerance
> > <
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-16%3A+Loop+Fault+Tolerance
> >>)
> >
> >  > and an abandoned PR to fix this, but AFAICT this remains a
> > limitation of
> >  > Flink. If Flink can't guarantee processing of records on back
> > edges, I
> >  > don't think we can use cycles, as we might otherwise lose the
> > residuals.
> >  >
> >  > On Tue, Oct 6, 2020 at 6:16 PM Reuven Lax  > <mailto:re...@google.com>
> >  > <mailto:re...@google.com <mailto:re...@google.com>>> wrote:
> >  >
> >  > This is what I was thinking of
> >  >
> >  > "Flink currently only provides processing guarantees for jobs
> >  > without iterations. Enabling checkpointing on an iterative job
> >  > causes an exception. In order to force checkpointing on an
> > iterative
> >  > program the user needs to set a special flag when enabling
> >  > checkpointing:|env.enableCheckpointing(interval,
> >  > CheckpointingMode.EXACTLY_ONCE, force = true)|.
> >  >
> >  > Please note that records in flight in the loop edges (and the
> > state
> >  > changes associated with them) will be lost during failure."
> >  >
> >  >
> >  >
> >  >
> >  >
> >  >
> >  > On Tue, Oct 6, 2020 at 5:44 PM Boyuan Zhang
> > mailto:boyu...@google.com>
> >  > <mailto:boyu...@google.com <mailto:boyu...@google.com>>>
> wrote:
> >  >
> >  > Hi Reuven,
> >  >
> >  > As Luke mentioned, at least there are some limitations
> around
> >  > tracking watermark with flink cycles. I'm going to use
> > State +
> >  > Timer without flink cycle to support self-checkpoint. For
> >  > dynamic split, we can either explore flink cycle approach
> or
> >  > limit depth approach.
> >  >
> >  > On Tue, Oct 6, 2020 at 5:33 PM Reuven Lax
> > mailto:re...@google.com>
> >  > <mailto:re...@google.com <mailto:re...@google.com>>>
> wrote:
> >  >
> >  > Aren't there some limitations associated with flink
> > cycles?
> >  > I seem to remember various features that could not be
> > used.
> >  > I'm assuming that watermarks are not supported across
> >  > cycles, but is there anything else?
> >  >
> >  > On Tue, Oct 6, 2020 at 7:12 AM Maximilian Michels
> >  > mailto:m...@apache.org>
> > <mailto:m...@apache.org <mailto:m...@apache.org>>> wrote:
> >  >
> >  > Thanks for starting the conversation. The two
> > approaches
> >  > both look good
&g

Re: Self-checkpoint Support on Portable Flink

2020-10-06 Thread Boyuan Zhang
Hi Reuven,

As Luke mentioned, at least there are some limitations around tracking
watermark with flink cycles. I'm going to use State + Timer without flink
cycle to support self-checkpoint. For dynamic split, we can either explore
flink cycle approach or limit depth approach.

On Tue, Oct 6, 2020 at 5:33 PM Reuven Lax  wrote:

> Aren't there some limitations associated with flink cycles? I seem to
> remember various features that could not be used. I'm assuming that
> watermarks are not supported across cycles, but is there anything else?
>
> On Tue, Oct 6, 2020 at 7:12 AM Maximilian Michels  wrote:
>
>> Thanks for starting the conversation. The two approaches both look good
>> to me. Probably we want to start with approach #1 for all Runners to be
>> able to support delaying bundles. Flink supports cycles and thus
>> approach #2 would also be applicable and could be used to implement
>> dynamic splitting.
>>
>> -Max
>>
>> On 05.10.20 23:13, Luke Cwik wrote:
>> > Thanks Boyuan, I left a few comments.
>> >
>> > On Mon, Oct 5, 2020 at 11:12 AM Boyuan Zhang > > <mailto:boyu...@google.com>> wrote:
>> >
>> > Hi team,
>> >
>> > I'm looking at adding self-checkpoint support to portable Flink
>> > runner(BEAM-10940
>> > <https://issues.apache.org/jira/browse/BEAM-10940>) for both batch
>> > and streaming. I summarized the problem that we want to solve and
>> > proposed 2 potential approaches in this doc
>> > <
>> https://docs.google.com/document/d/1372B7HYxtcUYjZOnOM7OBTfSJ4CyFg_gaPD_NUxWClo/edit?usp=sharing
>> >.
>> >
>> > I want to collect feedback on which approach is preferred and
>> > anything that I have not taken into consideration yet but I should.
>> > Many thanks to all your help!
>> >
>> > Boyuan
>> >
>>
>


Re: [DISCUSS] Clearing timers (https://github.com/apache/beam/pull/12836)

2020-09-18 Thread Boyuan Zhang
Hi Reuven,

Would you like to share the links to potential fixes? We can figure out
what we can do there.

On Fri, Sep 18, 2020 at 4:21 PM Reuven Lax  wrote:

>
>
> On Fri, Sep 18, 2020 at 3:14 PM Luke Cwik  wrote:
>
>> PR 12836[1] is adding support for clearing timers and there is a
>> discussion about what the semantics for a cleared timer should be.
>>
>> So far we have:
>> 1) Clearing an unset timer is a no-op
>> 2) If the last action on the timer was to clear it, then a future bundle
>> should not see it fire
>>
>> Ambiguity occurs if the last action on a timer was to clear it within the
>> same bundle then should the current bundle not see it fire if it has yet to
>> become visible to the user? Since element processing and timer firings are
>> "unordered", this can happen.
>>
>> Having the clear prevent the timer from firing within the same bundle if
>> it has yet to fire could make sense and simplifies clearing timer loops.
>> For example:
>>
>> @ProcessElement
>> process(ProcessContext c) {
>>   if (initialCondition) {
>> setTimer();
>>   } else {
>> clearTimer();
>>   }
>> }
>>
>> @OnTimer
>> onTimer(...) {
>>   do some side effect
>>   set timer to fire again in the future
>> }
>>
>> would require logic within the onTimer() method to check to see if we
>> should stop instead of relying on the fact that the clear will prevent the
>> timer loop.
>>
>> On the other hand, we currently don't prevent timers from firing that are
>> eligible within the same bundle if their firing time is changed within the
>> bundle to some future time. Clearing timers could be treated conceptually
>> like setting them to "infinity" and hence the current set logic would
>> suggest that we shouldn't prevent timer firings that are part of the same
>> bundle.
>>
>
> This "current" behavior is a bug, and one that has led to some weird
> effects. There have been some PRs attempting to fix it, and I think we
> should prioritize fixing this bug.
>
>
>> Are there additional use cases that we should consider that suggest one
>> approach over the other?
>> What do people think?
>>
>> 1: https://github.com/apache/beam/pull/12836
>>
>


Re: Clear Timer in Java SDK

2020-09-03 Thread Boyuan Zhang
Thanks, Luke!

Do we want to support the API in Java SDK? And what's the common approach
for now in Java SDK to "kind of" clear a timer, like setting the
fireTimestamp to infinity future?

On Thu, Sep 3, 2020 at 3:43 PM Luke Cwik  wrote:

> Java SDK hasn't exposed the ability to remove timers.
>
> On Wed, Sep 2, 2020 at 11:00 AM Boyuan Zhang  wrote:
>
>> Hi team,
>>
>> I'm looking for something similar to timer.clear() from Python SDK[1] in
>> Java SDK but it seems like we haven't exposed clearing timer API from Java
>> Timer. Does Java SDK have another way to clear a timer or we just haven't
>> worked on this API?
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/bundle_processor.py#L660-L671
>>
>


Clear Timer in Java SDK

2020-09-02 Thread Boyuan Zhang
Hi team,

I'm looking for something similar to timer.clear() from Python SDK[1] in
Java SDK but it seems like we haven't exposed clearing timer API from Java
Timer. Does Java SDK have another way to clear a timer or we just haven't
worked on this API?

[1]
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/worker/bundle_processor.py#L660-L671


Re: Create External Transform with WindowFn

2020-08-26 Thread Boyuan Zhang
Thanks Cham!

 I just realized that the *beam:window_fn:serialized_**java:v1 *is
introduced by Java *Reshuffle.viaRandomKey()*. But
*Reshuffle.viaRandomKey()* does rewindowed into original window
strategy(which is *GlobalWindows *in my case). Is it expected that we also
check intermediate PCollection rather than only the PCollection that across
the language boundary?

More about my Ptransform:
MyExternalPTransform  -- expand to --  ParDo() -> Reshuffle.viaRandomKey()
-> ParDo() -> WindowInto(FixWindow) -> ParDo() -> output void

 |

  -> ParDo() ->
output PCollection to Python SDK

On Tue, Aug 25, 2020 at 6:29 PM Chamikara Jayalath 
wrote:

> Also it's strange that Java used (beam:window_fn:serialized_java:v1) for
> the URN here instead of "beam:window_fn:fixed_windows:v1" [1] which is
> what is being registered by Python [2]. This seems to be the immediate
> issue. Tracking bug for supporting custom windows is
> https://issues.apache.org/jira/browse/BEAM-10507.
>
> [1]
> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/standard_window_fns.proto#L55
> [2]
> https://github.com/apache/beam/blob/bd4df94ae10a7e7b0763c1917746d2faf5aeed6c/sdks/python/apache_beam/transforms/window.py#L449
>
> On Tue, Aug 25, 2020 at 6:07 PM Chamikara Jayalath 
> wrote:
>
>> Pipelines that use external WindowingStrategies might be failing during
>> proto -> object -> proto conversion we do today. This limitation will go
>> away once Dataflow directly starts reading Beam protos. We are working on
>> this now.
>>
>> Thanks,
>> Cham
>>
>> On Tue, Aug 25, 2020 at 5:38 PM Boyuan Zhang  wrote:
>>
>>> Thanks, Robert! I want to add more details on my External PTransform:
>>>
>>> MyExternalPTransform  -- expand to --  ParDo() -> WindowInto(FixWindow)
>>> -> ParDo() -> output void
>>> |
>>> ->
>>> ParDo() -> output PCollection to Python SDK
>>> The full stacktrace:
>>>
>>> INFO:root:Using Java SDK harness container image 
>>> dataflow-dev.gcr.io/boyuanz/java:latest
>>> Starting expansion service at localhost:53569
>>> Aug 13, 2020 7:42:11 PM 
>>> org.apache.beam.sdk.expansion.service.ExpansionService 
>>> loadRegisteredTransforms
>>> INFO: Registering external transforms: [beam:external:java:kafka:read:v1, 
>>> beam:external:java:kafka:write:v1, beam:external:java:jdbc:read_rows:v1, 
>>> beam:external:java:jdbc:write:v1, beam:external:java:generate_sequence:v1]
>>> beam:external:java:kafka:read:v1: 
>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x000800b2a440@4ac68d3e
>>> beam:external:java:kafka:write:v1: 
>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x000800b2a440@277c0f21
>>> beam:external:java:jdbc:read_rows:v1: 
>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x000800b2a440@6073f712
>>> beam:external:java:jdbc:write:v1: 
>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x000800b2a440@43556938
>>> beam:external:java:generate_sequence:v1: 
>>> org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$$Lambda$8/0x000800b2a440@3d04a311
>>> WARNING:apache_beam.options.pipeline_options_validator:Option --zone is 
>>> deprecated. Please use --worker_zone instead.
>>> Aug 13, 2020 7:42:12 PM 
>>> org.apache.beam.sdk.expansion.service.ExpansionService expand
>>> INFO: Expanding 'WriteToKafka' with URN 'beam:external:java:kafka:write:v1'
>>> Aug 13, 2020 7:42:14 PM 
>>> org.apache.beam.sdk.expansion.service.ExpansionService expand
>>> INFO: Expanding 'ReadFromKafka' with URN 'beam:external:java:kafka:read:v1'
>>>
>>> WARNING:root:Make sure that locally built Python SDK docker image has 
>>> Python 3.6 interpreter.
>>> INFO:root:Using Python SDK docker image: 
>>> apache/beam_python3.6_sdk:2.24.0.dev. If the image is not available at 
>>> local, we will try to pull from hub.docker.com
>>> Traceback (most recent call last):
>>>   File "", line 165, in run_filename_as_main
>>>   File "", line 39, in _run_code_in_main
>>

Re: Create External Transform with WindowFn

2020-08-25 Thread Boyuan Zhang
xt)
  File "apache_beam/pipeline.py", line 1266, in from_runner_api
part = context.transforms.get_by_id(transform_id)
  File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
self._id_to_proto[id], self._pipeline_context)
  File "apache_beam/pipeline.py", line 1272, in from_runner_api
id in proto.outputs.items()
  File "apache_beam/pipeline.py", line 1272, in 
id in proto.outputs.items()
  File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
self._id_to_proto[id], self._pipeline_context)
  File "apache_beam/pvalue.py", line 217, in from_runner_api
proto.windowing_strategy_id),
  File "apache_beam/runners/pipeline_context.py", line 95, in get_by_id
self._id_to_proto[id], self._pipeline_context)
  File "apache_beam/transforms/core.py", line 2597, in from_runner_api
windowfn=WindowFn.from_runner_api(proto.window_fn, context),
  File "apache_beam/utils/urns.py", line 186, in from_runner_api
parameter_type, constructor = cls._known_urns[fn_proto.urn]
KeyError: 'beam:window_fn:serialized_java:v1'


On Tue, Aug 25, 2020 at 5:12 PM Robert Bradshaw  wrote:

> You should be able to use a WindowInto with any of the common
> windowing operations (e.g. global, fixed, sliding, sessions) in an
> external transform. You should also be able to window into an
> arbitrary WindowFn as long as it produces standards window types, but
> if there's a bug here you could possibly work around it by windowing
> into a more standard windowing fn before returning.
>
> What is the full traceback?
>
> On Tue, Aug 25, 2020 at 5:02 PM Boyuan Zhang  wrote:
> >
> > Hi team,
> >
> > I'm trying to create an External transform in Java SDK, which expands
> into several ParDo and a Window.into(FixWindow). When I use this transform
> in Python SDK, I get an pipeline construction error:
> >
> > apache_beam/utils/urns.py", line 186, in from_runner_api
> > parameter_type, constructor = cls._known_urns[fn_proto.urn]
> > KeyError: 'beam:window_fn:serialized_java:v1'
> >
> > Is it expected that I cannot use a Window.into when building External
> Ptransform? Or do I miss anything here?
> >
> >
> > Thanks for your help!
>


Create External Transform with WindowFn

2020-08-25 Thread Boyuan Zhang
Hi team,

I'm trying to create an External transform in Java SDK, which expands into
several ParDo and a Window.into(FixWindow). When I use this transform in
Python SDK, I get an pipeline construction error:

apache_beam/utils/urns.py", line 186, in from_runner_api
parameter_type, constructor = cls._known_urns[fn_proto.urn]
KeyError: 'beam:window_fn:serialized_java:v1'

Is it expected that I cannot use a Window.into when building External
Ptransform? Or do I miss anything here?


Thanks for your help!


Re: Output timestamp for Python event timers

2020-08-12 Thread Boyuan Zhang
Thanks for your help! I'll take a look at the PR.

On Wed, Aug 12, 2020 at 2:27 AM Maximilian Michels  wrote:

> Thanks for your suggestions!
>
> It makes sense to complete the work on this feature by exposing it in
> the Python API. We can do this as a next step. (There might be questions
> on how to do that exactly)
>
> For now, I'm concerned with getting the semantics right and unblocking
> users from stalling pipelines.
>
> I wasn't aware that processing timers used the input timestamp as the
> timer output timestamp. I've updated the PR accordingly. Please take a
> look: https://github.com/apache/beam/pull/12531
>
> -Max
>
> On 12.08.20 05:03, Luke Cwik wrote:
> > +1 on what Boyuan said. It is important that the defaults for processing
> > time domain differ from the defaults for the event time domain.
> >
> > On Tue, Aug 11, 2020 at 12:36 PM Yichi Zhang  > <mailto:zyi...@google.com>> wrote:
> >
> > +1 to expose set_output_timestamp and enrich python set timer api.
> >
> > On Tue, Aug 11, 2020 at 12:01 PM Boyuan Zhang  > <mailto:boyu...@google.com>> wrote:
> >
> > Hi Maximilian,
> >
> > It makes sense to set  hold_timestamp as fire_timestamp when the
> > fire_timestamp is in the event time domain. Otherwise, the
> > system may advance the watermark incorrectly.
> > I think we can do something similar to Java FnApiRunner[1]:
> >
> >   * Expose set_output_timestamp API to python timer as well
> >   * If set_output_timestamp is not specified and timer is in
> > event domain, we can use fire_timestamp as hold_timestamp
> >   * Otherwise, use input_timestamp as hold_timestamp.
> >
> > What do you think?
> >
> > [1]
> >
> https://github.com/apache/beam/blob/edb42952f6b0aa99477f5c7baca6d6a0d93deb4f/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L1433-L1493
> >
> >
> >
> >
> > On Tue, Aug 11, 2020 at 9:00 AM Maximilian Michels
> > mailto:m...@apache.org>> wrote:
> >
> > We ran into problems setting event time timers per-element
> > in the Python
> > SDK. Pipeline progress would stall.
> >
> > Turns out, although the Python SDK does not expose the timer
> > output
> > timestamp feature to the user, it sets the timer output
> > timestamp to the
> > current input timestamp of an element.
> >
> > This will lead to holding back the watermark until the timer
> > fires (the
> > Flink Runner respects the timer output timestamp when
> > advancing the
> > output watermark). We had set the fire timestamp to a
> > timestamp so far
> > in the future, that pipeline progress would completely stall
> > for
> > downstream transforms, due to the held back watermark.
> >
> > Considering that this feature is not even exposed to the
> > user in the
> > Python SDK, I think we should set the default output
> > timestamp to the
> > fire timestamp, and not to the input timestamp. This is also
> > how timer
> > work in the Java SDK.
> >
> > Let me know what you think.
> >
> > -Max
> >
> > PR: https://github.com/apache/beam/pull/12531
> >
>


Re: Output timestamp for Python event timers

2020-08-11 Thread Boyuan Zhang
Hi Maximilian,

It makes sense to set  hold_timestamp as fire_timestamp when the
fire_timestamp is in the event time domain. Otherwise, the system may
advance the watermark incorrectly.
I think we can do something similar to Java FnApiRunner[1]:

   - Expose set_output_timestamp API to python timer as well
   - If set_output_timestamp is not specified and timer is in event domain,
   we can use fire_timestamp as hold_timestamp
   - Otherwise, use input_timestamp as hold_timestamp.

What do you think?

[1]
https://github.com/apache/beam/blob/edb42952f6b0aa99477f5c7baca6d6a0d93deb4f/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L1433-L1493




On Tue, Aug 11, 2020 at 9:00 AM Maximilian Michels  wrote:

> We ran into problems setting event time timers per-element in the Python
> SDK. Pipeline progress would stall.
>
> Turns out, although the Python SDK does not expose the timer output
> timestamp feature to the user, it sets the timer output timestamp to the
> current input timestamp of an element.
>
> This will lead to holding back the watermark until the timer fires (the
> Flink Runner respects the timer output timestamp when advancing the
> output watermark). We had set the fire timestamp to a timestamp so far
> in the future, that pipeline progress would completely stall for
> downstream transforms, due to the held back watermark.
>
> Considering that this feature is not even exposed to the user in the
> Python SDK, I think we should set the default output timestamp to the
> fire timestamp, and not to the input timestamp. This is also how timer
> work in the Java SDK.
>
> Let me know what you think.
>
> -Max
>
> PR: https://github.com/apache/beam/pull/12531
>


Re: Unknown accumulator coder error when running cross-language SpannerIO Write

2020-08-04 Thread Boyuan Zhang
Hi Piotr,

Are you using the beam master head to dev? Can you share your code? The
x-lang transform can be tested with Flink runner, where SDF is also
supported, such as
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/flink_runner_test.py#L205-L261

On Tue, Aug 4, 2020 at 9:42 AM Piotr Szuberski 
wrote:

> Is there a simple way to register the splittable dofn for cross-language
> usage? It's a bit a black box to me right now.
>
> The most meaningful logs for Flink are the ones I pasted and the following:
>
> apache_beam.utils.subprocess_server: INFO: b'[grpc-default-executor-0]
> WARN org.apache.beam.runners.jobsubmission.InMemoryJobService - Encountered
> Unexpected Exception during validation'
> apache_beam.utils.subprocess_server: INFO: b'java.lang.RuntimeException:
> Failed to validate transform ref_AppliedPTransform_Write to Spanner/Write
> mutations to Cloud Spanner/Schema
> View/Combine.GloballyAsSingletonView/Combine.globally(Singleton)/Combine.perKey(Singleton)_31'
>
> and a shortened oneline message:
> [...] DEBUG: Stages: ['ref_AppliedPTransform_Generate input/Impulse_3\n
> Generate input/Impulse:beam:transform:impulse:v1\n  must follow: \n
> downstream_side_inputs: ', 'ref_AppliedPTransform_Generate
> input/FlatMap()_4\n  Generate input/FlatMap( at core.py:2826>):beam:transform:pardo:v1\n  must follow: \n
> downstream_side_inputs: ', 'ref_AppliedPTransform_Generate
> input/Map(decode)_6\n [...]
>
> On 2020/08/03 23:40:42, Brian Hulette  wrote:
> > The DirectRunner error looks like it's because the FnApiRunner doesn't
> > support SDF.
> >
> > What is the coder id for the Flink error? It looks like the full stack
> > trace should contain it.
> >
> > On Mon, Aug 3, 2020 at 10:09 AM Piotr Szuberski <
> piotr.szuber...@polidea.com>
> > wrote:
> >
> > > I'm Writing SpannerIO.Write cross-language transform and when I try to
> run
> > > it from python I receive errors:
> > >
> > > On Flink:
> > > apache_beam.utils.subprocess_server: INFO: b'Caused by:
> > > java.lang.IllegalArgumentException: Transform external_1HolderCoder
> uses
> > > unknown accumulator coder id %s'
> > > apache_beam.utils.subprocess_server: INFO: b'\tat
> > >
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:216)'
> > > apache_beam.utils.subprocess_server: INFO: b'\tat
> > >
> org.apache.beam.runners.core.construction.graph.PipelineValidator.validateCombine(PipelineValidator.java:273)'
> > >
> > > On DirectRunner:
> > >   File
> > >
> "/Users/piotr/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
> > > line 181, in run_via_runner_api
> > > self._validate_requirements(pipeline_proto)
> > >   File
> > >
> "/Users/piotr/beam/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
> > > line 264, in _validate_requirements
> > > raise ValueError(
> > > ValueError: Missing requirement declaration:
> > > {'beam:requirement:pardo:splittable_dofn:v1'}
> > >
> > > I suppose that SpannerIO.Write uses a transform that cannot be
> translated
> > > in cross-language usage? I'm not sure whether there is something I can
> do
> > > about it.
> > >
> > >
> >
>


Re: Needed help identifying a error in running a SDF

2020-08-04 Thread Boyuan Zhang
Hi Mayank,

Which runner do you want to run your pipeline? You should add 'beam_fn_api'
when you launch the pipeline --experiments=beam_fn_api.
In your code:

class TestDoFn(beam.DoFn):
def process(
self,
element,
restriction_tracker=beam.DoFn.RestrictionParam(
TestProvider())):
import pdb; pdb.set_trace()
cur = restriction_tracker.current_restriction().start
while restriction_tracker.try_claim(cur):
  return element -> yield element; cur += 1



On Tue, Aug 4, 2020 at 11:07 AM Mayank Ketkar  wrote:

> Hello Team,
>
> I was hoping to get anyones help with an error I'm encountering in
> running SDF.
>
> Posted the question imn stack overflow (includes code)
>
> https://stackoverflow.com/questions/63252327/error-in-running-apache-beam-python-splittabledofn
>
> However I am receiving a error
> RuntimeError: Transform node
> AppliedPTransform(ParDo(TestDoFn)/ProcessKeyedElements/GroupByKey/GroupByKey,
> _GroupByKeyOnly) was not replaced as expected.
>
> when trying to apply a SDF to a pubsubIO source
>
> Thanks in advance!! Really!!
>
> Mayank
>


Re: KafkaIO sending KafkaRecords in CrossLanguage - where is the coder registered?

2020-07-16 Thread Boyuan Zhang
Hi Piotr,

X-Lang uses TypedWithoutMetadata
,
which outputs the KV directly instead of KafkaRecord: see here
.
Given the limit that x-lang can only work with well-known coders, if you
want to process the KV in python output from KafkaIO, the coders of key and
value should be well-known in beam. By default, the key and value are
bytes: see here

.

On Thu, Jul 16, 2020 at 8:48 AM Piotr Szuberski 
wrote:

> I'm writing a python wrappers for KinesisIO and I encountered a problem
> that Read transform creates a PCollection with KinesisRecord class which's
> coder by default is assigned as 'beam:coders:javasdk:0.1'. I managed to
> register this coder using CoderTranslatorRegistrar which adds the coder to
> the KNOWN_CODER_URNS and therefore is sent with my custom urn.
>
> Kafka's cross language Write transform uses KV<>, is  encoded by default
> in beam.
>
> But I can't see how KafkaRecordCoder is translated in cross-language usage
> to python? I can't see any place in code where it gets registered.
>
> I just don't get how KafkaIO.Read works in cross-language. Could someone
> clarify me how does it work?
>
> Thanks in advance!
>


Re: [DISCUSS] ReadAll pattern and consistent use in IO connectors

2020-06-29 Thread Boyuan Zhang
It seems like most of us agree on the idea that ReadAll should read from
Read. I'm going to update the Kafka ReadAll with the same pattern.
Thanks for all your help!

On Fri, Jun 26, 2020 at 12:12 PM Chamikara Jayalath 
wrote:

>
>
> On Fri, Jun 26, 2020 at 11:49 AM Luke Cwik  wrote:
>
>> I would also like to suggest that transforms that implement ReadAll via
>> Read should also provide methods like:
>>
>> // Uses the specified values if unspecified in the input element from the
>> PCollection.
>> withDefaults(Read read);
>> // Uses the specified values regardless of what the input element from
>> the PCollection specifies.
>> withOverrides(Read read);
>>
>> and only adds methods that are required at construction time (e.g.
>> coders). This way the majority of documentation sits on the Read transform.
>>
>
> +0 from me. Sounds like benefits outweigh the drawbacks here and some of
> the drawbacks related to cross-language can be overcome through future
> advancements.
> Thanks for bringing this up Ismaël.
>
> - Cham
>
>
>>
>> On Fri, Jun 26, 2020 at 9:58 AM Luke Cwik  wrote:
>>
>>> Ismael, it is good to hear that using Read as the input didn't have a
>>> bunch of parameters that were being skipped/ignored. Also, for the
>>> polymorphism issue you have to rely on the user correctly telling you the
>>> type in such a way where it is a common ancestor of all the runtime types
>>> that will ever be used. This usually boils down to something like
>>> Serializable or DynamicMessage such that the coder that is chosen works for
>>> all the runtime types. Using multiple types is a valid use case and would
>>> allow for a simpler graph with less flattens merging the output from
>>> multiple sources.
>>>
>>> Boyuan, as you have mentioned we can have a coder for KafkaIO.Read which
>>> uses schemas even if some of the parameters can't be represented in a
>>> meaningful way beyond "bytes". This would be helpful for cross language as
>>> well since every parameter would become available if a language could
>>> support it (e.g. it could serialize a java function up front and keep it
>>> saved as raw bytes within said language). Even if we figure out a better
>>> way to do this in the future, we'll have to change the schema for the new
>>> way anyway. This would mean that the external version of the transform
>>> adopts Row to Read and we drop KafkaSourceDescriptor. The conversion from
>>> Row to Read could validate that the parameters make sense (e.g. the bytes
>>> are valid serialized functions). The addition of an
>>> endReadTime/endReadOffset would make sense for KafkaIO.Read as well and
>>> this would enable having a bounded version that could be used for backfills
>>> (this doesn't have to be done as part of any current ongoing PR).
>>> Essentially any parameter that could be added for a single instance of a
>>> Kafka element+restriction would also make sense to the KafkaIO.Read
>>> transform since it too is a single instance. There are parameters that
>>> would apply to the ReadAll that wouldn't apply to a read and these would be
>>> global parameters across all element+restriction pairs such as config
>>> overrides or default values.
>>>
>>> I am convinced that we should do as Ismael is suggesting and use
>>> KafkaIO.Read as the type.
>>>
>>>
>>> On Thu, Jun 25, 2020 at 6:00 PM Chamikara Jayalath 
>>> wrote:
>>>
>>>> Discussion regarding cross-language transforms is a slight tangent
>>>> here. But I think, in general, it's great if we can use existing transforms
>>>> (for example, IO connectors) as cross-language transforms without having to
>>>> build more composites (irrespective of whether in ExternalTransformBuilders
>>>> or a user pipelines) just to make them cross-language compatible. A future
>>>> cross-language compatible SchemaCoder might help (assuming that works for
>>>> Read transform) but I'm not sure we have a good idea when we'll get to that
>>>> state.
>>>>
>>>> Thanks,
>>>> Cham
>>>>
>>>> On Thu, Jun 25, 2020 at 3:13 PM Boyuan Zhang 
>>>> wrote:
>>>>
>>>>> For unbounded SDF in Kafka, we also consider the upgrading/downgrading
>>>>> compatibility in the pipeline update scenario(For detailed discussion,
>>>>> please refer to
>>>>> https://lists.apache.org/thread.html/raf073b8741

Re: [DISCUSS] ReadAll pattern and consistent use in IO connectors

2020-06-25 Thread Boyuan Zhang
 case
> we have not explored so far. I think one easy way to see the limitations
> would
> be in the ongoing KafkaIO SDF based implementation to try to map
> KafkaSourceDescriptor to do the extra PCollection and the Read logic
> on
> the ReadAll with the SDF to see which constraints we hit, the polymorphic
> ones
> will be there for sure, maybe others will appear (not sure). However it
> would be
> interesting to see if we have a real gain in the maintenance points, but
> well
> let’s not forget also that KafkaIO has a LOT of knobs so probably the
> generic
> implementation could be relatively complex.
>
>
>
> On Thu, Jun 25, 2020 at 6:30 PM Luke Cwik  wrote:
> >
> > I had mentioned that approach 1 and approach 2 work for cross language.
> The difference being that the cross language transform would take a well
> known definition and convert it to the Read transform. A normal user would
> have a pipeline that would look like:
> > 1: PCollection -> PTransform(ReadAll) -> PCollection
> > 2: PCollection -> PTransform(ReadAll) ->
> PCollection
> >
> > And in the cross language case this would look like:
> > 1: PCollection -> PTransform(Convert Row to
> Read) -> PCollection -> PTransform(ReadAll) -> PCollection
> > 2: PCollection -> PTransform(Convert Row to
> SourceDescriptor) -> PCollection -> PTransform(ReadAll)
> -> PCollection*
> > * note that PTransform(Convert Row to SourceDescriptor) only exists
> since we haven't solved how to use schemas with language bound types in a
> cross language way. SchemaCoder isn't portable but RowCoder is which is why
> the conversion step exists. We could have a solution for this at some point
> in time.
> >
> > My concern with using Read was around:
> > a) Do all properties set on a Read apply to the ReadAll? For example,
> the Kafka Read implementation allows you to set the key and value
> deserializers which are also used to dictate the output PCollection type.
> It also allows you to set how the watermark should be computed. Technically
> a user may want the watermark computation to be configurable per Read and
> they may also want an output type which is polymorphic (e.g.
> PCollection).
> > b) Read extends PTransform which brings its own object modelling
> concerns.
> >
> > During the implementations of ReadAll(PCollection), was it
> discovered that some properties became runtime errors or were ignored if
> they were set? If no, then the code deduplication is likely worth it
> because we also get a lot of javadoc deduplication, but if yes is this an
> acceptable user experience?
> >
> >
> > On Thu, Jun 25, 2020 at 7:55 AM Alexey Romanenko <
> aromanenko@gmail.com> wrote:
> >>
> >> I believe that the initial goal of unifying ReadAll as a general
> "PTransform, PCollection>” was to reduce the
> amount of code duplication and error-prone approach related to this. It
> makes much sense since usually we have all needed configuration set in Read
> objects and, as Ismaeil mentioned, ReadAll will consist mostly of only
> Split-Shuffle-Read stages.  So this case usually can be unified by using
> PCollection as input.
> >>
> >> On the other hand, we have another need to use Java IOs as
> cross-language transforms (as Luke described) which seems only partly in
> common with previous pattern of ReadAll using.
> >>
> >> I’d be more in favour to have only one concept of read configuration
> for all needs but seems it’s not easy and I’d be more in favour with Luke
> and Boyuan approach with schema. Though, maybe ReadAll is not a very
> suitable name in this case because it will can bring some confusions
> related to previous pattern of ReadAll uses.
> >>
> >> On 25 Jun 2020, at 05:00, Boyuan Zhang  wrote:
> >>
> >> Sorry for the typo. I mean I think we can go with (3) and (4): use the
> data type that is schema-aware as the input of ReadAll.
> >>
> >> On Wed, Jun 24, 2020 at 7:42 PM Boyuan Zhang 
> wrote:
> >>>
> >>> Thanks for the summary, Cham!
> >>>
> >>> I think we can go with (2) and (4): use the data type that is
> schema-aware as the input of ReadAll.
> >>>
> >>> Converting Read into ReadAll helps us to stick with SDF-like IO. But
> only having  (3) is not enough to solve the problem of using ReadAll in
> x-lang case.
> >>>
> >>> The key point of ReadAll is that the input type of ReadAll should be
> able to cross language boundaries and have compatibilities of
> updating/downgrading. After investigating some possibilities(pure java 

Re: [DISCUSS] ReadAll pattern and consistent use in IO connectors

2020-06-24 Thread Boyuan Zhang
Sorry for the typo. I mean I think we can go with *(3)* and (4): use the
data type that is schema-aware as the input of ReadAll.

On Wed, Jun 24, 2020 at 7:42 PM Boyuan Zhang  wrote:

> Thanks for the summary, Cham!
>
> I think we can go with (2) and (4): use the data type that is schema-aware
> as the input of ReadAll.
>
> Converting Read into ReadAll helps us to stick with SDF-like IO. But only
> having  (3) is not enough to solve the problem of using ReadAll in x-lang
> case.
>
> The key point of ReadAll is that the input type of ReadAll should be able
> to cross language boundaries and have compatibilities of
> updating/downgrading. After investigating some possibilities(pure java pojo
> with custom coder, protobuf, row/schema) in Kafka usage, we find that
> row/schema fits our needs most. Here comes (4). I believe that using Read
> as input of ReadAll makes sense in some cases, but I also think not all IOs
> have the same need. I would treat Read as a special type as long as the
> Read is schema-aware.
>
> On Wed, Jun 24, 2020 at 6:34 PM Chamikara Jayalath 
> wrote:
>
>> I see. So it seems like there are three options discussed so far when it
>> comes to defining source descriptors for ReadAll type transforms
>>
>> (1) Use Read PTransform as the element type of the input PCollection
>> (2) Use a POJO that describes the source as the data element of the input
>> PCollection
>> (3) Provide a converter as a function to the Read transform which
>> essentially will convert it to a ReadAll (what Eugene mentioned)
>>
>> I feel like (3) is more suitable for a related set of source descriptions
>> such as files.
>> (1) will allow most code-reuse but seems like will make it hard to use
>> the ReadAll transform as a cross-language transform and will break the
>> separation of construction time and runtime constructs
>> (2) could result to less code reuse if not careful but will make the
>> transform easier to be used as a cross-language transform without
>> additional modifications
>>
>> Also, with SDF, we can create ReadAll-like transforms that are more
>> efficient. So we might be able to just define all sources in that format
>> and make Read transforms just an easy to use composite built on top of that
>> (by adding a preceding Create transform).
>>
>> Thanks,
>> Cham
>>
>> On Wed, Jun 24, 2020 at 11:10 AM Luke Cwik  wrote:
>>
>>> I believe we do require PTransforms to be serializable since anonymous
>>> DoFns typically capture the enclosing PTransform.
>>>
>>> On Wed, Jun 24, 2020 at 10:52 AM Chamikara Jayalath <
>>> chamik...@google.com> wrote:
>>>
>>>> Seems like Read in PCollection refers to a transform, at least
>>>> here:
>>>> https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353
>>>>
>>>> I'm in favour of separating construction time transforms from execution
>>>> time data objects that we store in PCollections as Luke mentioned. Also, we
>>>> don't guarantee that PTransform is serializable so users have the
>>>> additional complexity of providing a corder whenever a PTransform is used
>>>> as a data object.
>>>> Also, agree with Boyuan that using simple Java objects that are
>>>> convertible to Beam Rows allow us to make these transforms available to
>>>> other SDKs through the cross-language transforms. Using transforms or
>>>> complex sources as data objects will probably make this difficult.
>>>>
>>>> Thanks,
>>>> Cham
>>>>
>>>>
>>>>
>>>> On Wed, Jun 24, 2020 at 10:32 AM Boyuan Zhang 
>>>> wrote:
>>>>
>>>>> Hi Ismael,
>>>>>
>>>>> I think the ReadAll in the IO connector refers to the IO with SDF
>>>>> implementation despite the type of input, where Read refers to
>>>>> UnboundedSource.  One major pushback of using KafkaIO.Read as source
>>>>> description is that not all configurations of KafkaIO.Read are meaningful
>>>>> to populate during execution time. Also when thinking about x-lang useage,
>>>>> making source description across language boundaries is also necessary.  
>>>>> As
>>>>> Luke mentioned, it's quite easy to infer a Schema from an AutoValue 
>>>>> object:
>>>>> KafkaSourceDescription.java
>>>>> <https://github.com/boyuanzz/beam/blob/kafka/sdks/java/io/kafka/src/main/ja

Re: [DISCUSS] ReadAll pattern and consistent use in IO connectors

2020-06-24 Thread Boyuan Zhang
Thanks for the summary, Cham!

I think we can go with (2) and (4): use the data type that is schema-aware
as the input of ReadAll.

Converting Read into ReadAll helps us to stick with SDF-like IO. But only
having  (3) is not enough to solve the problem of using ReadAll in x-lang
case.

The key point of ReadAll is that the input type of ReadAll should be able
to cross language boundaries and have compatibilities of
updating/downgrading. After investigating some possibilities(pure java pojo
with custom coder, protobuf, row/schema) in Kafka usage, we find that
row/schema fits our needs most. Here comes (4). I believe that using Read
as input of ReadAll makes sense in some cases, but I also think not all IOs
have the same need. I would treat Read as a special type as long as the
Read is schema-aware.

On Wed, Jun 24, 2020 at 6:34 PM Chamikara Jayalath 
wrote:

> I see. So it seems like there are three options discussed so far when it
> comes to defining source descriptors for ReadAll type transforms
>
> (1) Use Read PTransform as the element type of the input PCollection
> (2) Use a POJO that describes the source as the data element of the input
> PCollection
> (3) Provide a converter as a function to the Read transform which
> essentially will convert it to a ReadAll (what Eugene mentioned)
>
> I feel like (3) is more suitable for a related set of source descriptions
> such as files.
> (1) will allow most code-reuse but seems like will make it hard to use the
> ReadAll transform as a cross-language transform and will break the
> separation of construction time and runtime constructs
> (2) could result to less code reuse if not careful but will make the
> transform easier to be used as a cross-language transform without
> additional modifications
>
> Also, with SDF, we can create ReadAll-like transforms that are more
> efficient. So we might be able to just define all sources in that format
> and make Read transforms just an easy to use composite built on top of that
> (by adding a preceding Create transform).
>
> Thanks,
> Cham
>
> On Wed, Jun 24, 2020 at 11:10 AM Luke Cwik  wrote:
>
>> I believe we do require PTransforms to be serializable since anonymous
>> DoFns typically capture the enclosing PTransform.
>>
>> On Wed, Jun 24, 2020 at 10:52 AM Chamikara Jayalath 
>> wrote:
>>
>>> Seems like Read in PCollection refers to a transform, at least
>>> here:
>>> https://github.com/apache/beam/blob/master/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java#L353
>>>
>>> I'm in favour of separating construction time transforms from execution
>>> time data objects that we store in PCollections as Luke mentioned. Also, we
>>> don't guarantee that PTransform is serializable so users have the
>>> additional complexity of providing a corder whenever a PTransform is used
>>> as a data object.
>>> Also, agree with Boyuan that using simple Java objects that are
>>> convertible to Beam Rows allow us to make these transforms available to
>>> other SDKs through the cross-language transforms. Using transforms or
>>> complex sources as data objects will probably make this difficult.
>>>
>>> Thanks,
>>> Cham
>>>
>>>
>>>
>>> On Wed, Jun 24, 2020 at 10:32 AM Boyuan Zhang 
>>> wrote:
>>>
>>>> Hi Ismael,
>>>>
>>>> I think the ReadAll in the IO connector refers to the IO with SDF
>>>> implementation despite the type of input, where Read refers to
>>>> UnboundedSource.  One major pushback of using KafkaIO.Read as source
>>>> description is that not all configurations of KafkaIO.Read are meaningful
>>>> to populate during execution time. Also when thinking about x-lang useage,
>>>> making source description across language boundaries is also necessary.  As
>>>> Luke mentioned, it's quite easy to infer a Schema from an AutoValue object:
>>>> KafkaSourceDescription.java
>>>> <https://github.com/boyuanzz/beam/blob/kafka/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescription.java#L41>.
>>>> Then the coder of this schema-aware object will be a SchemaCoder
>>>> <https://github.com/boyuanzz/beam/blob/kafka/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSourceDescription.java#L84>.
>>>> When crossing language boundaries, it's also easy to convert a Row into the
>>>> source description: Convert.fromRows
>>>> <https://github.com/boyuanzz/beam/blob/kafka/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1480>
>>>> 

Re: [DISCUSS] ReadAll pattern and consistent use in IO connectors

2020-06-24 Thread Boyuan Zhang
Hi Ismael,

I think the ReadAll in the IO connector refers to the IO with SDF
implementation despite the type of input, where Read refers to
UnboundedSource.  One major pushback of using KafkaIO.Read as source
description is that not all configurations of KafkaIO.Read are meaningful
to populate during execution time. Also when thinking about x-lang useage,
making source description across language boundaries is also necessary.  As
Luke mentioned, it's quite easy to infer a Schema from an AutoValue object:
KafkaSourceDescription.java
.
Then the coder of this schema-aware object will be a SchemaCoder
.
When crossing language boundaries, it's also easy to convert a Row into the
source description: Convert.fromRows

.


On Wed, Jun 24, 2020 at 9:51 AM Luke Cwik  wrote:

> To provide additional context, the KafkaIO ReadAll transform takes a
> PCollection. This KafkaSourceDescriptor is a POJO
> that contains the configurable parameters for reading from Kafka. This is
> different from the pattern that Ismael listed because they take
> PCollection as input and the Read is the same as the Read PTransform
> class used for the non read all case.
>
> The KafkaSourceDescriptor does lead to duplication since parameters used
> to configure the transform have to be copied over to the source descriptor
> but decouples how a transform is specified from the object that describes
> what needs to be done. I believe Ismael's point is that we wouldn't need
> such a decoupling.
>
> Another area that hasn't been discussed and I believe is a non-issue is
> that the Beam Java SDK has the most IO connectors and we would want to use
> the IO implementations within Beam Go and Beam Python. This brings in its
> own set of issues related to versioning and compatibility for the wire
> format and how one parameterizes such transforms. The wire format issue can
> be solved with either approach by making sure that the cross language
> expansion always takes the well known format (whatever it may be) and
> converts it into Read/KafkaSourceDescriptor/... object that is then passed
> to the ReadAll transform. Boyuan has been looking to make the
> KafkaSourceDescriptor have a schema so it can be represented as a row and
> this can be done easily using the AutoValue integration (I don't believe
> there is anything preventing someone from writing a schema row -> Read ->
> row adapter or also using the AutoValue configuration if the transform is
> also an AutoValue).
>
> I would be more for the code duplication and separation of concerns
> provided by using a different object to represent the contents of the
> PCollection from the pipeline construction time PTransform.
>
> On Wed, Jun 24, 2020 at 9:09 AM Eugene Kirpichov 
> wrote:
>
>> Hi Ismael,
>>
>> Thanks for taking this on. Have you considered an approach similar (or
>> dual) to FileIO.write(), where we in a sense also have to configure a
>> dynamic number different IO transforms of the same type (file writes)?
>>
>> E.g. how in this example we configure many aspects of many file writes:
>>
>> transactions.apply(FileIO.writeDynamic()
>>  .by(Transaction::getType)
>>  .via(tx -> tx.getType().toFields(tx),  // Convert the data to be
>> written to CSVSink
>>   type -> new CSVSink(type.getFieldNames()))
>>  .to(".../path/to/")
>>  .withNaming(type -> defaultNaming(type + "-transactions", ".csv"));
>>
>> we could do something similar for many JdbcIO reads:
>>
>> PCollection bars;  // user-specific type from which all the read
>> parameters can be inferred
>> PCollection moos = bars.apply(JdbcIO.readAll()
>>   .fromQuery(bar -> ...compute query for this bar...)
>>   .withMapper((bar, resultSet) -> new Moo(...))
>>   .withBatchSize(bar -> ...compute batch size for this bar...)
>>   ...etc);
>>
>>
>> On Wed, Jun 24, 2020 at 6:53 AM Ismaël Mejía  wrote:
>>
>>> Hello,
>>>
>>> (my excuses for the long email but this requires context)
>>>
>>> As part of the move from Source based IOs to DoFn based ones. One pattern
>>> emerged due to the composable nature of DoFn. The idea is to have a
>>> different
>>> kind of composable reads where we take a PCollection of different sorts
>>> of
>>> intermediate specifications e.g. tables, queries, etc, for example:
>>>
>>> JdbcIO:
>>> ReadAll extends
>>> PTransform, PCollection>
>>>
>>> RedisIO:
>>> ReadAll extends PTransform, PCollection>> String>>>
>>>
>>> HBaseIO:
>>> ReadAll extends PTransform, PCollection>
>>>
>>> These patterns enabled richer use cases like doing multiple queries in
>>> the same
>>> Pipeline, querying based on key patterns or 

Re: Python Cross-language wrappers for Java IOs

2020-06-15 Thread Boyuan Zhang
The change should be schema change, mostly adding new fields.

On Mon, Jun 15, 2020 at 11:32 AM Brian Hulette  wrote:

>
>
> On Mon, Jun 15, 2020 at 11:12 AM Robert Bradshaw 
> wrote:
>
>> On Fri, Jun 12, 2020 at 4:12 PM Brian Hulette 
>> wrote:
>>
>>> > are unknown fields propagated through if the user only reads/modifies
>>> a row?
>>> I'm not sure I understand this question. Are you asking about handling
>>> schema changes?
>>> The wire format includes the number of fields in the schema,
>>> specifically so that we can detect when the schema changes. This is
>>> restricted to added or removed fields at the end of the schema. i.e. if we
>>> receive an element that says it has N more fields than the schema this
>>> coder was created with we assume the pipeline was updated with a schema
>>> that drops the last N fields and ignore the extra fields. Similarly if we
>>> receive an element with N fewer fields than we expect we'll just fill the
>>> last N fields with nulls.
>>> This logic is implemented in Python [1] and Java [2], but it's not
>>> exercised since no runners actually support pipeline update with schema
>>> changes.
>>>
>>> > how does it work in a pipeline update scenario (downgrade / upgrade)?
>>> It's a standard coder with a defined spec [3] and tests in
>>> standard_coders.yaml [4] (although we could certainly use more coverage
>>> there) so I think pipeline update should work fine, unless I'm missing
>>> something.
>>>
>>
>> The big question is whether the pipeline update will be rejected due to
>> the Coder having "changed."
>>
>>
>
> Do you mean changed because the schema has changed, or due to the vagaries
> of Java serialization?
>
>
>> Brian
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/coders/row_coder.py#L177-L189
>>> [2]
>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/RowCoderGenerator.java#L341-L356
>>> [3]
>>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L833-L864
>>> [4]
>>> https://github.com/apache/beam/blob/master/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml#L344-L364
>>>
>>> On Fri, Jun 12, 2020 at 3:32 PM Luke Cwik  wrote:
>>>
>>>> +Boyuan Zhang 
>>>>
>>>> On Fri, Jun 12, 2020 at 3:32 PM Luke Cwik  wrote:
>>>>
>>>>> What is the update / compat story around schemas?
>>>>> * are unknown fields propagated through if the user only
>>>>> reads/modifies a row?
>>>>> * how does it work in a pipeline update scenario (downgrade / upgrade)?
>>>>>
>>>>> Boyuan has been working on a Kafka via SDF source and have been trying
>>>>> to figure out which interchange format to use for the "source descriptors"
>>>>> that feed into the SDF. Some obvious choices are json, avro, proto, and
>>>>> Beam schemas all with their caveats.
>>>>>
>>>>> On Fri, Jun 12, 2020 at 1:32 PM Brian Hulette 
>>>>> wrote:
>>>>>
>>>>>> Thanks! I see there are jiras for SpannerIO and JdbcIO as part of
>>>>>> that. Are you planning on using row coder for them?
>>>>>> If so I want to make sure you're aware of
>>>>>> https://s.apache.org/beam-schema-io (sent to the dev list last week
>>>>>> [1]). +Scott Lukas  will be working on building
>>>>>> out the ideas there this summer. His work could be useful for making 
>>>>>> these
>>>>>> IOs cross-language (and you would get a mapping to SQL out of it without
>>>>>> much more effort).
>>>>>>
>>>>>> Brian
>>>>>>
>>>>>> [1]
>>>>>> https://lists.apache.org/thread.html/rc1695025d41c5dc38cdf7bc32bea0e7421379b1c543c2d82f69aa179%40%3Cdev.beam.apache.org%3E
>>>>>>
>>>>>> On Tue, Jun 2, 2020 at 9:30 AM Piotr Szuberski <
>>>>>> piotr.szuber...@polidea.com> wrote:
>>>>>>
>>>>>>> Sure, I'll do that
>>>>>>>
>>>>>>> On 2020/05/28 17:54:49, Chamikara Jayalath 
>>>>>>> wrote:
>>>>>>> > Great. Thanks for working on this. Can you please add these tasks
>>>>>>> and JIRAs
>>>>>>> > to the cross-language transforms roadmap under "Connector/transform
>>>>>>> > support".
>>>>>>> > https://beam.apache.org/roadmap/connectors-multi-sdk/
>>>>>>> >
>>>>>>> > Happy to help if you run into any issues during this task.
>>>>>>> >
>>>>>>> > <https://beam.apache.org/roadmap/connectors-multi-sdk/>Thanks,
>>>>>>> > Cham
>>>>>>> >
>>>>>>> > On Thu, May 28, 2020 at 9:59 AM Piotr Szuberski <
>>>>>>> piotr.szuber...@polidea.com>
>>>>>>> > wrote:
>>>>>>> >
>>>>>>> > > I added to Jira task of creating cross-language wrappers for
>>>>>>> Java IOs. It
>>>>>>> > > will soon be in progress.
>>>>>>> > >
>>>>>>> >
>>>>>>>
>>>>>>


Re: Python Cross-language wrappers for Java IOs

2020-06-15 Thread Boyuan Zhang
Thanks Cham. Standard coder is a good point. Does it mean non-standard
coder doesn't work when crossing language boundaries even if it is
implemented in both Java and Python sdk?

On Mon, Jun 15, 2020 at 10:08 AM Chamikara Jayalath 
wrote:

> Thanks. +1 for using RowCoder. We should try to use standard coders [1] in
> the x-lang SDK boundaries.
> If we use other coders (for example, ProtoCoder) it may or may not work
> depending on how various runners implement support for x-lang.
>
> This might require slightly updating existing transforms or adding
> additional conversion transforms to cross-language builders.
>
> [1]
> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L669
>
> Thanks,
> Cham
>
> On Mon, Jun 15, 2020 at 10:00 AM Piotr Szuberski <
> piotr.szuber...@polidea.com> wrote:
>
>> Right now I'm working on JdbcIO and I'm using Row and Schema protobuffs.
>> I'm figuring out how to use them properly. Thanks for the article - for
>> sure it will be helpful!
>>
>> On 2020/06/12 20:32:16, Brian Hulette  wrote:
>> > Thanks! I see there are jiras for SpannerIO and JdbcIO as part of that.
>> Are
>> > you planning on using row coder for them?
>> > If so I want to make sure you're aware of
>> > https://s.apache.org/beam-schema-io (sent to the dev list last week
>> > [1]). +Scott
>> > Lukas  will be working on building out the ideas
>> there
>> > this summer. His work could be useful for making these IOs
>> cross-language
>> > (and you would get a mapping to SQL out of it without much more effort).
>> >
>> > Brian
>> >
>> > [1]
>> >
>> https://lists.apache.org/thread.html/rc1695025d41c5dc38cdf7bc32bea0e7421379b1c543c2d82f69aa179%40%3Cdev.beam.apache.org%3E
>> >
>> > On Tue, Jun 2, 2020 at 9:30 AM Piotr Szuberski <
>> piotr.szuber...@polidea.com>
>> > wrote:
>> >
>> > > Sure, I'll do that
>> > >
>> > > On 2020/05/28 17:54:49, Chamikara Jayalath 
>> wrote:
>> > > > Great. Thanks for working on this. Can you please add these tasks
>> and
>> > > JIRAs
>> > > > to the cross-language transforms roadmap under "Connector/transform
>> > > > support".
>> > > > https://beam.apache.org/roadmap/connectors-multi-sdk/
>> > > >
>> > > > Happy to help if you run into any issues during this task.
>> > > >
>> > > > Thanks,
>> > > > Cham
>> > > >
>> > > > On Thu, May 28, 2020 at 9:59 AM Piotr Szuberski <
>> > > piotr.szuber...@polidea.com>
>> > > > wrote:
>> > > >
>> > > > > I added to Jira task of creating cross-language wrappers for Java
>> IOs.
>> > > It
>> > > > > will soon be in progress.
>> > > > >
>> > > >
>> > >
>> >
>>
>


Re: [Discuss] Build Kafka read transform on top of SplittableDoFn

2020-06-15 Thread Boyuan Zhang
Thanks Pablo!
Hi Pedro, as Pablo mentioned, the core PTransform is ReadViaSDF, and the
core DoFn is ReadFromKafkaDoFn. We also have some other IOs in SDF: HBaseIO
<https://github.com/apache/beam/blob/52419e93ee9fa8c823eb505c472969fc7849e247/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseReadSplittableDoFn.java#L38>,
CassandraIO <https://github.com/apache/beam/pull/10546>. Hope this helps : )

On Mon, Jun 15, 2020 at 10:05 AM Pablo Estrada  wrote:

> Hi Pedro,
> Boyuan shared her prototype implementation in [1]. If you're coding a
> SplittableDoFn, I'd guess the relevant piece of code is ReadViaSDF.java
> Best
> -P.
> [1] https://github.com/apache/beam/pull/11749/files
>
> On Mon, Jun 15, 2020 at 10:00 AM Pedro H S Teixeira 
> wrote:
>
>> Hi Boyuan,
>>
>> Is the implementation (even if incomplete) open source / available at
>> this moment?
>>
>> Trying to implement here an IO to a custom source here using
>> SplittableDoFn, and it would be helpful to see more examples :)
>>
>> Thanks,
>> Pedro
>>
>>
>> On 2020/05/29 02:16:49, Boyuan Zhang  wrote:
>> > Hi team,
>> >
>> > I'm Boyuan, currently working on building a Kafka read PTransform on
>> top of
>> > SplittableDoFn[1][2][3]. There are two questions about Kafka usage I
>> want
>> > to discuss with you:
>> >
>> > 1.  Compared to the KafkaIO.Read
>> > <
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L351
>> >,
>> > the SplittableDoFn Kafka version allows taking TopicPartition and
>> > startReadTime as elements and processing them during execution time,
>> > instead of configuring topics at pipeline construction time. I'm
>> wondering
>> > whether there are other configurations we also want to populate during
>> > pipeline execution time instead of construction time. Taking these
>> > configurations as elements would make value when they could be different
>> > for different TopicPartition. For a list of configurations we have now,
>> > please refer to KafkaIO.Read
>> > <
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L351
>> >
>> > .
>> >
>> > 2. I also want to offer a simple way for KafkaIO.Read to expand with the
>> > SDF version PTransform. Almost all configurations can be translated
>> easily
>> > from KafkaIO.Read to the SDF version read except custom
>> > TimestampPolicyFactory (It's easy to translate build-in default types
>> such
>> > as withProcessingTime
>> > <
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L710
>> >,
>> > withCreateTime
>> > <
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L726
>> >
>> > and withLogAppendTime
>> > <
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L699
>> >.).
>> > With SplittableDoFn, we have WatermarkEstimator
>> > <
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/WatermarkEstimator.java
>> >
>> > to track watermark per TopicPartition. Thus, instead of
>> > TimestampPolicyFactory
>> > <
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.java
>> >
>> > ,
>> > we need the user to provide a function which can extract output
>> timestamp
>> > from a KafkaRecord(like withTimestampFn
>> > <
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L780
>> >).
>> > My question here is, are the default types enough for current Kafka.Read
>> > users? If the custom TimestampPolicy is really in common? Is it okay to
>> use
>> > current API withTimestampFn
>> > <
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L780
>> >
>> > in
>> > KafkaIO.Read to accept the custom function and populate it to the SDF
>> read
>> > transform?
>> >
>> > Thanks for your help!
>> >
>> > [1] https://beam.apache.org/blog/splittable-do-fn/
>> > [2] https://s.apache.org/splittable-do-fn
>> > [3] My prototype PR https://github.com/apache/beam/pull/11749
>> >
>>
>


  1   2   >