Hello all,
We are trying to run an Apache Beam pipeline that reads from Pub/Sub and
writes to multiple S3 buckets. The problem is that we do not know the
regions of the destination buckets in advance: the messages arriving on
Pub/Sub are mixed and target multiple S3 buckets in multiple regions.

The code that writes to AWS is below. As far as we can see, the AWS
configuration requires a single, concrete region to be specified.

Is there a way to write from a single Apache Beam pipeline to multiple AWS
regions?

Thanks,
Valeri
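
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileIO.Write.FileNaming;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.WriteFilesResult;
import org.apache.beam.sdk.transforms.Contextful;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// Writes each value to a file whose destination is derived from its key.
public class WritePTransform extends
    PTransform<PCollection<KV<String, String>>, WriteFilesResult<String>> {

  private final String tempDirectory;
  private final SerializableFunction<String, FileNaming> namingFn;

  public WritePTransform(String tempDirectory,
      SerializableFunction<String, FileNaming> namingFn) {
    this.tempDirectory = tempDirectory;
    this.namingFn = namingFn;
  }

  @Override
  public WriteFilesResult<String> expand(PCollection<KV<String, String>> input) {
    return input.apply("WriteFilesToFileSystem",
        FileIO.<String, KV<String, String>>writeDynamic()
            .by(KV::getKey) // the key selects the dynamic destination
            .withDestinationCoder(StringUtf8Coder.of())
            .withTempDirectory(tempDirectory)
            .via(Contextful.fn(KV::getValue), TextIO.sink()) // the value is the line written
            .withNaming(namingFn)
            .withNumShards(1));
  }
}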
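
To make the problem concrete, here is a minimal sketch (plain Java, no Beam
or AWS calls) of the per-region split that seems to be needed before the
write. It assumes each key is a full s3:// destination and that a
bucket-to-region lookup is available, which is exactly the part we do not
have. The class name, method names and sample buckets are hypothetical and
only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RegionRouting {

    // Extracts the bucket name from a destination such as "s3://bucket-a/dir/file.txt".
    // Assumption: destinations are full s3:// URIs.
    static String bucketOf(String destination) {
        String prefix = "s3://";
        if (!destination.startsWith(prefix)) {
            throw new IllegalArgumentException("Not an S3 destination: " + destination);
        }
        String rest = destination.substring(prefix.length());
        int slash = rest.indexOf('/');
        return slash < 0 ? rest : rest.substring(0, slash);
    }

    // Groups destinations by the region of their bucket.
    // bucketRegions is a hypothetical lookup (bucket name -> region) supplied by the caller.
    static Map<String, List<String>> groupByRegion(
            List<String> destinations, Map<String, String> bucketRegions) {
        Map<String, List<String>> byRegion = new TreeMap<>();
        for (String destination : destinations) {
            String region = bucketRegions.get(bucketOf(destination));
            if (region == null) {
                throw new IllegalStateException("Unknown region for " + destination);
            }
            byRegion.computeIfAbsent(region, r -> new ArrayList<>()).add(destination);
        }
        return byRegion;
    }
}
```

In a real pipeline the lookup would have to come from somewhere (S3's
GetBucketLocation API is one possible source), and each region's group
would still need a write configured for that region.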