[YAML] Declarative beam (aka YAML) coordination

2023-09-25 Thread Robert Bradshaw via dev
Given the interest in the YAML work by multiple parties, we put together
https://s.apache.org/beam-yaml-contribute to more easily coordinate on this
effort. Nothing that surprising--we're going to continue using the standard
lists, github, etc.--but it should help for folks who want to get started.

Feel free to respond here on the list or comment on the doc if you have any
questions.


Re: Runner Bundling Strategies

2023-09-25 Thread Jan Lukavský

Hi Kenn and Reuven,

I agree with all these points. The only issue here seems to be that 
FlinkRunner does not fulfill these constraints. This is a bug that can 
be fixed, though we need to change some defaults, as the 1000 ms default 
bundle "duration" can be too much for lower-traffic pipelines. We are 
also probably missing some @ValidatesRunner tests for this. I created 
[1] and [2] to track this.
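
To illustrate why a fixed bundle duration hurts low-traffic pipelines, here is a minimal sketch in plain Python. It is a toy model, not the FlinkRunner implementation: the function names and the rule "a bundle closes when it is full or when its time limit expires" are assumptions made for illustration only.

```python
def bundle_flush_times(arrivals_ms, max_size, max_time_ms):
    """Return, for each element, the time at which its bundle is finished.

    Assumed model: a bundle opens when its first element arrives and is
    finished when it holds max_size elements or when max_time_ms has
    elapsed since it opened, whichever comes first.
    """
    flush_times = []
    bundle = []  # arrival times of the elements in the currently open bundle
    for t in arrivals_ms:
        # Close an open bundle whose time limit expired before this arrival.
        if bundle and t >= bundle[0] + max_time_ms:
            flush_times.extend([bundle[0] + max_time_ms] * len(bundle))
            bundle = []
        bundle.append(t)
        # Close the bundle as soon as it is full.
        if len(bundle) == max_size:
            flush_times.extend([t] * len(bundle))
            bundle = []
    if bundle:
        # The last open bundle is closed by its timer.
        flush_times.extend([bundle[0] + max_time_ms] * len(bundle))
    return flush_times


def max_added_latency(arrivals_ms, max_size, max_time_ms):
    """Longest time any element waits between arrival and bundle finish."""
    flushes = bundle_flush_times(arrivals_ms, max_size, max_time_ms)
    return max(f - a for a, f in zip(arrivals_ms, flushes))


# One element every 5 seconds never fills a 1000-element bundle,
# so every element waits for the full time limit.
sparse = [0, 5000, 10000]
print(max_added_latency(sparse, max_size=1000, max_time_ms=1000))  # 1000
print(max_added_latency(sparse, max_size=1000, max_time_ms=100))   # 100
```

In this model the size limit is never reached under sparse traffic, so the time limit alone determines the added latency.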


One question still remains: the bundle vs. element life-cycle is 
relevant only for cases where processing of element X can affect 
processing of element Y later in the same bundle. Once this influence is 
ruled out (i.e. no caching), this information can result in runner 
optimization that yields better performance. Should we consider 
propagating this information from user code to the runner?


[1] https://github.com/apache/beam/issues/28649

[2] https://github.com/apache/beam/issues/28650

On 9/25/23 18:31, Reuven Lax via dev wrote:



On Mon, Sep 25, 2023 at 6:19 AM Jan Lukavský  wrote:


On 9/23/23 18:16, Reuven Lax via dev wrote:

Two separate things here:

1. Yes, a watermark can update in the middle of a bundle.
2. The records in the bundle themselves will prevent the
watermark from updating as they are still in flight until after
finish bundle. Therefore simply caching the records should always
be watermark safe, regardless of the runner. You will only run
into problems if you try and move timestamps "backwards" - which
is why Beam strongly discourages this.

This is not aligned with  FlinkRunner's implementation. And I
actually think it is not aligned conceptually.  As mentioned,
Flink does not have the concept of bundles at all. It achieves
fault tolerance via checkpointing, essentially checkpoint barrier
flowing from sources to sinks, safely snapshotting state of each
operator on the way. Bundles are implemented as a somewhat
arbitrary set of elements between two consecutive checkpoints
(there can be multiple bundles between checkpoints). A bundle is
'committed' (i.e. persistently stored and guaranteed not to retry)
only after the checkpoint barrier passes over the elements in the
bundle (every bundle is finished at the very latest exactly before
a checkpoint). But watermark propagation and bundle finalization
is completely unrelated. This might be a bug in the runner, but
requiring checkpoint for watermark propagation will introduce
insane delays between processing time and watermarks, every
executable stage will delay watermark propagation until a
checkpoint (which is typically the order of seconds). This delay
would add up after each stage.


It's not bundles that hold up processing, rather it is elements, and 
elements are not considered "processed" until FinishBundle.


You are right about Flink. In many cases this is fine - if Flink rolls 
back to the last checkpoint, the watermark will also roll back, and 
everything stays consistent. So in general, one does not need to wait 
for checkpoints for watermark propagation.


Where things get a bit weirder with Flink is whenever one has external 
side effects. In theory, one should wait for checkpoints before 
letting a Sink flush, otherwise one could end up with incorrect 
outputs (especially with a sink like TextIO). Flink itself recognizes 
this, and that's why they provide TwoPhaseCommitSinkFunction which 
waits for a checkpoint. In Beam, this is the reason we introduced 
RequiresStableInput. Of course in practice many Flink users don't do 
this - in which case they are prioritizing latency over data correctness.




Reuven

On Sat, Sep 23, 2023 at 12:03 AM Jan Lukavský 
wrote:

> Watermarks shouldn't be (visibly) advanced until
@FinishBundle is committed, as there's no guarantee that this
work won't be discarded.

There was a thread [1], where the conclusion seemed to be
that updating watermark is possible even in the middle of a
bundle. Actually, handling watermarks is runner-dependent
(e.g. Flink does not store watermarks in checkpoints, they
are always recomputed from scratch on restore).

[1]
https://lists.apache.org/thread/10db7l9bhnhmo484myps723sfxtjwwmv

On 9/22/23 21:47, Robert Bradshaw via dev wrote:

On Fri, Sep 22, 2023 at 10:58 AM Jan Lukavský
 wrote:

On 9/22/23 18:07, Robert Bradshaw via dev wrote:


On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev
 wrote:

I've actually wondered about this specifically for
streaming... if you're writing a pipeline there it
seems like you're often going to want to put high
fixed cost things like database connections even
  

Re: [LAZY CONSENSUS] Create separate repository for Swift SDK

2023-09-25 Thread Valentyn Tymofieiev via dev
On Mon, Sep 25, 2023 at 9:03 AM Kenneth Knowles  wrote:

> Hi all,
>
> I propose to unblock Byron's work by creating a new repository for the
> Beam Swift SDK. This will be the first of its kind, and break from
> tradition of having Beam be kind of a mini-mono-repo.
>
> Discussion of the Swift SDK and request for a separate repo is at
> https://lists.apache.org/thread/25tp4yoptqxzty8t4fqznqxc3cwklpss
>

Additional context (since there was a branching between dev and user
threads) is at
https://lists.apache.org/thread/pc0s0953z6z09z597h1rwdskk2y00hmo
From the first message: *the "Swift Way" would be to have it in its own
repo so that it can easily be used from the Swift Package Manager.*



> I have created this thread to clearly separate this one issue, and clearly
> record if we have consensus (or not).
>
> If no one has an objection or further discussion needed in 72 hours, it
> can be considered approved and I will create the repository. See
> https://community.apache.org/committers/lazyConsensus.html
>
> Kenn
>


Re: Runner Bundling Strategies

2023-09-25 Thread Reuven Lax via dev
On Mon, Sep 25, 2023 at 6:19 AM Jan Lukavský  wrote:

>
> On 9/23/23 18:16, Reuven Lax via dev wrote:
>
> Two separate things here:
>
> 1. Yes, a watermark can update in the middle of a bundle.
> 2. The records in the bundle themselves will prevent the watermark from
> updating as they are still in flight until after finish bundle. Therefore
> simply caching the records should always be watermark safe, regardless of
> the runner. You will only run into problems if you try and move timestamps
> "backwards" - which is why Beam strongly discourages this.
>
> This is not aligned with  FlinkRunner's implementation. And I actually
> think it is not aligned conceptually.  As mentioned, Flink does not have
> the concept of bundles at all. It achieves fault tolerance via
> checkpointing, essentially checkpoint barrier flowing from sources to
> sinks, safely snapshotting state of each operator on the way. Bundles are
> implemented as a somewhat arbitrary set of elements between two consecutive
> checkpoints (there can be multiple bundles between checkpoints). A bundle
> is 'committed' (i.e. persistently stored and guaranteed not to retry) only
> after the checkpoint barrier passes over the elements in the bundle (every
> bundle is finished at the very latest exactly before a checkpoint). But
> watermark propagation and bundle finalization is completely unrelated. This
> might be a bug in the runner, but requiring checkpoint for watermark
> propagation will introduce insane delays between processing time and
> watermarks, every executable stage will delay watermark propagation until a
> checkpoint (which is typically the order of seconds). This delay would add
> up after each stage.
>

It's not bundles that hold up processing, rather it is elements, and
elements are not considered "processed" until FinishBundle.

You are right about Flink. In many cases this is fine - if Flink rolls back
to the last checkpoint, the watermark will also roll back, and everything
stays consistent. So in general, one does not need to wait for checkpoints
for watermark propagation.

Where things get a bit weirder with Flink is whenever one has external side
effects. In theory, one should wait for checkpoints before letting a Sink
flush, otherwise one could end up with incorrect outputs (especially with a
sink like TextIO). Flink itself recognizes this, and that's why they
provide TwoPhaseCommitSinkFunction which
waits for a checkpoint. In Beam, this is the reason we introduced
RequiresStableInput. Of course in practice many Flink users don't do this -
in which case they are prioritizing latency over data correctness.
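
To make the trade-off concrete, here is a small plain-Python sketch of a sink that waits for a checkpoint before making output visible, next to one that flushes eagerly. It only illustrates the idea; it does not use Flink's or Beam's APIs, and the class and method names (TwoPhaseSink, EagerSink, checkpoint_complete, restore) are hypothetical.

```python
class TwoPhaseSink:
    """Toy sink that exposes output only after a checkpoint completes."""

    def __init__(self):
        self.pending = []    # written since the last checkpoint, not yet visible
        self.committed = []  # visible to external readers

    def write(self, record):
        self.pending.append(record)

    def checkpoint_complete(self):
        # Everything in `pending` is covered by the checkpoint and will not
        # be replayed, so it is safe to make it visible.
        self.committed.extend(self.pending)
        self.pending = []

    def restore(self):
        # Rolling back to the last checkpoint discards uncommitted output;
        # the replayed input will produce it again.
        self.pending = []


class EagerSink(TwoPhaseSink):
    """Flushes immediately: lower latency, but replayed records stay visible."""

    def write(self, record):
        self.committed.append(record)


def run_with_failure(sink):
    """Write a and b, checkpoint, write c, fail, replay c, checkpoint."""
    sink.write("a")
    sink.write("b")
    sink.checkpoint_complete()
    sink.write("c")   # processed after the checkpoint...
    sink.restore()    # ...then the job fails and rolls back
    sink.write("c")   # the input that produced "c" is replayed
    sink.checkpoint_complete()
    return sink.committed


print(run_with_failure(TwoPhaseSink()))  # ['a', 'b', 'c']
print(run_with_failure(EagerSink()))     # ['a', 'b', 'c', 'c']
```

The eager sink shows the duplicate that appears when output is flushed before the checkpoint that covers it.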

>
> Reuven
>
> On Sat, Sep 23, 2023 at 12:03 AM Jan Lukavský  wrote:
>
>> > Watermarks shouldn't be (visibly) advanced until @FinishBundle is
>> committed, as there's no guarantee that this work won't be discarded.
>>
>> There was a thread [1], where the conclusion seemed to be that updating
>> watermark is possible even in the middle of a bundle. Actually, handling
>> watermarks is runner-dependent (e.g. Flink does not store watermarks in
>> checkpoints, they are always recomputed from scratch on restore).
>>
>> [1] https://lists.apache.org/thread/10db7l9bhnhmo484myps723sfxtjwwmv
>> On 9/22/23 21:47, Robert Bradshaw via dev wrote:
>>
>> On Fri, Sep 22, 2023 at 10:58 AM Jan Lukavský  wrote:
>>
>>> On 9/22/23 18:07, Robert Bradshaw via dev wrote:
>>>
>>> On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev 
>>> wrote:
>>>
>>>> I've actually wondered about this specifically for streaming... if
>>>> you're writing a pipeline there it seems like you're often going to want to
>>>> put high fixed cost things like database connections even outside of the
>>>> bundle setup. You really only want to do that once in the lifetime of the
>>>> worker itself, not the bundle. Seems like having that boundary be somewhere
>>>> other than an arbitrarily (and probably small in streaming to avoid
>>>> latency) group of elements might be more useful? I suppose this depends
>>>> heavily on the object lifecycle in the sdk worker though.

>>>
>>> +1. This is the difference between @Setup and @StartBundle. The
>>> start/finish bundle operations should be used for bracketing element
>>> processing that must be committed as a unit for correct failure recovery
>>> (e.g. if elements are cached in ProcessElement, they should all be emitted
>>> in FinishBundle). On the other hand, things like open database connections
>>> can and likely should be shared across bundles.
>>>
>>> This is correct, but the caching between @StartBundle and @FinishBundle
>>> has some problems. First, users need to manually set watermark hold for
>>> min(timestamp in bundle), otherwise watermark might overtake the buffered
>>> elements.
>>>
>>
>> Watermarks shouldn't be (visibly) advanced until @FinishBundle is
>> committed, as there's 

[LAZY CONSENSUS] Create separate repository for Swift SDK

2023-09-25 Thread Kenneth Knowles
Hi all,

I propose to unblock Byron's work by creating a new repository for the Beam
Swift SDK. This will be the first of its kind, and break from tradition of
having Beam be kind of a mini-mono-repo.

Discussion of the Swift SDK and request for a separate repo is at
https://lists.apache.org/thread/25tp4yoptqxzty8t4fqznqxc3cwklpss

I have created this thread to clearly separate this one issue, and clearly
record if we have consensus (or not).

If no one has an objection or further discussion needed in 72 hours, it can
be considered approved and I will create the repository. See
https://community.apache.org/committers/lazyConsensus.html

Kenn


Re: Runner Bundling Strategies

2023-09-25 Thread Kenneth Knowles
These are some good points. Replies inline.

On Mon, Sep 25, 2023 at 9:19 AM Jan Lukavský  wrote:

>
> On 9/23/23 18:16, Reuven Lax via dev wrote:
>
> Two separate things here:
>
> 1. Yes, a watermark can update in the middle of a bundle.
> 2. The records in the bundle themselves will prevent the watermark from
> updating as they are still in flight until after finish bundle. Therefore
> simply caching the records should always be watermark safe, regardless of
> the runner. You will only run into problems if you try and move timestamps
> "backwards" - which is why Beam strongly discourages this.
>
> This is not aligned with  FlinkRunner's implementation. And I actually
> think it is not aligned conceptually.  As mentioned, Flink does not have
> the concept of bundles at all. It achieves fault tolerance via
> checkpointing, essentially checkpoint barrier flowing from sources to
> sinks, safely snapshotting state of each operator on the way.
>


> Bundles are implemented as a somewhat arbitrary set of elements between
> two consecutive checkpoints (there can be multiple bundles between
> checkpoints)
>

Yes, it is a good point. To align the runner: an input element is not
processed until it has been through @ProcessElement and @FinishBundle has
also been called. Until that happens, the input element is still
"in process" and would hold the watermark. This doesn't mean the watermark
is frozen; it only means it is constrained.
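
A minimal plain-Python sketch of "constrained, not frozen" follows. It is a toy model of a single stage under the assumption that the output watermark is the minimum of the input watermark and the timestamps of in-flight elements. The class and method names are hypothetical and this is not how any particular runner is implemented.

```python
class StageWatermark:
    """Toy model of one stage: in-flight elements constrain the output watermark."""

    def __init__(self):
        self.input_watermark = float("-inf")
        self.in_flight = []  # timestamps processed in a bundle that is still open

    def advance_input(self, watermark):
        # Input watermarks only move forward.
        self.input_watermark = max(self.input_watermark, watermark)

    def process(self, timestamp):
        # The element has been through ProcessElement, but its bundle is open.
        self.in_flight.append(timestamp)

    def finish_bundle(self):
        # Only now are the bundle's elements considered processed.
        self.in_flight.clear()

    def output_watermark(self):
        # May advance, but never past the oldest in-flight element.
        return min([self.input_watermark] + self.in_flight)


stage = StageWatermark()
stage.advance_input(10)
stage.process(12)
stage.process(15)
print(stage.output_watermark())  # 10
stage.advance_input(20)
print(stage.output_watermark())  # 12: advanced mid-bundle, held by the element at 12
stage.finish_bundle()
print(stage.output_watermark())  # 20
```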


> A bundle is 'committed' (i.e. persistently stored and guaranteed not to
> retry) only after the checkpoint barrier passes over the elements in the
> bundle (every bundle is finished at the very latest exactly before a
> checkpoint).
>

I think this is fine and does not have to be related to bundle processing
or watermarks. Since Flink does global consistency, any downstream work
that depended on the not-persisted results would also be reset back to the
checkpoint so it is fine.


> But watermark propagation and bundle finalization is completely unrelated.
> This might be a bug in the runner, but requiring checkpoint for watermark
> propagation will introduce insane delays between processing time and
> watermarks, every executable stage will delay watermark propagation until a
> checkpoint (which is typically the order of seconds). This delay would add
> up after each stage.
>

I am aware of this conflict. Interestingly, "requires stable input" is the
case where you must wait until checkpoint finalization, since inputs may
spontaneously change on retry before a checkpoint is finalized. This is not
just a mismatch in Beam/Flink but I believe Flink itself cannot correctly
process this kind of data without waiting for checkpoint finalization.

Kenn

>
> Reuven
>
> On Sat, Sep 23, 2023 at 12:03 AM Jan Lukavský  wrote:
>
>> > Watermarks shouldn't be (visibly) advanced until @FinishBundle is
>> committed, as there's no guarantee that this work won't be discarded.
>>
>> There was a thread [1], where the conclusion seemed to be that updating
>> watermark is possible even in the middle of a bundle. Actually, handling
>> watermarks is runner-dependent (e.g. Flink does not store watermarks in
>> checkpoints, they are always recomputed from scratch on restore).
>>
>> [1] https://lists.apache.org/thread/10db7l9bhnhmo484myps723sfxtjwwmv
>> On 9/22/23 21:47, Robert Bradshaw via dev wrote:
>>
>> On Fri, Sep 22, 2023 at 10:58 AM Jan Lukavský  wrote:
>>
>>> On 9/22/23 18:07, Robert Bradshaw via dev wrote:
>>>
>>> On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev 
>>> wrote:
>>>
>>>> I've actually wondered about this specifically for streaming... if
>>>> you're writing a pipeline there it seems like you're often going to want to
>>>> put high fixed cost things like database connections even outside of the
>>>> bundle setup. You really only want to do that once in the lifetime of the
>>>> worker itself, not the bundle. Seems like having that boundary be somewhere
>>>> other than an arbitrarily (and probably small in streaming to avoid
>>>> latency) group of elements might be more useful? I suppose this depends
>>>> heavily on the object lifecycle in the sdk worker though.

>>>
>>> +1. This is the difference between @Setup and @StartBundle. The
>>> start/finish bundle operations should be used for bracketing element
>>> processing that must be committed as a unit for correct failure recovery
>>> (e.g. if elements are cached in ProcessElement, they should all be emitted
>>> in FinishBundle). On the other hand, things like open database connections
>>> can and likely should be shared across bundles.
>>>
>>> This is correct, but the caching between @StartBundle and @FinishBundle
>>> has some problems. First, users need to manually set watermark hold for
>>> min(timestamp in bundle), otherwise watermark might overtake the buffered
>>> elements.
>>>
>>
>> Watermarks shouldn't be (visibly) advanced until @FinishBundle is
>> committed, as there's no guarantee that this work won't be discarded.
>>
>>
>>> Users 

Re: Runner Bundling Strategies

2023-09-25 Thread Jan Lukavský


On 9/23/23 18:16, Reuven Lax via dev wrote:

Two separate things here:

1. Yes, a watermark can update in the middle of a bundle.
2. The records in the bundle themselves will prevent the watermark 
from updating as they are still in flight until after finish bundle. 
Therefore simply caching the records should always be watermark safe, 
regardless of the runner. You will only run into problems if you try 
and move timestamps "backwards" - which is why Beam strongly 
discourages this.

This is not aligned with FlinkRunner's implementation. And I actually 
think it is not aligned conceptually.  As mentioned, Flink does not have 
the concept of bundles at all. It achieves fault tolerance via 
checkpointing, essentially checkpoint barrier flowing from sources to 
sinks, safely snapshotting state of each operator on the way. Bundles 
are implemented as a somewhat arbitrary set of elements between two 
consecutive checkpoints (there can be multiple bundles between 
checkpoints). A bundle is 'committed' (i.e. persistently stored and 
guaranteed not to retry) only after the checkpoint barrier passes over 
the elements in the bundle (every bundle is finished at the very latest 
exactly before a checkpoint). But watermark propagation and bundle 
finalization are completely unrelated. This might be a bug in the runner, 
but requiring a checkpoint for watermark propagation will introduce insane 
delays between processing time and watermarks: every executable stage 
will delay watermark propagation until a checkpoint (which is typically 
on the order of seconds). This delay would add up after each stage.


Reuven

On Sat, Sep 23, 2023 at 12:03 AM Jan Lukavský  wrote:

> Watermarks shouldn't be (visibly) advanced until @FinishBundle
is committed, as there's no guarantee that this work won't be
discarded.

There was a thread [1], where the conclusion seemed to be that
updating watermark is possible even in the middle of a bundle.
Actually, handling watermarks is runner-dependent (e.g. Flink does
not store watermarks in checkpoints, they are always recomputed
from scratch on restore).

[1] https://lists.apache.org/thread/10db7l9bhnhmo484myps723sfxtjwwmv

On 9/22/23 21:47, Robert Bradshaw via dev wrote:

On Fri, Sep 22, 2023 at 10:58 AM Jan Lukavský 
wrote:

On 9/22/23 18:07, Robert Bradshaw via dev wrote:


On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev
 wrote:

I've actually wondered about this specifically for
streaming... if you're writing a pipeline there it seems
like you're often going to want to put high fixed cost
things like database connections even outside of the
bundle setup. You really only want to do that once in
the lifetime of the worker itself, not the bundle. Seems
like having that boundary be somewhere other than an
arbitrarily (and probably small in streaming to avoid
latency) group of elements might be more useful? I
suppose this depends heavily on the object lifecycle in
the sdk worker though.


+1. This is the difference between @Setup and @StartBundle.
The start/finish bundle operations should be used for
bracketing element processing that must be committed as a
unit for correct failure recovery (e.g. if elements are
cached in ProcessElement, they should all be emitted in
FinishBundle). On the other hand, things like open database
connections can and likely should be shared across bundles.
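
The split described above can be sketched in plain Python. This is a stand-in that mirrors the lifecycle method names but deliberately does not import the Beam SDK. The BatchingWriter class, the list used as a "database", and the run_bundles harness are all hypothetical.

```python
class BatchingWriter:
    """Plain-Python stand-in for a DoFn; it does not use the Beam SDK."""

    connections_opened = 0  # class-level counter, for illustration only

    def setup(self):
        # Expensive, long-lived resource: created once and shared across bundles.
        BatchingWriter.connections_opened += 1
        self.connection = []  # hypothetical "database": a list of written batches

    def start_bundle(self):
        self.buffer = []  # per-bundle cache, empty at the start of every bundle

    def process(self, element):
        self.buffer.append(element)  # cache only; nothing is emitted yet

    def finish_bundle(self):
        # Everything cached in this bundle is emitted before the bundle commits.
        if self.buffer:
            self.connection.append(list(self.buffer))
        self.buffer = []

    def teardown(self):
        self.connection = None


def run_bundles(fn, bundles):
    """Hypothetical harness: one setup, then start/process/finish per bundle."""
    fn.setup()
    for bundle in bundles:
        fn.start_bundle()
        for element in bundle:
            fn.process(element)
        fn.finish_bundle()
    written = fn.connection
    fn.teardown()
    return written


print(run_bundles(BatchingWriter(), [[1, 2], [3]]))  # [[1, 2], [3]]
print(BatchingWriter.connections_opened)             # 1
```

Two bundles are written as two batches, while the connection is opened only once.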

This is correct, but the caching between @StartBundle and
@FinishBundle has some problems. First, users need to
manually set watermark hold for min(timestamp in bundle),
otherwise watermark might overtake the buffered elements.


Watermarks shouldn't be (visibly) advanced until @FinishBundle is
committed, as there's no guarantee that this work won't be
discarded.

Users don't have any option other than using
timer.withOutputTimestamp for that, as we don't have a
user-facing API to set watermark hold otherwise, thus the
in-bundle caching implies stateful DoFn. The question might
then be, why not use "classical" stateful caching involving
state, as there is full control over the caching in user
code. This gave me an idea: would it be useful to add
the information about caching to the API (e.g. in Java
@StartBundle(caching=true)), which could maybe solve the above
issues (runner would know to set the hold, it could
work with "stateless" DoFns)?


Really, this is one of the areas that the streaming/batch
abstraction leaks. In batch it was a common pattern to have local
DoFn instance state that persisted from start to finish bundle,
and these were also used as convenient entry points for other
operations 

Beam High Priority Issue Report (42)

2023-09-25 Thread beamactions
This is your daily summary of Beam's current high priority issues that may need 
attention.

See https://beam.apache.org/contribute/issue-priorities for the meaning and 
expectations around issue priorities.

Unassigned P1 Issues:

https://github.com/apache/beam/issues/28383 [Failing Test]: 
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorkerTest.testMaxThreadMetric
https://github.com/apache/beam/issues/28339 Fix failing 
"beam_PostCommit_XVR_GoUsingJava_Dataflow" job
https://github.com/apache/beam/issues/28326 Bug: 
apache_beam.io.gcp.pubsublite.ReadFromPubSubLite not working
https://github.com/apache/beam/issues/28168 [Bug]: BigQuery Storage Write API 
does not write with no complaint
https://github.com/apache/beam/issues/28142 [Bug]: [Go SDK] Memory seems to be 
leaking on 2.49.0 with Dataflow
https://github.com/apache/beam/issues/27892 [Bug]: ignoreUnknownValues not 
working when using CreateDisposition.CREATE_IF_NEEDED 
https://github.com/apache/beam/issues/27648 [Bug]: Python SDFs (e.g. 
PeriodicImpulse) running in Flink and polling using tracker.defer_remainder 
have checkpoint size growing indefinitely 
https://github.com/apache/beam/issues/27616 [Bug]: Unable to use 
applyRowMutations() in bigquery IO apache beam java
https://github.com/apache/beam/issues/27486 [Bug]: Read from datastore with 
inequality filters
https://github.com/apache/beam/issues/27314 [Failing Test]: 
bigquery.StorageApiSinkCreateIfNeededIT.testCreateManyTables[1]
https://github.com/apache/beam/issues/27238 [Bug]: Window trigger has lag when 
using Kafka and GroupByKey on Dataflow Runner
https://github.com/apache/beam/issues/26981 [Bug]: Getting an error related to 
SchemaCoder after upgrading to 2.48
https://github.com/apache/beam/issues/26969 [Failing Test]: Python PostCommit 
is failing due to exceeded rate limits
https://github.com/apache/beam/issues/26911 [Bug]: UNNEST ARRAY with a nested 
ROW (described below)
https://github.com/apache/beam/issues/26354 [Bug]: BigQueryIO direct read not 
reading all rows when set --setEnableBundling=true
https://github.com/apache/beam/issues/26343 [Bug]: 
apache_beam.io.gcp.bigquery_read_it_test.ReadAllBQTests.test_read_queries is 
flaky
https://github.com/apache/beam/issues/26329 [Bug]: BigQuerySourceBase does not 
propagate a Coder to AvroSource
https://github.com/apache/beam/issues/26041 [Bug]: Unable to create 
exactly-once Flink pipeline with stream source and file sink
https://github.com/apache/beam/issues/25975 [Bug]: Reducing parallelism in 
FlinkRunner leads to a data loss
https://github.com/apache/beam/issues/24776 [Bug]: Race condition in Python SDK 
Harness ProcessBundleProgress
https://github.com/apache/beam/issues/24389 [Failing Test]: 
HadoopFormatIOElasticTest.classMethod ExceptionInInitializerError 
ContainerFetchException
https://github.com/apache/beam/issues/24313 [Flaky]: 
apache_beam/runners/portability/portable_runner_test.py::PortableRunnerTestWithSubprocesses::test_pardo_state_with_custom_key_coder
https://github.com/apache/beam/issues/23944  beam_PreCommit_Python_Cron 
regularily failing - test_pardo_large_input flaky
https://github.com/apache/beam/issues/23709 [Flake]: Spark batch flakes in 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement and 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle
https://github.com/apache/beam/issues/23525 [Bug]: Default PubsubMessage coder 
will drop message id and orderingKey
https://github.com/apache/beam/issues/22913 [Bug]: 
beam_PostCommit_Java_ValidatesRunner_Flink is flakes in 
org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState
https://github.com/apache/beam/issues/22605 [Bug]: Beam Python failure for 
dataflow_exercise_metrics_pipeline_test.ExerciseMetricsPipelineTest.test_metrics_it
https://github.com/apache/beam/issues/21714 
PulsarIOTest.testReadFromSimpleTopic is very flaky
https://github.com/apache/beam/issues/21708 beam_PostCommit_Java_DataflowV2, 
testBigQueryStorageWrite30MProto failing consistently
https://github.com/apache/beam/issues/21706 Flaky timeout in github Python unit 
test action 
StatefulDoFnOnDirectRunnerTest.test_dynamic_timer_clear_then_set_timer
https://github.com/apache/beam/issues/21643 FnRunnerTest with non-trivial 
(order 1000 elements) numpy input flakes in non-cython environment
https://github.com/apache/beam/issues/21476 WriteToBigQuery Dynamic table 
destinations returns wrong tableId
https://github.com/apache/beam/issues/21469 beam_PostCommit_XVR_Flink flaky: 
Connection refused
https://github.com/apache/beam/issues/21424 Java VR (Dataflow, V2, Streaming) 
failing: ParDoTest$TimestampTests/OnWindowExpirationTests
https://github.com/apache/beam/issues/21262 Python AfterAny, AfterAll do not 
follow spec
https://github.com/apache/beam/issues/21260 Python DirectRunner does not emit 
data at GC time
https://github.com/apache/beam/issues/21121