Hi Valeri, For now it’s not possible to write to the different AWS regions from the same write transform instance. There is an open Jira about this [1].
As a workaround (not very effective maybe, I didn’t try) but I guess you can branch your input PCollection into several branches, depending on the number of destination regions, and then apply for every branch a write PTransform configured with a different AWS region. See an example how to branch with multiple outputs here [2] at “A single transform that produces multiple outputs” — Alexey [1] https://issues.apache.org/jira/browse/BEAM-10133 [2] https://beam.apache.org/documentation/pipelines/design-your-pipeline/#branching-pcollections > On 27 Apr 2021, at 16:04, Valeri Tsolov <[email protected]> wrote: > > Hello all, > we are trying to run an Apache Beam pipeline which reads from Pub/Sub and > writes to multiple S3 buckets. The problem comes from the fact that we do not > know the region of destination buckets in Amazon. The items are mixed on the > Pub/Sub side for multiple s3 buckets in multiple regions. > > The code for writing to AWS is below. We are also seeing that it is required > to have a specified concrete region. > Is there a way to write from a single Apache Beam pipeline to multiple AWS > regions? > > Thanks, > Valeri > public class WritePTransform extends > PTransform<PCollection<KV<String, String>>, WriteFilesResult<String>> { > private final String tempDirectory; > private final SerializableFunction<String, FileNaming> namingFn; > > public WritePTransform(String tempDirectory, SerializableFunction<String, > FileNaming> namingFn) { > this.tempDirectory = tempDirectory; > this.namingFn = namingFn; > } > > @Override > public WriteFilesResult<String> expand(PCollection<KV<String, String>> > input) { > return input.apply("WriteFilesToFileSystem", > FileIO.<String, KV<String, String>>writeDynamic() > .by(KV::getKey) > .withDestinationCoder(StringUtf8Coder.of()) > .withTempDirectory(tempDirectory) > .via(Contextful.fn(KV::getValue), TextIO.sink()) > .withNaming(namingFn) > .withNumShards(1)); > } > } > > CONFIDENTIALITY NOTICE – This message and any attached documents contain > information which may be confidential, subject to privilege or exempt from > disclosure under applicable law. These materials may be used only by the > intended recipient of this communication. You are hereby notified that any > distribution, disclosure, printing, copying, storage, modification or the > taking of any action in reliance on this communication is strictly > prohibited. Delivery of this message to any person other than the intended > recipient shall not compromise or waive such confidentially, privilege or > exemption from disclosure as to this communication. If you have received > this communication in error, please immediately notify the sender and delete > the message in its entirety from your system. >
