Hello everyone. I’m getting the following error when attempting to use the
FileIO APIs (Beam 2.15.0) and integrate with a third-party filesystem, in this
case AWS S3:

java.lang.IllegalArgumentException: No filesystem found for scheme s3
    at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
    at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
    at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
    at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
    at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
    at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:83)
    at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:32)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
    at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:107)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)

For reference, the write code resembles this:

FileIO.Write<?, GenericRecord> write = FileIO.<GenericRecord>write()
        .via(ParquetIO.sink(schema))
        .to(options.getOutputDir()) // will be something like: s3://<bucket>/<path>
        .withSuffix(".parquet");

records.apply(String.format("Write(%s)", options.getOutputDir()), write);

I have set up the PipelineOptions with all the relevant AWS options, and the
issue does not appear to be related to ParquetIO.sink() directly: I can
reliably reproduce the issue using JSON-formatted records and TextIO.sink()
as well.
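
For completeness, the setup and the TextIO-based reproduction look roughly
like the sketch below; the region value and the ToJson step are illustrative
placeholders, not the exact production code:

import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.aws.options.S3Options;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

// AWS options are registered up front; the region value is a placeholder.
S3Options s3Options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(S3Options.class);
s3Options.setAwsRegion("us-east-1");
Pipeline pipeline = Pipeline.create(s3Options);

// The same failure reproduces with a TextIO sink writing JSON strings
// (GenericRecord#toString renders a JSON-style representation).
records
        .apply("ToJson", MapElements.into(TypeDescriptors.strings())
                .via((GenericRecord record) -> record.toString()))
        .apply("WriteJson", FileIO.<String>write()
                .via(TextIO.sink())
                .to(options.getOutputDir())
                .withSuffix(".json"));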

Just trying some different knobs, I went ahead and set the following option:

        write = write.withNoSpilling();

This actually seemed to fix the issue, only to have it reemerge as I scaled
up the dataset size. The stack trace, while very similar, reads:

java.lang.IllegalArgumentException: No filesystem found for scheme s3
    at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
    at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
    at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
    at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
    at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
    at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
    at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
    at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
    at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
    at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
    at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
    at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:94)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)

I’d be interested to hear some theories on the differences and similarities
between the two stacks. Lastly, I tried adding the following deprecated
option (with and without the withNoSpilling() option):

write = write.withIgnoreWindowing();

This seemed to fix the issue altogether, but aside from having to rely on a
deprecated feature, there is the bigger question of why it works.

In reading through some of the source, it seems to be a common pattern to
manually register the pipeline options in order to seed the filesystem
registry during the setup phase of the operator lifecycle, e.g.:
https://github.com/apache/beam/blob/release-2.15.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L304-L313
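
That is, something along the lines of this sketch (the helper name and its
signature are illustrative, paraphrasing the linked DoFnOperator code):

import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptions;

// Re-seeding the registry on the worker rebuilds the scheme -> FileSystem
// map (s3, gs, hdfs, ...) from the registrars found on the worker's
// classpath, so coders that touch FileSystems (like FileResultCoder) can
// resolve schemes such as s3.
static void seedFileSystems(PipelineOptions deserializedOptions) {
    FileSystems.setDefaultPipelineOptions(deserializedOptions);
}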

Is it possible that I have hit upon a couple of scenarios where that
registration has not taken place? Unfortunately, I’m not yet in a position
to suggest a fix, but I’m guessing there’s some missing initialization code
in one or more of the batch operators. If this is indeed a legitimate bug,
I’ll be happy to log an issue, but I’ll hold off until the community gets a
chance to look at it.

Thanks,

Preston


