Hello everyone. I’m getting the following error when attempting to use the FileIO APIs (beam-2.15.0) and integrating with a third-party filesystem, in this case AWS S3:

java.lang.IllegalArgumentException: No filesystem found for scheme s3
    at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
    at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
    at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
    at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
    at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
    at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:83)
    at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:32)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
    at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:107)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)

For reference, the write code resembles this:

FileIO.Write<Void, GenericRecord> write =
    FileIO.<GenericRecord>write()
        .via(ParquetIO.sink(schema))
        .to(options.getOutputDir())   // will be something like: s3://<bucket>/<path>
        .withSuffix(".parquet");

records.apply(String.format("Write(%s)", options.getOutputDir()), write);

I have set up the PipelineOptions with all the relevant AWS options, and the issue does not appear to be related to ParquetIO.sink() directly: I can reliably reproduce it with JSON-formatted records and TextIO.sink() as well.
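
For context, the options and pipeline setup looks roughly like the sketch below (not my exact code: MyOptions is a stand-in for our own options interface, the region value is a placeholder, and it assumes the beam-sdks-java-io-amazon-web-services module is on the classpath to supply AwsOptions and the S3 filesystem registrar; imports elided as above):

// Sketch of the options wiring (placeholder values)
MyOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation()
    .as(MyOptions.class);
// AWS-specific settings (region, credentials, etc.) ride along on AwsOptions
options.as(AwsOptions.class).setAwsRegion("us-east-1");
Pipeline pipeline = Pipeline.create(options);

// The same failure reproduces with JSON strings and a plain text sink:
FileIO.Write<Void, String> jsonWrite =
    FileIO.<String>write()
        .via(TextIO.sink())
        .to(options.getOutputDir())
        .withSuffix(".json");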

Just trying some different knobs, I went ahead and set the following option:

write = write.withNoSpilling();

This actually seemed to fix the issue, only to have it reemerge as I scaled up the data set size. The stack trace, while very similar, reads:

java.lang.IllegalArgumentException: No filesystem found for scheme s3
    at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
    at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
    at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
    at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
    at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
    at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
    at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
    at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:94)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)

I’d be interested to hear some theories on the differences/similarities between the two stacks. Lastly, I tried adding the following deprecated option (with and without the withNoSpilling() option):

write = write.withIgnoreWindowing();

This seemed to fix the issue altogether, but aside from having to rely on a deprecated feature, there’s the bigger question of why it makes a difference at all.

In reading through some of the source, it seems to be a common pattern to have to manually register the pipeline options in order to seed the filesystem registry during the setup phase of the operator lifecycle, e.g.:
https://github.com/apache/beam/blob/release-2.15.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L304-L313
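
If it helps illustrate what I mean, the kind of registration I would expect to see in the batch operators would look something like this rough sketch (my guess only, untested; the open() placement and the serializedOptions field name are assumptions on my part):

// Somewhere in the operator/function setup on the TaskManager, before any
// coder tries to decode a FileResult, the filesystem registry needs seeding:
@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    // serializedOptions would be the SerializablePipelineOptions shipped with the operator
    FileSystems.setDefaultPipelineOptions(serializedOptions.get());
}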

Is it possible that I have hit upon a couple of scenarios where that registration has not taken place? Unfortunately, I’m not yet in a position to suggest a fix, but I’m guessing there’s some missing initialization code in one or more of the batch operators. If this is indeed a legitimate issue, I’ll be happy to log one, but I’ll hold off until the community gets a chance to look at it.

Thanks,
Preston