Re: [DISCUSS] Avro dependency update, design doc

2023-01-02 Thread Reuven Lax via dev
Be very careful with the auto schema stuff around Avro. These classes
dynamically inspect Avro-generated classes (to codegen schema accessors) so
it will be easy to break this in a way that is not seen at compile time.
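
For example (an illustrative sketch only, using AvroCoder as one instance of such
runtime inspection; "MyAvroRecord" and its "id" field are hypothetical stand-ins
for an Avro-generated SpecificRecord class), a small coder round-trip test forces
that reflective inspection in CI, so an Avro bump that breaks it fails there
rather than in user pipelines. The same idea applies to Beam's schema inference
for Avro classes:

import static org.junit.Assert.assertEquals;

import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.util.CoderUtils;
import org.junit.Test;

public class AvroGeneratedClassSmokeTest {
  @Test
  public void avroCoderRoundTripsGeneratedClass() throws Exception {
    // AvroCoder.of() inspects the Avro-generated class and its schema at
    // runtime; if an Avro upgrade breaks that inspection, this fails in CI
    // instead of at pipeline execution time.
    AvroCoder<MyAvroRecord> coder = AvroCoder.of(MyAvroRecord.class);
    MyAvroRecord record = MyAvroRecord.newBuilder().setId("some-id").build();
    byte[] bytes = CoderUtils.encodeToByteArray(coder, record);
    assertEquals(record, CoderUtils.decodeFromByteArray(coder, bytes));
  }
}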

On Mon, Jan 2, 2023 at 12:17 PM Alexey Romanenko wrote:

> Here is the recent update on the progress for this topic.
>
> After receiving feedback on the design document [1] presented to the
> community earlier, and after several follow-up discussions (many thanks for
> this!), it was decided to take “*option 4*” (*“Move Avro from “core”
> to generic Avro extensions using multiple Avro version specific adapters to
> handle breaking changes”*) as the way to move forward.
>
> We created an umbrella issue to track the progress [2], and the *first
> step* (“*Create Avro extension for Java SDK*”) [3] is already
> finished and merged. This newly created extension (“
> *sdks/java/extensions/avro/*") replicates the same Avro support behaviour
> as is currently implemented in the Java SDK “*core*”. It required almost no
> changes to the current user API (only a relaxation of access modifiers for
> several class members and methods to make them accessible from other
> packages), so it should *not* introduce any breaking changes for
> users, especially if they still use the Avro version currently used by Beam
> (1.8.2).
>
> The *next step* will be to switch all Beam Java modules to use the new
> Avro extension instead of the “core” Avro classes. Again, we don’t
> expect any user API breaking changes from this step.
>
> *Note*: As the price for a smooth, non-breaking transition, we have to
> maintain two identical copies of Beam's Avro code (in “*core*" and in “
> *extensions/avro*”) until the old code is deprecated (expected to be
> the *third step*). So, until then, please apply your Java SDK
> Avro-related changes (if any) in both places to keep them in sync.
>
>
> Also, please share any feedback, questions, ideas or concerns on
> this topic.
>
>
> [1]
> https://docs.google.com/document/d/1tKIyTk_-HhkmVuJsxvWP5eTELESpCBe_Vmb1nJ3Ia34/edit?usp=sharing
> [2] https://github.com/apache/beam/issues/24292
> [3] https://github.com/apache/beam/issues/24293
>
> —
> Alexey
>
>
>
> On 18 Nov 2022, at 15:56, Alexey Romanenko wrote:
>
> Since there are no principal objections to the proposed option 2
> (extract the Avro-related code from “core” into an Avro extension but keep it
> in “core” for some time as a transition period), we will try to
> move forward and take this path.
>
> I’m pretty sure that we will face some hidden issues while working on
> this, so I’ll keep you posted =)
>
> —
> Alexey
>
> On 11 Nov 2022, at 18:05, Austin Bennett  wrote:
>
> @Moritz: I *think* it should be fine, and I don't have anything specific to
> offer on what might go wrong throughout the process.  :-) :shrug:
>
>
>
> On Fri, Nov 11, 2022 at 2:07 AM Moritz Mack  wrote:
>
>> Thanks a lot for the feedback so far! I can only second Alexey. It was
>> painful to come to realize that the only feasible option seems to be
>> copying a lot of code during the transition phase.
>>
>> For that reason, it will be critical to be disciplined about removing
>> the to-be-deprecated code in core and to agree, ahead of time, on when to
>> remove it. Any thoughts on how long the transition phase should be?
>>
>>
>>
>>  *I am concerned of what could go wrong for users in the
>> in-between/transition state while more slowly transitioning avro to
>> extension.*
>>
>>
>>
>> @Austin Do you have any specific concern in mind here?
>>
>> To minimize this risk, we propose that all APIs be kept as-is to
>> make the migration as easy as possible, and to kick off with the Avro version
>> used in core. The only thing that will change is the package names.
>>
>>
>>
>> / Moritz
>>
>>
>>
>> On 10.11.22, 22:46, "Kenneth Knowles"  wrote:
>>
>>
>>
>> Thank you for writing this document. It really helps to understand the
>> options. I agree that option 2 (make a new extension and deprecate from
>> core) seems best. I think +Reuven Lax might have the most context on any
>> technical issue we will encounter around schema codegen.
>>
>>
>>
>> Kenn
>>
>>
>>
>> On Thu, Nov 10, 2022 at 7:24 AM Alexey Romanenko <aromanenko@gmail.com> wrote:
>>
>> Personally, I think that keeping two mostly identical versions of the
>> Avro-related code in two different places (“core" and "extension") is rather
>> bad practice, especially if we need to fix some issues there -
>> though the risk is very low since this code is quite mature and it’s
>> not touched often. On the other hand, it should give users time
>> (several Beam releases) to update their code and use Avro from 

Re: [DISCUSS] Avro dependency update, design doc

2023-01-02 Thread Alexey Romanenko
Here is the recent update on the progress for this topic.

After receiving feedback on the design document [1] presented to the community 
earlier, and after several follow-up discussions (many thanks for this!), it was 
decided to take “option 4” (“Move Avro from “core” to generic Avro 
extensions using multiple Avro version specific adapters to handle breaking 
changes”) as the way to move forward. 

We created an umbrella issue to track the progress [2], and the first step 
(“Create Avro extension for Java SDK”) [3] is already finished and 
merged. This newly created extension (“sdks/java/extensions/avro/") replicates 
the same Avro support behaviour as is currently implemented in the Java SDK 
“core”. It required almost no changes to the current user API (only a relaxation 
of access modifiers for several class members and methods to make them 
accessible from other packages), so it should not introduce any breaking 
changes for users, especially if they still use the Avro version currently used 
by Beam (1.8.2). 
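
To illustrate (a sketch only, assuming the extension keeps the class names and 
mirrors the core package layout under org.apache.beam.sdk.extensions.avro), 
migrating user code later should mostly be a matter of changing imports:

// Before: Avro support classes from Java SDK "core".
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.AvroIO;

// After (assumed layout): the same classes from the new extension module.
// import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
// import org.apache.beam.sdk.extensions.avro.io.AvroIO;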

The next step will be to switch all Beam Java modules to use the new Avro 
extension instead of the “core” Avro classes. Again, we don’t expect any 
user API breaking changes from this step.

Note: As the price for a smooth, non-breaking transition, we have to maintain 
two identical copies of Beam's Avro code (in “core" and in “extensions/avro”) 
until the old code is deprecated (expected to be the third step). So, until 
then, please apply your Java SDK Avro-related changes (if any) in both places 
to keep them in sync.


Also, please share any feedback, questions, ideas or concerns on this 
topic.

 
[1] 
https://docs.google.com/document/d/1tKIyTk_-HhkmVuJsxvWP5eTELESpCBe_Vmb1nJ3Ia34/edit?usp=sharing
[2] https://github.com/apache/beam/issues/24292
[3] https://github.com/apache/beam/issues/24293

—
Alexey



> On 18 Nov 2022, at 15:56, Alexey Romanenko  wrote:
> 
> Since there are no principal objections to the proposed option 2 
> (extract the Avro-related code from “core” into an Avro extension but keep it 
> in “core” for some time as a transition period), we will try to move 
> forward and take this path. 
> 
> I’m pretty sure that we will face some hidden issues while working on this, 
> so I’ll keep you posted =)
> 
> —
> Alexey
> 
>> On 11 Nov 2022, at 18:05, Austin Bennett  wrote:
>> 
>> @Moritz: I *think* it should be fine, and I don't have anything specific to 
>> offer on what might go wrong throughout the process.  :-) :shrug:
>> 
>> 
>> 
>> On Fri, Nov 11, 2022 at 2:07 AM Moritz Mack wrote:
>>> Thanks a lot for the feedback so far! I can only second Alexey. It was 
>>> painful to come to realize that the only feasible option seems to be 
>>> copying a lot of code during the transition phase.
>>> 
>>> For that reason, it will be critical to be disciplined about removing 
>>> the to-be-deprecated code in core and to agree, ahead of time, on when to 
>>> remove it. Any thoughts on how long the transition phase should be?
>>> 
>>>  
>>> 
>>>  I am concerned of what could go wrong for users in the 
>>> in-between/transition state while more slowly transitioning avro to 
>>> extension.
>>> 
>>>  
>>> 
>>> @Austin Do you have any specific concern in mind here?
>>> 
>>> To minimize this risk, we propose that all APIs be kept as-is to 
>>> make the migration as easy as possible, and to kick off with the Avro version 
>>> used in core. The only thing that will change is the package names.
>>> 
>>>  
>>> 
>>> / Moritz
>>> 
>>>  
>>> 
>>> On 10.11.22, 22:46, "Kenneth Knowles" wrote:
>>>  
>>> 
>>> Thank you for writing this document. It really helps to understand the 
>>> options. I agree that option 2 (make a new extension and deprecate from 
>>> core) seems best. I think +Reuven Lax might have the most context on any 
>>> technical issue we will encounter around schema codegen.
>>> 
>>>  
>>> 
>>> Kenn
>>> 
>>>  
>>> 
>>> On Thu, Nov 10, 2022 at 7:24 AM Alexey Romanenko wrote:
>>> 
>>> Personally, I think that keeping two mostly identical versions of the 
>>> Avro-related code in two different places (“core" and "extension") is rather 
>>> bad practice, especially if we need to fix some issues there - 
>>> though the risk is very low since this code is quite mature and it’s 
>>> not touched often. On the other hand, it should give users time 
>>> (several Beam releases) to update their code and use Avro from the extension 
>>> artifact instead of core.
>>> 
>>>  
>>> 
>>> Though, if we accept that this breaking change at compile time is 
>>> allowable, 

Join streams with different frequencies

2023-01-02 Thread Ifat Afek (Nokia)
Hi,

We are trying to implement the following use case:
We have a stream of DataX events that arrive every 5 minutes and require some 
processing. Each event holds data for a specific non-unique ID (we keep getting 
updated data for each ID). There might be up to 1,000,000 IDs.
In addition, there is a stream of DataY events for some of these IDs that 
arrive at a variable frequency: an event could arrive after a minute and then 
again only after 5 hours.
We would like to join the current DataX and latest DataY events by ID (and 
process only IDs that have both DataX and DataY events).

We thought of holding a state of DataY events per ID in a global window, and 
then using it as a side input for filtering the DataX events stream. The state 
should hold the latest (by timestamp) DataY event that arrived.
The problem is: if we use discardingFiredPanes(), then each DataY event 
is fired only once and cannot be reused later on for filtering. On the other 
hand, if we use accumulatingFiredPanes(), then a list of all DataY events 
that ever arrived is fired.
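
To make the setup concrete, here is a minimal sketch (simplified, hypothetical 
Event/DataX/DataY/Joined types) of the per-ID "latest DataY" idea, expressed as 
a stateful DoFn over a single keyed stream (both streams flattened into 
KV<String, Event>) rather than as a side input:

import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

class JoinLatestDataYFn extends DoFn<KV<String, Event>, KV<String, Joined>> {
  // Per-ID state holding only the most recent DataY seen so far.
  @StateId("latestY")
  private final StateSpec<ValueState<DataY>> latestYSpec = StateSpecs.value();

  @ProcessElement
  public void process(
      @Element KV<String, Event> element,
      @StateId("latestY") ValueState<DataY> latestY,
      OutputReceiver<KV<String, Joined>> out) {
    Event event = element.getValue();
    if (event.isDataY()) {
      // A real version would compare timestamps before overwriting.
      latestY.write(event.asDataY());
    } else {
      DataY y = latestY.read();
      if (y != null) {
        // Emit only IDs that have seen both a DataX and a DataY.
        out.output(KV.of(element.getKey(), new Joined(event.asDataX(), y)));
      }
    }
  }
}

(Note: in the global window this state lives until it is explicitly cleared.)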

Are we missing something? What is the best practice for combining two streams, 
one of which arrives at a variable frequency?

Thanks,
Ifat



Re: SparkRunner - ensure SDF output does not need to fit in memory

2023-01-02 Thread Jan Lukavský
There are different translations of streaming and batch pipelines in 
SparkRunner; this thread was focused on the batch part, if I understand 
it correctly. Unbounded PCollections are not supported in batch Spark 
(by definition). I agree that fixing the splitting is a valid option, 
though it still requires an unnecessarily big heap for buffering and/or 
might induce some overhead from splitting the restriction. Not to 
mention that splitting is somewhat optional in the contract of SDF 
(the DoFn might not support it, if it is bounded), so it might not solve 
the issue for all SDFs. The source might not even be splittable at all 
(e.g. a completely compressed blob, without any blocks).
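
(For illustration, a minimal sketch of such a non-splittable case; 
"BlobRestriction" is a hypothetical restriction type, and the tracker simply 
declines every split request:)

import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;

class WholeBlobTracker extends RestrictionTracker<BlobRestriction, Void> {
  private final BlobRestriction restriction;
  private boolean claimed = false;

  WholeBlobTracker(BlobRestriction restriction) { this.restriction = restriction; }

  @Override
  public boolean tryClaim(Void position) {
    // The whole blob is claimed at once; any further claim fails.
    if (claimed) { return false; }
    claimed = true;
    return true;
  }

  @Override
  public BlobRestriction currentRestriction() { return restriction; }

  @Override
  public SplitResult<BlobRestriction> trySplit(double fractionOfRemainder) {
    // Declining to split is allowed by the SDF contract; the runner then
    // has to process the whole restriction in one go.
    return null;
  }

  @Override
  public void checkDone() throws IllegalStateException {
    if (!claimed) { throw new IllegalStateException("Blob was never processed"); }
  }

  @Override
  public IsBounded isBounded() { return IsBounded.BOUNDED; }
}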


 Jan

On 1/2/23 16:22, Daniel Collins via dev wrote:
If Spark's SDF solution doesn't support splitting, fixing that seems 
like the best solution to me. Splitting is the mechanism exposed by 
the model to actually limit the amount of data produced in a bundle. 
If unsupported, then unbounded-per-element SDFs wouldn't be supported 
at all.


-Daniel

On Mon, Jan 2, 2023 at 7:46 AM Jan Lukavský  wrote:

Hi Jozef,

I agree that this issue is most likely related to Spark because of the
way Spark uses a functional style for doing flatMap().

It could be fixed with the following two options:

 a) SparkRunner's SDF implementation does not use splitting - it
could be fixed so that the SDF is stopped after N elements
buffered via trySplit, buffer gets flushed and the restriction is
resumed

 b) alternatively use two threads and a BlockingQueue between
them, which is what you propose

The number of output elements per input element is bounded (we are
talking about the batch case anyway), but bounded does not mean it has
to fit in memory. Furthermore, unnecessary buffering of a large
number of elements is memory-inefficient, which is why I think
that the two-thread approach (b) should be the most efficient. The
option (a) seems orthogonal and might be implemented as well.

It raises the question of how to determine if the runner should do
some special translation of SDF in this case. There are probably
only these options:

 1) translate all SDFs to two-thread execution

 2) add a runtime flag that will turn the translation on (once
turned on, it will translate all SDFs) - this is the current proposal

 3) extend @DoFn.BoundedPerElement annotation with some kind of
(optional) hint - e.g.
@DoFn.BoundedPerElement(Bounded.POSSIBLY_HUGE), the default would
be Bounded.FITS_IN_MEMORY (which is the current approach)

The approach (3) seems to give more information to all runners and
might result in the ability to apply various optimizations for
multiple runners, so I'd say that this might be the ideal variant.

  Jan

On 12/29/22 13:07, Jozef Vilcek wrote:

I am surprised to hear that the Dataflow runner (which I never used)
would have this kind of limitation. I see that the
`OutputManager` interface is implemented to write to `Receiver`
[1] which follows the push model. Do you have a reference I can
take a look at to review the must-fit-in-memory limitation?

In Spark, the problem is that the leaf operator pulls data from
previous ones by consuming an `Iterator` of values. As per your
suggestion, this is not a problem with `sources` because they
hold e.g. a source file and can pull data as they are being
requested. This gets problematic exactly with SDF and flatMaps,
not sources. It could be one of the reasons why SDF performed
badly on Spark, where the community reported performance degradation
[2] and increased memory use [3]

My proposed solution is to, similarly to Dataflow, use a
`Receiver`-like implementation for DoFns which can output a large
number of elements. For now, this WIP targets SDFs only.

[1]

https://github.com/apache/beam/blob/v2.43.0/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java#L285
[2] https://github.com/apache/beam/pull/14755
[3]

https://issues.apache.org/jira/browse/BEAM-10670?focusedCommentId=17332005=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17332005



On Wed, Dec 28, 2022 at 8:52 PM Daniel Collins via dev
 wrote:

I believe that for dataflow runner, the result of
processElement must also fit in memory, so this is not just a
constraint for the spark runner.

The best approach at present might be to convert the source
from a flatMap to an SDF that reads out chunks of the file at
a time, and supports runner checkpointing (i.e. with a file
seek point to resume from) to chunk your data in a way that
doesn't 

Re: SparkRunner - ensure SDF output does not need to fit in memory

2023-01-02 Thread Daniel Collins via dev
If Spark's SDF solution doesn't support splitting, fixing that seems like
the best solution to me. Splitting is the mechanism exposed by the model to
actually limit the amount of data produced in a bundle. If unsupported,
then unbounded-per-element SDFs wouldn't be supported at all.

-Daniel

On Mon, Jan 2, 2023 at 7:46 AM Jan Lukavský  wrote:

> Hi Jozef,
>
> I agree that this issue is most likely related to Spark because of the way
> Spark uses a functional style for doing flatMap().
>
> It could be fixed with the following two options:
>
>  a) SparkRunner's SDF implementation does not use splitting - it could be
> fixed so that the SDF is stopped after N elements buffered via trySplit,
> buffer gets flushed and the restriction is resumed
>
>  b) alternatively use two threads and a BlockingQueue between them, which
> is what you propose
>
> The number of output elements per input element is bounded (we are talking
> about the batch case anyway), but bounded does not mean it has to fit in
> memory. Furthermore, unnecessary buffering of a large number of elements is
> memory-inefficient, which is why I think that the two-thread approach (b)
> should be the most efficient. The option (a) seems orthogonal and might be
> implemented as well.
>
> It raises the question of how to determine if the runner should do some
> special translation of SDF in this case. There are probably only these
> options:
>
>  1) translate all SDFs to two-thread execution
>
>  2) add a runtime flag that will turn the translation on (once turned on,
> it will translate all SDFs) - this is the current proposal
>
>  3) extend @DoFn.BoundedPerElement annotation with some kind of (optional)
> hint - e.g. @DoFn.BoundedPerElement(Bounded.POSSIBLY_HUGE), the default
> would be Bounded.FITS_IN_MEMORY (which is the current approach)
>
> The approach (3) seems to give more information to all runners and might
> result in the ability to apply various optimizations for multiple runners,
> so I'd say that this might be the ideal variant.
>
>   Jan
> On 12/29/22 13:07, Jozef Vilcek wrote:
>
> I am surprised to hear that the Dataflow runner (which I never used) would
> have this kind of limitation. I see that the `OutputManager` interface is
> implemented to write to `Receiver` [1] which follows the push model. Do you
> have a reference I can take a look at to review the must-fit-in-memory
> limitation?
>
> In Spark, the problem is that the leaf operator pulls data from previous
> ones by consuming an `Iterator` of values. As per your suggestion, this is
> not a problem with `sources` because they hold e.g. a source file and can
> pull data as they are being requested. This gets problematic exactly with
> SDF and flatMaps, not sources. It could be one of the reasons why SDF
> performed badly on Spark, where the community reported performance
> degradation [2] and increased memory use [3]
>
> My proposed solution is to, similarly to Dataflow, use a `Receiver`-like
> implementation for DoFns which can output a large number of elements. For
> now, this WIP targets SDFs only.
>
> [1]
> https://github.com/apache/beam/blob/v2.43.0/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java#L285
> [2] https://github.com/apache/beam/pull/14755
> [3]
> https://issues.apache.org/jira/browse/BEAM-10670?focusedCommentId=17332005=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17332005
>
> On Wed, Dec 28, 2022 at 8:52 PM Daniel Collins via dev <
> dev@beam.apache.org> wrote:
>
>> I believe that for dataflow runner, the result of processElement must
>> also fit in memory, so this is not just a constraint for the spark runner.
>>
>> The best approach at present might be to convert the source from a
>> flatMap to an SDF that reads out chunks of the file at a time, and supports
>> runner checkpointing (i.e. with a file seek point to resume from) to chunk
>> your data in a way that doesn't require the runner to support unbounded
>> outputs from any individual @ProcessElements downcall.
>>
>> -Daniel
>>
>> On Wed, Dec 28, 2022 at 1:36 PM Jozef Vilcek 
>> wrote:
>>
>>> Hello,
>>>
>>> I am working on an issue which currently limits the Spark runner by
>>> requiring the result of processElement to fit in memory [1]. This is
>>> problematic e.g. for flatMap where the input element is a file split and
>>> generates possibly large output.
>>>
>>> The intended fix is to add an option to have DoFn processing over the input
>>> in one thread and the consumption of outputs and forwarding them to
>>> downstream operators in another thread. One challenge for me is to identify
>>> which DoFn should be using this async approach.
>>>
>>> Here [2] is a commit which is WIP and uses async processing only for SDF
>>> naive expansion. I would like to get feedback on:
>>>
>>> 1) does the approach make sense overall
>>>
>>> 2) to target DoFns which need async processing (i.e. generate possibly
>>> large output) I am currently just checking if it is 

Re: SparkRunner - ensure SDF output does not need to fit in memory

2023-01-02 Thread Jan Lukavský

Hi Jozef,

I agree that this issue is most likely related to Spark because of the way 
Spark uses a functional style for doing flatMap().


It could be fixed with the following two options:

 a) SparkRunner's SDF implementation does not use splitting - it could 
be fixed so that the SDF is stopped after N elements buffered via 
trySplit, buffer gets flushed and the restriction is resumed


 b) alternatively use two threads and a BlockingQueue between them, 
which is what you propose


The number of output elements per input element is bounded (we are 
talking about the batch case anyway), but bounded does not mean it has to 
fit in memory. Furthermore, unnecessary buffering of a large number of 
elements is memory-inefficient, which is why I think that the two-thread 
approach (b) should be the most efficient. The option (a) seems 
orthogonal and might be implemented as well.
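
To sketch option (b) in code (hypothetical names, not the actual WIP code): the 
DoFn runs on a producer thread and pushes its outputs into a bounded queue, 
while the Spark-side iterator drains it on the consuming thread, so at most the 
queue capacity is ever buffered and a slow consumer back-pressures the producer 
instead of causing OOM:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class AsyncOutputBuffer<T> {
  private static final Object DONE = new Object();
  private final BlockingQueue<Object> queue;

  AsyncOutputBuffer(int capacity) {
    this.queue = new ArrayBlockingQueue<>(capacity);
  }

  // Processing thread: called for every output element; blocks when the
  // consumer is slower, which bounds memory instead of buffering everything.
  void emit(T element) throws InterruptedException {
    queue.put(element);
  }

  // Processing thread: called once the input element is fully processed.
  void finish() throws InterruptedException {
    queue.put(DONE);
  }

  // Consuming (Spark iterator) thread: returns null when the producer is done.
  @SuppressWarnings("unchecked")
  T take() throws InterruptedException {
    Object next = queue.take();
    return next == DONE ? null : (T) next;
  }
}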


It raises the question of how to determine if the runner should do some 
special translation of SDF in this case. There are probably only these 
options:


 1) translate all SDFs to two-thread execution

 2) add a runtime flag that will turn the translation on (once turned 
on, it will translate all SDFs) - this is the current proposal


 3) extend @DoFn.BoundedPerElement annotation with some kind of 
(optional) hint - e.g. @DoFn.BoundedPerElement(Bounded.POSSIBLY_HUGE), 
the default would be Bounded.FITS_IN_MEMORY (which is the current approach)


The approach (3) seems to give more information to all runners and might 
result in the ability to apply various optimizations for multiple 
runners, so I'd say that this might be the ideal variant.
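
For illustration only, the hint in option (3) might look roughly like this 
(this is the proposed shape, not existing Beam API: @BoundedPerElement 
currently takes no value, and Bounded, FileSplit and Record are hypothetical):

// Hypothetical extension of the existing marker annotation (option 3).
@DoFn.BoundedPerElement(Bounded.POSSIBLY_HUGE)
class ReadWholeSplitFn extends DoFn<FileSplit, Record> {
  @ProcessElement
  public void process(@Element FileSplit split, OutputReceiver<Record> out) {
    // May emit far more output per input element than fits in memory;
    // the hint would let a runner choose e.g. the two-thread translation.
  }
}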


  Jan

On 12/29/22 13:07, Jozef Vilcek wrote:
I am surprised to hear that the Dataflow runner (which I never used) 
would have this kind of limitation. I see that the `OutputManager` 
interface is implemented to write to `Receiver` [1] which follows the 
push model. Do you have a reference I can take a look at to review the 
must-fit-in-memory limitation?


In Spark, the problem is that the leaf operator pulls data from 
previous ones by consuming an `Iterator` of values. As per your 
suggestion, this is not a problem with `sources` because they hold 
e.g. a source file and can pull data as they are being requested. This 
gets problematic exactly with SDF and flatMaps, not sources. It 
could be one of the reasons why SDF performed badly on Spark, where 
the community reported performance degradation [2] and increased memory 
use [3]


My proposed solution is to, similarly to Dataflow, use a `Receiver`-like 
implementation for DoFns which can output a large number of elements. 
For now, this WIP targets SDFs only.


[1] 
https://github.com/apache/beam/blob/v2.43.0/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java#L285

[2] https://github.com/apache/beam/pull/14755
[3] 
https://issues.apache.org/jira/browse/BEAM-10670?focusedCommentId=17332005=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17332005 



On Wed, Dec 28, 2022 at 8:52 PM Daniel Collins via dev 
 wrote:


I believe that for dataflow runner, the result of processElement
must also fit in memory, so this is not just a constraint for the
spark runner.

The best approach at present might be to convert the source from a
flatMap to an SDF that reads out chunks of the file at a time, and
supports runner checkpointing (i.e. with a file seek point to
resume from) to chunk your data in a way that doesn't require the
runner to support unbounded outputs from any individual
@ProcessElements downcall.

-Daniel

On Wed, Dec 28, 2022 at 1:36 PM Jozef Vilcek
 wrote:

Hello,

I am working on an issue which currently limits the Spark runner
by requiring the result of processElement to fit in memory
[1]. This is problematic e.g. for flatMap where the input
element is a file split and generates possibly large output.

The intended fix is to add an option to have DoFn processing
over the input in one thread and the consumption of outputs and
forwarding them to downstream operators in another thread. One
challenge for me is to identify which DoFn should be using
this async approach.

Here [2] is a commit which is WIP and uses async processing
only for SDF naive expansion. I would like to get feedback on:

1) does the approach make sense overall

2) to target DoFns which need async processing (i.e. generate
possibly large output) I am currently just checking if it is
DoFn of SDF naive expansion type [3]. I failed to find a
better / more systematic approach for identifying which DoFn
should benefit from that. I would appreciate any thoughts on how
to make 

Beam High Priority Issue Report (41)

2023-01-02 Thread beamactions
This is your daily summary of Beam's current high priority issues that may need 
attention.

See https://beam.apache.org/contribute/issue-priorities for the meaning and 
expectations around issue priorities.

Unassigned P1 Issues:

https://github.com/apache/beam/issues/24787 [Failing Test]: 
sklearnInferenceTest is failing in Python 3.9 PostCommit
https://github.com/apache/beam/issues/24776 [Bug]: Race condition in Python SDK 
Harness ProcessBundleProgress
https://github.com/apache/beam/issues/24655 [Bug]: Pipeline fusion should break 
at @RequiresStableInput boundary
https://github.com/apache/beam/issues/24389 [Failing Test]: 
HadoopFormatIOElasticTest.classMethod ExceptionInInitializerError 
ContainerFetchException
https://github.com/apache/beam/issues/24367 [Bug]: workflow.tar.gz cannot be 
passed to flink runner
https://github.com/apache/beam/issues/24313 [Flaky]: 
apache_beam/runners/portability/portable_runner_test.py::PortableRunnerTestWithSubprocesses::test_pardo_state_with_custom_key_coder
https://github.com/apache/beam/issues/24267 [Failing Test]: Timeout waiting to 
lock gradle
https://github.com/apache/beam/issues/23944  beam_PreCommit_Python_Cron 
regularily failing - test_pardo_large_input flaky
https://github.com/apache/beam/issues/23709 [Flake]: Spark batch flakes in 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement and 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle
https://github.com/apache/beam/issues/22969 Discrepancy in behavior of 
`DoFn.process()` when `yield` is combined with `return` statement, or vice versa
https://github.com/apache/beam/issues/22961 [Bug]: WriteToBigQuery silently 
skips most of records without job fail
https://github.com/apache/beam/issues/22913 [Bug]: 
beam_PostCommit_Java_ValidatesRunner_Flink is flakes in 
org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState
https://github.com/apache/beam/issues/21713 404s in BigQueryIO don't get output 
to Failed Inserts PCollection
https://github.com/apache/beam/issues/21695 DataflowPipelineResult does not 
raise exception for unsuccessful states.
https://github.com/apache/beam/issues/21643 FnRunnerTest with non-trivial 
(order 1000 elements) numpy input flakes in non-cython environment
https://github.com/apache/beam/issues/21469 beam_PostCommit_XVR_Flink flaky: 
Connection refused
https://github.com/apache/beam/issues/21262 Python AfterAny, AfterAll do not 
follow spec
https://github.com/apache/beam/issues/21260 Python DirectRunner does not emit 
data at GC time
https://github.com/apache/beam/issues/21121 
apache_beam.examples.streaming_wordcount_it_test.StreamingWordCountIT.test_streaming_wordcount_it
 flakey
https://github.com/apache/beam/issues/21104 Flaky: 
apache_beam.runners.portability.fn_api_runner.fn_runner_test.FnApiRunnerTestWithGrpcAndMultiWorkers
https://github.com/apache/beam/issues/20976 
apache_beam.runners.portability.flink_runner_test.FlinkRunnerTestOptimized.test_flink_metrics
 is flaky
https://github.com/apache/beam/issues/20974 Python GHA PreCommits flake with 
grpc.FutureTimeoutError on SDK harness startup
https://github.com/apache/beam/issues/20689 Kafka commitOffsetsInFinalize OOM 
on Flink
https://github.com/apache/beam/issues/20108 Python direct runner doesn't emit 
empty pane when it should
https://github.com/apache/beam/issues/19814 Flink streaming flakes in 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful and 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful
https://github.com/apache/beam/issues/19241 Python Dataflow integration tests 
should export the pipeline Job ID and console output to Jenkins Test Result 
section


P1 Issues with no update in the last week:

https://github.com/apache/beam/issues/24464 [Epic]: Implement 
FileWriteSchemaTransformProvider
https://github.com/apache/beam/issues/23875 [Bug]: beam.Row.__eq__ returns true 
for unequal rows
https://github.com/apache/beam/issues/22605 [Bug]: Beam Python failure for 
dataflow_exercise_metrics_pipeline_test.ExerciseMetricsPipelineTest.test_metrics_it
https://github.com/apache/beam/issues/22115 [Bug]: 
apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses
 is flaky
https://github.com/apache/beam/issues/21714 
PulsarIOTest.testReadFromSimpleTopic is very flaky
https://github.com/apache/beam/issues/21708 beam_PostCommit_Java_DataflowV2, 
testBigQueryStorageWrite30MProto failing consistently
https://github.com/apache/beam/issues/21706 Flaky timeout in github Python unit 
test action 
StatefulDoFnOnDirectRunnerTest.test_dynamic_timer_clear_then_set_timer
https://github.com/apache/beam/issues/21700 
--dataflowServiceOptions=use_runner_v2 is broken
https://github.com/apache/beam/issues/21645 
beam_PostCommit_XVR_GoUsingJava_Dataflow fails on some test transforms
https://github.com/apache/beam/issues/21476 WriteToBigQuery Dynamic table 
destinations returns