Hi Preston,

Sorry about the name mix-up; of course I meant to write Preston, not Magnus :) See my reply below.

cheers,
Max

On 25.09.19 08:31, Maximilian Michels wrote:
Hi Magnus,

Your observation seems to be correct. There is an issue with the file system registration.

The two types of errors you are seeing, as well as the successful run, are due to the different structure of the generated transforms. The Flink scheduler distributes them differently, which results in some parts of the pipeline being placed on task managers which happen to execute the FileSystems initialization code, and others not.

There is a quick fix to at least initialize the file system in case it has not been initialized, by adding the loading code here: https://github.com/apache/beam/blob/948c6fae909685e09d36b23be643182b34c8df25/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L463
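
For illustration, a minimal sketch of that fallback might look like this (a rough sketch only, not the actual FileSystems internals -- SCHEME_TO_FILESYSTEM stands in for whatever registry map the real code keeps):

    private static FileSystem getFileSystemInternal(String scheme) {
      if (!SCHEME_TO_FILESYSTEM.get().containsKey(scheme)) {
        // Fall back to default options. This registers whatever
        // filesystems are on the classpath, but without any
        // user-provided configuration (AWS credentials, region, ...).
        setDefaultPipelineOptions(PipelineOptionsFactory.create());
      }
      return SCHEME_TO_FILESYSTEM.get().get(scheme);
    }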

However, there we do not have the pipeline options available, which prevents any configuration. The problem is that the error occurs in the coder used in a native Flink operation which does not even run user code.

I believe the only way to fix this is to ship the FileSystems initialization code in CoderTypeSerializer, where we are sure to execute it in time for any coders which depend on it.
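
To sketch the idea (a hypothetical helper, not actual Beam code -- the real change would need to thread the options through the Flink translation; SerializablePipelineOptions is Beam's serializable wrapper around PipelineOptions):

    import java.util.concurrent.atomic.AtomicBoolean;
    import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
    import org.apache.beam.sdk.io.FileSystems;

    /**
     * Hypothetical helper: CoderTypeSerializer would hold a
     * SerializablePipelineOptions and call this before decoding, so the
     * FileSystems registry gets populated on every task manager that
     * deserializes records, before any coder needs a filesystem.
     */
    class FileSystemsInitializer {
      private static final AtomicBoolean INITIALIZED = new AtomicBoolean();

      static void ensureInitialized(SerializablePipelineOptions options) {
        // compareAndSet makes this a cheap no-op after the first call.
        if (INITIALIZED.compareAndSet(false, true)) {
          FileSystems.setDefaultPipelineOptions(options.get());
        }
      }
    }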

Could you file an issue? I'd be happy to fix this then.

Thanks,
Max

On 24.09.19 09:54, Chamikara Jayalath wrote:
As Magnus mentioned, FileSystems are picked up from the classpath and registered here: https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java#L480
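
If it helps to check the classpath side, a small diagnostic (a sketch using the same ServiceLoader mechanism; for debugging only) would be:

    import java.util.ServiceLoader;
    import org.apache.beam.sdk.io.FileSystemRegistrar;

    public class ListFileSystemRegistrars {
      public static void main(String[] args) {
        // Prints every filesystem registrar visible on the classpath;
        // S3 should appear via beam-sdks-java-io-amazon-web-services.
        for (FileSystemRegistrar registrar :
            ServiceLoader.load(FileSystemRegistrar.class)) {
          System.out.println(registrar.getClass().getName());
        }
      }
    }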

It seems like the Flink runner is invoking this method at the following locations:
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java#L142
https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobServerDriver.java#L63

I'm not too familiar with Flink, so I'm not sure why S3 is not being registered properly when running the Flink job. CCing some folks who are more familiar with Flink.

+Ankur Goenka <goe...@google.com> +Maximilian Michels <m...@apache.org>

Thanks,
Cham


On Sat, Sep 21, 2019 at 9:18 AM Koprivica,Preston Blake <preston.b.kopriv...@cerner.com> wrote:

    Thanks for the reply, Magnus.

    I'm sorry it wasn't more clear in the original message. I have
    added the AWS dependencies and set up the pipeline options with the
    AWS options. For the case where I set the write to ignore
    windowing, everything works. But that option is deprecated and the
    comments warn against its usage.

    I'm wondering whether, in the cases where no such option is set and
    I see the error, that is a case of improperly initialized
    filesystems in the Flink runner. Or maybe someone has different
    ideas about the culprit.

------------------------------------------------------------------------
    *From:* Magnus Runesson <ma...@linuxalert.org>
    *Sent:* Saturday, September 21, 2019 9:06:03 AM
    *To:* user@beam.apache.org
    *Subject:* Re: No filesystem found for scheme s3 using FileIO

    Hi!


    You are probably missing the S3 filesystem in your classpath.

    If I remember correctly, you must include the
    https://search.maven.org/search?q=a:beam-sdks-java-io-amazon-web-services
    package in your classpath/fat-jar.
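
    For reference, the Maven coordinates would look like this (the
    version should match your Beam release -- 2.15.0 in your case):

        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-sdks-java-io-amazon-web-services</artifactId>
          <version>2.15.0</version>
        </dependency>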

    /Magnus

    On 2019-09-19 23:13, Koprivica,Preston Blake wrote:

    Hello everyone. I'm getting the following error when attempting to
    use the FileIO APIs (beam-2.15.0) and integrating with a 3rd-party
    filesystem, in this case AWS S3:

    java.lang.IllegalArgumentException: No filesystem found for scheme s3
        at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
        at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
        at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
        at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
        at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
        at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:83)
        at org.apache.beam.sdk.transforms.join.UnionCoder.decode(UnionCoder.java:32)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
        at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
        at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
        at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
        at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
        at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
        at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
        at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:107)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:748)

    For reference, the write code resembles this:

    FileIO.Write<?, GenericRecord> write = FileIO.<GenericRecord>write()
            .via(ParquetIO.sink(schema))
            .to(options.getOutputDir()) // will be something like: s3://<bucket>/<path>
            .withSuffix(".parquet");

    records.apply(String.format("Write(%s)", options.getOutputDir()), write);

    I have set up the PipelineOptions with all the relevant AWS
    options, and the issue does not appear to be related to
    ParquetIO.sink() directly. I am able to reliably reproduce the
    issue using JSON-formatted records and TextIO.sink() as well.
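
    Roughly, the options setup looks like this (a sketch; the region
    value is illustrative):

        // Sketch of the options setup; the region value is illustrative.
        S3Options s3Options = PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(S3Options.class);
        s3Options.setAwsRegion("us-east-1");
        Pipeline pipeline = Pipeline.create(s3Options);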

    Just trying some different knobs, I went ahead and set the
    following option:

            write = write.withNoSpilling();

    This actually seemed to fix the issue, only to have it re-emerge
    as I scaled up the data set size. The stack trace, while very
    similar, reads:

    java.lang.IllegalArgumentException: No filesystem found for scheme s3
        at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:456)
        at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:526)
        at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1149)
        at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1105)
        at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
        at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:82)
        at org.apache.beam.sdk.coders.KvCoder.decode(KvCoder.java:36)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:543)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:534)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:480)
        at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:93)
        at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
        at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:106)
        at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:72)
        at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
        at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
        at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:94)
        at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
        at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:748)

    I'll be interested to hear some theories on the
    differences/similarities in the stacks. And lastly, I tried adding
    the following deprecated option (with and without the
    withNoSpilling() option):

            write = write.withIgnoreWindowing();

    This seemed to fix the issue altogether, but aside from having to
    rely on a deprecated feature, there is the bigger question of why?

    In reading through some of the source, it seems to be a common
    pattern to manually register the pipeline options to seed the
    filesystem registry during the setup part of the operator
    lifecycle, e.g.:
    https://github.com/apache/beam/blob/release-2.15.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L304-L313
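
    As a user-side experiment with that same pattern, one could imagine
    a pass-through DoFn that re-registers the filesystems in @Setup (a
    hypothetical sketch; SerializablePipelineOptions comes from the
    runner construction module). Notably, this would only help
    operators that actually run user code, which may be exactly why the
    coder path above still fails:

        // Hypothetical pass-through DoFn seeding the filesystem
        // registry on whichever task manager executes it.
        static class RegisterFileSystemsFn extends DoFn<GenericRecord, GenericRecord> {
          private final SerializablePipelineOptions options;

          RegisterFileSystemsFn(PipelineOptions options) {
            this.options = new SerializablePipelineOptions(options);
          }

          @Setup
          public void setup() {
            FileSystems.setDefaultPipelineOptions(options.get());
          }

          @ProcessElement
          public void processElement(
              @Element GenericRecord in, OutputReceiver<GenericRecord> out) {
            out.output(in);
          }
        }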

    Is it possible that I have hit upon a couple of scenarios where
    that has not taken place? Unfortunately, I'm not yet in a position
    to suggest a fix, but I'm guessing there's some missing
    initialization code in one or more of the batch operators. If this
    is indeed a legitimate issue, I'll be happy to log it, but I'll
    hold off until the community gets a chance to look at it.

    Thanks,

    Preston

