Hi Valeri,

For now, it’s not possible to write to different AWS regions from the same
write transform instance. There is an open Jira issue about this [1].

As a workaround (maybe not very efficient, and I haven’t tried it), I think you
could branch your input PCollection into several branches, one per destination
region, and then apply to each branch a write PTransform configured with a
different AWS region. See an example of how to branch with multiple outputs in
[2], under “A single transform that produces multiple outputs”.
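A minimal sketch of the branching step follows. It is untested and rests on these assumptions: the key of each KV<String, String> is the destination bucket name (as with writeDynamic().by(KV::getKey) in the quoted code), and the bucket-to-region map, tag names and the RegionBranching class are all hypothetical. It uses a multi-output ParDo to send each element to a per-region TupleTag. It does not show how to give each branch’s write its own AWS region, because that is exactly the open question in [1].

```java
import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

/** Hypothetical helper that splits KV<bucket, payload> elements into one branch per AWS region. */
public class RegionBranching {

  // Hypothetical, hard-coded bucket -> region lookup (an assumption; in practice
  // this would come from configuration).
  static final Map<String, String> BUCKET_REGIONS =
      Map.of("bucket-eu", "eu-west-1", "bucket-us", "us-east-1");

  // Main output: elements whose bucket has no known region.
  static final TupleTag<KV<String, String>> UNKNOWN_REGION =
      new TupleTag<KV<String, String>>("unknown-region") {};
  // Additional outputs: one tag per destination region.
  static final TupleTag<KV<String, String>> EU_WEST_1 =
      new TupleTag<KV<String, String>>("eu-west-1") {};
  static final TupleTag<KV<String, String>> US_EAST_1 =
      new TupleTag<KV<String, String>>("us-east-1") {};

  /** Returns the region of a bucket, or null if the bucket is unknown. */
  static String regionOf(String bucket) {
    return BUCKET_REGIONS.get(bucket);
  }

  /** Routes every element to the output tag of its bucket's region. */
  static class RouteByRegionFn extends DoFn<KV<String, String>, KV<String, String>> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      KV<String, String> element = c.element();
      String region = regionOf(element.getKey()); // assumes the key is the bucket name
      if ("eu-west-1".equals(region)) {
        c.output(EU_WEST_1, element);
      } else if ("us-east-1".equals(region)) {
        c.output(US_EAST_1, element);
      } else {
        c.output(element); // goes to the main output, UNKNOWN_REGION
      }
    }
  }

  /** Splits the input into per-region branches with a multi-output ParDo. */
  static PCollectionTuple branchByRegion(PCollection<KV<String, String>> input) {
    return input.apply("BranchByRegion",
        ParDo.of(new RouteByRegionFn())
            .withOutputTags(UNKNOWN_REGION, TupleTagList.of(EU_WEST_1).and(US_EAST_1)));
  }
}
```

Each branch could then get its own write, e.g. branchByRegion(input).get(EU_WEST_1).apply("WriteEuWest1", ...), with that write configured for eu-west-1 in whatever way your S3 setup allows. The number of tags is fixed at pipeline construction time, so this only works if the set of destination regions is known up front.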

—
Alexey

[1] https://issues.apache.org/jira/browse/BEAM-10133
[2] https://beam.apache.org/documentation/pipelines/design-your-pipeline/#branching-pcollections

> On 27 Apr 2021, at 16:04, Valeri Tsolov <[email protected]> wrote:
> 
> Hello all,
> we are trying to run an Apache Beam pipeline that reads from Pub/Sub and
> writes to multiple S3 buckets. The problem is that we do not know the
> region of the destination buckets in Amazon: on the Pub/Sub side, items for
> multiple S3 buckets in multiple regions are mixed together.
> 
> The code for writing to AWS is below. We are also seeing that a concrete
> region has to be specified.
> Is there a way to write from a single Apache Beam pipeline to multiple AWS
> regions?
> 
> Thanks,
> Valeri
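> import org.apache.beam.sdk.coders.StringUtf8Coder;
> import org.apache.beam.sdk.io.FileIO;
> import org.apache.beam.sdk.io.FileIO.Write.FileNaming;
> import org.apache.beam.sdk.io.TextIO;
> import org.apache.beam.sdk.io.WriteFilesResult;
> import org.apache.beam.sdk.transforms.Contextful;
> import org.apache.beam.sdk.transforms.PTransform;
> import org.apache.beam.sdk.transforms.SerializableFunction;
> import org.apache.beam.sdk.values.KV;
> import org.apache.beam.sdk.values.PCollection;
>
> // Writes each KV's value as a text line to a destination chosen by the KV's key.
> public class WritePTransform extends
>     PTransform<PCollection<KV<String, String>>, WriteFilesResult<String>> {
>   private final String tempDirectory;
>   private final SerializableFunction<String, FileNaming> namingFn;
>
>   public WritePTransform(
>       String tempDirectory, SerializableFunction<String, FileNaming> namingFn) {
>     this.tempDirectory = tempDirectory;
>     this.namingFn = namingFn;
>   }
>
>   @Override
>   public WriteFilesResult<String> expand(PCollection<KV<String, String>> input) {
>     return input.apply("WriteFilesToFileSystem",
>         FileIO.<String, KV<String, String>>writeDynamic()
>             .by(KV::getKey)                                   // destination = key
>             .withDestinationCoder(StringUtf8Coder.of())
>             .withTempDirectory(tempDirectory)
>             .via(Contextful.fn(KV::getValue), TextIO.sink())  // value -> text line
>             .withNaming(namingFn)                             // per-destination naming
>             .withNumShards(1));
>   }
> }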
> 
