[Question] Read Parquet Schema from S3 Directory
Hello, I am a developer trying to use Apache Beam in my Java application, and I'm running into an issue reading multiple Parquet files from a directory in S3. I'm able to successfully run this line of code, where tempPath = "s3:///*.parquet":

    PCollection<GenericRecord> records = pipeline.apply("Read parquet file in as Generic Records", ParquetIO.read(schema).from(tempPath));

My problem is reading the schema beforehand. At runtime, I only have the name of the S3 bucket, which has all the Parquet files I need underneath it. However, I am unable to use that same tempPath above to retrieve my schema. Because the path does not point to a single Parquet file, the ParquetFileReader class throws an error: No such file or directory: s3a:///*.parquet. To read my schema, I'm using this chunk of code:

    Configuration configuration = new Configuration();
    configuration.set("fs.s3a.access.key", "");
    configuration.set("fs.s3a.secret.key", "");
    configuration.set("fs.s3a.session.token", "");
    configuration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
    configuration.set("fs.s3a.server-side-encryption-algorithm", "");
    configuration.set("fs.s3a.proxy.host", "");
    configuration.set("fs.s3a.proxy.port", "");
    configuration.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider");
    Path hadoopFilePath = new Path("s3a:///*.parquet");
    ParquetFileReader r = ParquetFileReader.open(HadoopInputFile.fromPath(hadoopFilePath, configuration));
    MessageType messageType = r.getFooter().getFileMetaData().getSchema();
    AvroSchemaConverter converter = new AvroSchemaConverter();
    Schema schema = converter.convert(messageType);

The ParquetFileReader.open(...) call is where the code is failing. Is there maybe a Hadoop Configuration I can set to force Hadoop to read recursively? I realize this is kind of a Beam-adjacent problem, but I've been struggling with this for a while, so any help would be appreciated!
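In case it helps frame the question, here is a sketch of the workaround I've been considering: resolving the wildcard to concrete files via Hadoop's FileSystem.globStatus, then reading the footer of the first match. The class name SchemaFromS3 and the assumption that every file in the bucket shares one schema are mine, and this needs the usual hadoop-aws and parquet-hadoop dependencies on the classpath:

```java
import java.net.URI;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;

public class SchemaFromS3 {
  public static Schema readSchema(String glob, Configuration conf) throws Exception {
    // Resolve the glob to concrete files first; ParquetFileReader cannot
    // open a wildcard path directly.
    FileSystem fs = FileSystem.get(URI.create(glob), conf);
    FileStatus[] matches = fs.globStatus(new Path(glob));
    if (matches == null || matches.length == 0) {
      throw new IllegalStateException("No Parquet files matched " + glob);
    }
    // Assuming all files share a schema, read it from the first match only.
    try (ParquetFileReader reader =
        ParquetFileReader.open(HadoopInputFile.fromStatus(matches[0], conf))) {
      MessageType messageType = reader.getFooter().getFileMetaData().getSchema();
      return new AvroSchemaConverter().convert(messageType);
    }
  }
}
```

I don't know if this is idiomatic, or if there's a configuration knob that avoids the manual glob resolution entirely.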
Thanks and sincerely, Ramya
Re: [GitHub Actions] Requiring a test but not running it until requested
OK, just realizing a low-tech way this has been done in my experience: have a workflow that goes red unless the commit / PR message says something like "TESTED=link to workflow run". I don't love it, and it is easy to get lazy and circumvent, but I can see why it is what people chose to do. A variation on that could be an integration test workflow that runs unconditionally based on the path, but immediately fails early unless the PR contains some [recently added] magic phrase or comment like "Run large tests". When the author is ready, they can click in and re-run failed jobs through the GHA UI. In theory the draft/ready status could be used for this but I doubt that practice would catch on. Kenn On Wed, Oct 11, 2023 at 12:07 PM Danny McCormick via dev < dev@beam.apache.org> wrote: > I actually don't think GitHub supports path filters for required checks, > so you can't say something like "check X is only required if path Y or Z > are modified", you can only say "check X is required". I'm not 100% sure > on this, but it matches my memory and I could neither find docs with that > feature or get it to function like that on a personal repo. There are ways > to work around this (e.g. always status "good" when not on the path), but > they're messy. You also need a way of kicking off the job (today that is > comment triggers which is probably fine if these are limited to a few > checks per PR). > > Outside of the feasibility question, I'm at least theoretically > interested. This could allow us to turn some of our postcommits into > precommits without burning too much CI compute. I'm also generally +1 on > requiring more checks to pass before merging, especially if we can do so > through tooling. > > On Wed, Oct 11, 2023 at 10:42 AM Kenneth Knowles wrote: > >> From our other thread I had a thought about our "only on request" tests. >> >> Today, in theory: >> >> - The lightweight tests run automatically based on path matching. 
This >> is an approximate implementation of the ideal of running based on whether >> they could impact a test signal. >> - Heavyweight (and more flaky) tests run on request. >> >> A while ago, our "lightweight" tests were a huge monolith and very flaky >> (Python still is in this state I think). While I was splitting up >> monolithic "lightweight" Java SDK tests to make them run only on relevant >> paths, some of the "heavyweight" tests became small enough that they run >> automatically, so we also have: >> >> - Smaller integration tests (like DirectRunner doing SplunkIO) run >> automatically based on path matching. >> >> Danny mentioned the idea of changing the above to: >> >> - Heavyweight tests run only if the lightweight tests are healthy. >> >> Here's an idea I had about a combination of these that I wanted to know >> if anyone had seen it or thought of how it could happen or why it is a bad >> idea: >> >> - Heavyweight tests are *required but not automatically run* based on >> path matching. A status would show up indicating that the PR is not green >> until you request and pass the heavyweight tests. >> - When the PR is actually ready you request them. >> >> Then I would move even the small integration tests into that latter >> category. Incidentally this also could easily apply to non-hermetic tests >> that make our security posture more difficult, requiring a committer to >> approve running them. >> >> Is this possible? Good? Bad? >> >> Kenn >> >
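A rough sketch of the fail-early variation described above, as a GitHub Actions job (the job names, the path filter, the "Run large tests" phrase, and the Gradle command are all illustrative, not a proposal for specific values):

```yaml
name: Heavyweight integration tests
on:
  pull_request:
    paths:
      - 'sdks/java/**'   # illustrative path filter
jobs:
  large-tests:
    runs-on: ubuntu-latest
    steps:
      # Fail early unless the PR description opts in; the author re-runs
      # this failed job from the GHA UI after adding the magic phrase.
      - name: Check for opt-in phrase
        if: "!contains(github.event.pull_request.body, 'Run large tests')"
        run: |
          echo "PR has not opted in to large tests yet."
          exit 1
      - uses: actions/checkout@v4
      - name: Run large tests
        run: ./gradlew integrationTest   # placeholder command
```

Because the job always runs on matching paths, it can be marked as a required check even though GitHub has no "required only on path" concept; it just stays red until the author opts in and re-runs it.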
Re: [GitHub Actions] Requiring a test but not running it until requested
I actually don't think GitHub supports path filters for required checks, so you can't say something like "check X is only required if path Y or Z are modified", you can only say "check X is required". I'm not 100% sure on this, but it matches my memory and I could neither find docs with that feature or get it to function like that on a personal repo. There are ways to work around this (e.g. always status "good" when not on the path), but they're messy. You also need a way of kicking off the job (today that is comment triggers which is probably fine if these are limited to a few checks per PR). Outside of the feasibility question, I'm at least theoretically interested. This could allow us to turn some of our postcommits into precommits without burning too much CI compute. I'm also generally +1 on requiring more checks to pass before merging, especially if we can do so through tooling. On Wed, Oct 11, 2023 at 10:42 AM Kenneth Knowles wrote: > From our other thread I had a thought about our "only on request" tests. > > Today, in theory: > > - The lightweight tests run automatically based on path matching. This is > an approximate implementation of the ideal of running based on whether they > could impact a test signal. > - Heavyweight (and more flaky) tests run on request. > > A while ago, our "lightweight" tests were a huge monolith and very flaky > (Python still is in this state I think). While I was splitting up > monolithic "lightweight" Java SDK tests to make them run only on relevant > paths, some of the "heavyweight" tests became small enough that they run > automatically, so we also have: > > - Smaller integration tests (like DirectRunner doing SplunkIO) run > automatically based on path matching. > > Danny mentioned the idea of changing the above to: > > - Heavyweight tests run only if the lightweight tests are healthy. 
> > Here's an idea I had about a combination of these that I wanted to know if > anyone had seen it or thought of how it could happen or why it is a bad > idea: > > - Heavyweight tests are *required but not automatically run* based on > path matching. A status would show up indicating that the PR is not green > until you request and pass the heavyweight tests. > - When the PR is actually ready you request them. > > Then I would move even the small integration tests into that latter > category. Incidentally this also could easily apply to non-hermetic tests > that make our security posture more difficult, requiring a committer to > approve running them. > > Is this possible? Good? Bad? > > Kenn >
Re: [YAML] Fileio sink parameterization (streaming, sharding, and naming)
On Wed, Oct 11, 2023 at 6:55 AM Kenneth Knowles wrote: > So, top-posting because the threading got to be a lot for me and I think > it forked a bit too... I may even be restating something someone said, so > apologies for that. > > Very very good point about *required* parameters where if you don't use > them then you will end up with two writers writing to the same file. The > easiest example to work with might be if you omitted SHARD_NUM so all > shards end up clobbering the same file. > > I think there's a unifying perspective between prefix/suffix and the need > to be sure to include critical sharding variables. Essentially it is my > point about it being a "big data fileset". It is perhaps unrealistic but > ideally the user names the big data fileset and then the mandatory other > pieces are added outside of their control. For example if I name my big > data fileset "foo" then that implicitly means that "foo" consists of all > the files named "foo/${SHARD_NUM}-of-${SHARD_TOTAL}". And yes now that I > re-read I see you basically said the same thing. In some cases the required > fields will include $WINDOW, $KEY, and $PANE_INDEX, yes? Even though the > user can think of it as a textual template, if we can use a library that > yields an abstract syntax tree for the expression we can easily check these > requirements in a robust way - or we could do it in a non-robust way by > string-scraping ourselves. > Yes. I think we are talking about the same thing. Users should not have full control over the filename since that could lead to conflicts and data loss when data is being written in parallel from multiple workers. Users can refer to the big data fileset being written using the glob "/**". In addition users have control over parts of the filename (the file extension, for example) which can be useful for some downstream use-cases. The rest of the filename will be filled out by the SDK (window, pane etc.) to make sure that the files written by different workers do not conflict. 
Thanks, Cham > > We actually are very close to this in FileIO. I think the interpretation > of "prefix" is that it is the filename "foo" as above, and "suffix" is > really something like ".txt" that you stick on the end of everything for > whatever reason. > > Kenn > > On Tue, Oct 10, 2023 at 7:12 PM Robert Bradshaw via dev < > dev@beam.apache.org> wrote: > >> On Tue, Oct 10, 2023 at 4:05 PM Chamikara Jayalath >> wrote: >> >>> >>> On Tue, Oct 10, 2023 at 4:02 PM Robert Bradshaw >>> wrote: >>> On Tue, Oct 10, 2023 at 3:53 PM Chamikara Jayalath < chamik...@google.com> wrote: > > On Tue, Oct 10, 2023 at 3:41 PM Reuven Lax wrote: > >> I suspect some simple pattern templating would solve most use cases. >> We probably would want to support timestamp formatting (e.g. $ $M $D) >> as well. >> >> On Tue, Oct 10, 2023 at 3:35 PM Robert Bradshaw >> wrote: >> >>> On Mon, Oct 9, 2023 at 3:09 PM Chamikara Jayalath < >>> chamik...@google.com> wrote: >>> I would say: sink: type: WriteToParquet config: path: /beam/filesystem/dest prefix: suffix: Underlying SDK will add the middle part of the file names to make sure that files generated by various bundles/windows/shards do not conflict. >>> >>> What's the relationship between path and prefix? Is path the >>> directory part of the full path, or does prefix precede it? >>> >> > prefix would be the first part of the file name so each shard will be > named. > /-- > > This is similar to what we do in existing SDKs. For example, Java > FileIO, > > > https://github.com/apache/beam/blob/65eaf45026e9eeb61a9e05412488e5858faec6de/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L187 > Yeah, although there's no distinction between path and prefix. >>> >>> Ah, for FileIO, path comes from the "to" call. >>> >>> >>> https://github.com/apache/beam/blob/65eaf45026e9eeb61a9e05412488e5858faec6de/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L1125 >>> >>> >> >> Ah. I guess there's some inconsistency here, e.g. 
text files are written >> to a filenamePrefix that subsumes both: >> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/TextIO.Write.html#to-java.lang.String- >> >> >>> >>> This will satisfy the vast majority of use-cases I believe. Fully customizing the file pattern sounds like a more advanced use case that can be left for "real" SDKs. >>> >>> Yea, we don't have to do everything. >>> >>> For dynamic destinations, I think just making the "path" component support a lambda that is parameterized by the input should be adequate since this allows customers to direct
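Pulling the proposal out of the nested quoting, the sink shape under discussion looks roughly like the following (field names are from the thread; the values and the exact middle-part template are illustrative, since the SDK-generated portion was still being debated):

```yaml
sink:
  type: WriteToParquet
  config:
    path: /beam/filesystem/dest   # destination directory
    prefix: foo                   # user-chosen first part of each shard's filename
    suffix: .parquet              # optional, e.g. a file extension
# The SDK fills in the mandatory middle part, something like
#   <path>/<prefix>-<window>-<shard_num>-of-<shard_total><suffix>
# so files written by different workers/windows/panes never collide.
```

The key design point is that the user names the fileset but never controls the full filename, which is what rules out the two-writers-one-file failure mode discussed above.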
[RESULT] [VOTE] Release 2.51.0, release candidate #1
The vote has passed. There are 5 +1 binding votes:
- Robert Bradshaw
- Jan Lukavský
- Ahmet Altay
- Jean-Baptiste Onofré
- Alexey Romanenko

Additionally there are 5 non-binding +1 votes:
- Danny McCormick
- Svetak Sundhar
- XQ Hu
- Bruno Volpato
- Yi Hu

There are no disapproving votes. I will note the known issue(s) as I finalize the release. Kenn On Tue, Oct 10, 2023 at 4:30 PM Ahmet Altay via dev wrote: > Thank you for the information. > > I agree with Kenn in that case. This could wait for the next release. > Unless there is another reason to do the RC2. > > On Tue, Oct 10, 2023 at 12:30 PM Yi Hu wrote: > >> >> Would it impact all python users including breaking the new user, quick >>> start experience? Or would it impact users of a specific IO or >>> configuration? >>> >> >> It is the latter. It will impact users of Specific IO (BigQueryIO read) >> specific configuration (Direct_Read). Note that the default configuration >> for BigQueryIO read is EXPORT. So this won't affect "quick-start" examples >> having default settings. >> >> It also won't affect users using SDK docker containers (e.g. Dataflow >> users and Flink/Spark users running on a remote cluster). It will affect >> users running in direct runner, and local portable runners (e.g. Flink >> local cluster) with LOOPBACK configuration, which is exactly what our >> Python PostCommit is doing. >> >> >
Re: [VOTE] Release 2.51.0, release candidate #1
OK I'm ready. +1 (binding) On Tue, Oct 10, 2023 at 4:30 PM Ahmet Altay via dev wrote: > Thank you for the information. > > I agree with Kenn in that case. This could wait for the next release. > Unless there is another reason to do the RC2. > > On Tue, Oct 10, 2023 at 12:30 PM Yi Hu wrote: > >> >> Would it impact all python users including breaking the new user, quick >>> start experience? Or would it impact users of a specific IO or >>> configuration? >>> >> >> It is the latter. It will impact users of Specific IO (BigQueryIO read) >> specific configuration (Direct_Read). Note that the default configuration >> for BigQueryIO read is EXPORT. So this won't affect "quick-start" examples >> having default settings. >> >> It also won't affect users using SDK docker containers (e.g. Dataflow >> users and Flink/Spark users running on a remote cluster). It will affect >> users running in direct runner, and local portable runners (e.g. Flink >> local cluster) with LOOPBACK configuration, which is exactly what our >> Python PostCommit is doing. >> >> >
[GitHub Actions] Requiring a test but not running it until requested
From our other thread I had a thought about our "only on request" tests. Today, in theory: - The lightweight tests run automatically based on path matching. This is an approximate implementation of the ideal of running based on whether they could impact a test signal. - Heavyweight (and more flaky) tests run on request. A while ago, our "lightweight" tests were a huge monolith and very flaky (Python still is in this state I think). While I was splitting up monolithic "lightweight" Java SDK tests to make them run only on relevant paths, some of the "heavyweight" tests became small enough that they run automatically, so we also have: - Smaller integration tests (like DirectRunner doing SplunkIO) run automatically based on path matching. Danny mentioned the idea of changing the above to: - Heavyweight tests run only if the lightweight tests are healthy. Here's an idea I had about a combination of these that I wanted to know if anyone had seen it or thought of how it could happen or why it is a bad idea: - Heavyweight tests are *required but not automatically run* based on path matching. A status would show up indicating that the PR is not green until you request and pass the heavyweight tests. - When the PR is actually ready you request them. Then I would move even the small integration tests into that latter category. Incidentally this also could easily apply to non-hermetic tests that make our security posture more difficult, requiring a committer to approve running them. Is this possible? Good? Bad? Kenn
Re: [YAML] Fileio sink parameterization (streaming, sharding, and naming)
So, top-posting because the threading got to be a lot for me and I think it forked a bit too... I may even be restating something someone said, so apologies for that. Very very good point about *required* parameters where if you don't use them then you will end up with two writers writing to the same file. The easiest example to work with might be if you omitted SHARD_NUM so all shards end up clobbering the same file. I think there's a unifying perspective between prefix/suffix and the need to be sure to include critical sharding variables. Essentially it is my point about it being a "big data fileset". It is perhaps unrealistic but ideally the user names the big data fileset and then the mandatory other pieces are added outside of their control. For example if I name my big data fileset "foo" then that implicitly means that "foo" consists of all the files named "foo/${SHARD_NUM}-of-${SHARD_TOTAL}". And yes now that I re-read I see you basically said the same thing. In some cases the required fields will include $WINDOW, $KEY, and $PANE_INDEX, yes? Even though the user can think of it as a textual template, if we can use a library that yields an abstract syntax tree for the expression we can easily check these requirements in a robust way - or we could do it in a non-robust way by string-scraping ourselves. We actually are very close to this in FileIO. I think the interpretation of "prefix" is that it is the filename "foo" as above, and "suffix" is really something like ".txt" that you stick on the end of everything for whatever reason. Kenn On Tue, Oct 10, 2023 at 7:12 PM Robert Bradshaw via dev wrote: > On Tue, Oct 10, 2023 at 4:05 PM Chamikara Jayalath > wrote: > >> >> On Tue, Oct 10, 2023 at 4:02 PM Robert Bradshaw >> wrote: >> >>> On Tue, Oct 10, 2023 at 3:53 PM Chamikara Jayalath >>> wrote: >>> On Tue, Oct 10, 2023 at 3:41 PM Reuven Lax wrote: > I suspect some simple pattern templating would solve most use cases. 
> We probably would want to support timestamp formatting (e.g. $ $M $D) > as well. > > On Tue, Oct 10, 2023 at 3:35 PM Robert Bradshaw > wrote: > >> On Mon, Oct 9, 2023 at 3:09 PM Chamikara Jayalath < >> chamik...@google.com> wrote: >> >>> I would say: >>> >>> sink: >>> type: WriteToParquet >>> config: >>> path: /beam/filesystem/dest >>> prefix: >>> suffix: >>> >>> Underlying SDK will add the middle part of the file names to make >>> sure that files generated by various bundles/windows/shards do not >>> conflict. >>> >> >> What's the relationship between path and prefix? Is path the >> directory part of the full path, or does prefix precede it? >> > prefix would be the first part of the file name so each shard will be named. /-- This is similar to what we do in existing SDKs. For example, Java FileIO, https://github.com/apache/beam/blob/65eaf45026e9eeb61a9e05412488e5858faec6de/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L187 >>> >>> Yeah, although there's no distinction between path and prefix. >>> >> >> Ah, for FileIO, path comes from the "to" call. >> >> >> https://github.com/apache/beam/blob/65eaf45026e9eeb61a9e05412488e5858faec6de/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L1125 >> >> > > Ah. I guess there's some inconsistency here, e.g. text files are written > to a filenamePrefix that subsumes both: > https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/TextIO.Write.html#to-java.lang.String- > > >> >>> >> >>> This will satisfy the vast majority of use-cases I believe. Fully >>> customizing the file pattern sounds like a more advanced use case that >>> can >>> be left for "real" SDKs. >>> >> >> Yea, we don't have to do everything. >> >> >>> For dynamic destinations, I think just making the "path" component >>> support a lambda that is parameterized by the input should be adequate >>> since this allows customers to direct files written to different >>> destination directories. 
>>> >>> sink: >>> type: WriteToParquet >>> config: >>> path: >>> prefix: >>> suffix: >>> >>> I'm not sure what would be the best way to specify a lambda here >>> though. Maybe a regex or the name of a Python callable ? >>> >> >> I'd rather not require Python for a pure Java pipeline, but some kind >> of a pattern template may be sufficient here. >> >> >>> On Mon, Oct 9, 2023 at 2:06 PM Robert Bradshaw via dev < >>> dev@beam.apache.org> wrote: >>> .On Mon, Oct 9, 2023 at 1:49 PM Reuven Lax wrote: > Just FYI - the reason why names (including prefixes) in > DynamicDestinations were parameterized via a lambda
Beam High Priority Issue Report (43)
This is your daily summary of Beam's current high priority issues that may need attention. See https://beam.apache.org/contribute/issue-priorities for the meaning and expectations around issue priorities.

Unassigned P1 Issues:

https://github.com/apache/beam/issues/28909 [Stuck Test]: GitHub Action issue_comment trigger not scalable
https://github.com/apache/beam/issues/28760 [Bug]: EFO Kinesis IO reader provided by apache beam does not pick the event time for watermarking
https://github.com/apache/beam/issues/28703 [Failing Test]: Building a wheel for integration tests sometimes times out
https://github.com/apache/beam/issues/28383 [Failing Test]: org.apache.beam.runners.dataflow.worker.StreamingDataflowWorkerTest.testMaxThreadMetric
https://github.com/apache/beam/issues/28339 Fix failing "beam_PostCommit_XVR_GoUsingJava_Dataflow" job
https://github.com/apache/beam/issues/28326 Bug: apache_beam.io.gcp.pubsublite.ReadFromPubSubLite not working
https://github.com/apache/beam/issues/28142 [Bug]: [Go SDK] Memory seems to be leaking on 2.49.0 with Dataflow
https://github.com/apache/beam/issues/27892 [Bug]: ignoreUnknownValues not working when using CreateDisposition.CREATE_IF_NEEDED
https://github.com/apache/beam/issues/27648 [Bug]: Python SDFs (e.g. PeriodicImpulse) running in Flink and polling using tracker.defer_remainder have checkpoint size growing indefinitely
https://github.com/apache/beam/issues/27616 [Bug]: Unable to use applyRowMutations() in bigquery IO apache beam java
https://github.com/apache/beam/issues/27486 [Bug]: Read from datastore with inequality filters
https://github.com/apache/beam/issues/27314 [Failing Test]: bigquery.StorageApiSinkCreateIfNeededIT.testCreateManyTables[1]
https://github.com/apache/beam/issues/27238 [Bug]: Window trigger has lag when using Kafka and GroupByKey on Dataflow Runner
https://github.com/apache/beam/issues/26981 [Bug]: Getting an error related to SchemaCoder after upgrading to 2.48
https://github.com/apache/beam/issues/26911 [Bug]: UNNEST ARRAY with a nested ROW (described below)
https://github.com/apache/beam/issues/26343 [Bug]: apache_beam.io.gcp.bigquery_read_it_test.ReadAllBQTests.test_read_queries is flaky
https://github.com/apache/beam/issues/26329 [Bug]: BigQuerySourceBase does not propagate a Coder to AvroSource
https://github.com/apache/beam/issues/26041 [Bug]: Unable to create exactly-once Flink pipeline with stream source and file sink
https://github.com/apache/beam/issues/25975 [Bug]: Reducing parallelism in FlinkRunner leads to a data loss
https://github.com/apache/beam/issues/24776 [Bug]: Race condition in Python SDK Harness ProcessBundleProgress
https://github.com/apache/beam/issues/24389 [Failing Test]: HadoopFormatIOElasticTest.classMethod ExceptionInInitializerError ContainerFetchException
https://github.com/apache/beam/issues/24313 [Flaky]: apache_beam/runners/portability/portable_runner_test.py::PortableRunnerTestWithSubprocesses::test_pardo_state_with_custom_key_coder
https://github.com/apache/beam/issues/23944 beam_PreCommit_Python_Cron regularily failing - test_pardo_large_input flaky
https://github.com/apache/beam/issues/23709 [Flake]: Spark batch flakes in ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement and ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle
https://github.com/apache/beam/issues/23525 [Bug]: Default PubsubMessage coder will drop message id and orderingKey
https://github.com/apache/beam/issues/22913 [Bug]: beam_PostCommit_Java_ValidatesRunner_Flink is flakes in org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState
https://github.com/apache/beam/issues/22605 [Bug]: Beam Python failure for dataflow_exercise_metrics_pipeline_test.ExerciseMetricsPipelineTest.test_metrics_it
https://github.com/apache/beam/issues/21714 PulsarIOTest.testReadFromSimpleTopic is very flaky
https://github.com/apache/beam/issues/21706 Flaky timeout in github Python unit test action StatefulDoFnOnDirectRunnerTest.test_dynamic_timer_clear_then_set_timer
https://github.com/apache/beam/issues/21643 FnRunnerTest with non-trivial (order 1000 elements) numpy input flakes in non-cython environment
https://github.com/apache/beam/issues/21476 WriteToBigQuery Dynamic table destinations returns wrong tableId
https://github.com/apache/beam/issues/21469 beam_PostCommit_XVR_Flink flaky: Connection refused
https://github.com/apache/beam/issues/21424 Java VR (Dataflow, V2, Streaming) failing: ParDoTest$TimestampTests/OnWindowExpirationTests
https://github.com/apache/beam/issues/21262 Python AfterAny, AfterAll do not follow spec
https://github.com/apache/beam/issues/21260 Python DirectRunner does not emit data at GC time
https://github.com/apache/beam/issues/21121 apache_beam.examples.streaming_wordcount_it_test.StreamingWordCountIT.test_streaming_wordcount_it flakey
https://github.com/apache/beam/issues/21104 Flaky: