This doesn't work because your application works directly against
Hadoop.
The filesystems in the plugins directory are only loaded via specific
code paths, namely when the Flink FileSystem class is used.
Since you are using Hadoop directly (the trace goes through Parquet's
HadoopInputFile into Hadoop's own FileSystem), you are side-stepping
the plugin mechanism.
So you have to make sure that Hadoop and Hadoop's S3 filesystem are
available to the client.
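As a rough sketch of what that could look like in the pom.xml quoted below (an assumption on my side, not something tested against this setup): Hadoop's S3A connector ships in the hadoop-aws artifact, so adding it next to hadoop-common, at the same version, would put an S3 filesystem on the client classpath.

```xml
<!-- Assumption: the version is chosen to match the hadoop-common 3.3.1
     dependency that is already declared in the pom. -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-aws</artifactId>
    <version>3.3.1</version>
</dependency>
```

As far as I know, this connector is reachable under the s3a scheme rather than s3, so the path handed to HadoopInputFile.fromPath would either need to be an s3a:// URI, or the Hadoop Configuration would need fs.s3.impl set to org.apache.hadoop.fs.s3a.S3AFileSystem. AWS credentials also have to be resolvable from wherever the main method runs.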
On 06/08/2021 08:02, tarun joshi wrote:
Hey All,
I am running Flink in Docker containers (image tag:
flink:scala_2.11-java11) on EC2 and get the following exception when I
try to submit a job through the local ./opt/flink/bin:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: No FileSystem for scheme "s3"
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "s3"
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
    at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
    at org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(HadoopInputFile.java:38)
    at org.apache.flink.examples.java.wordcount.WordCount.printParquetData(WordCount.java:142)
    at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:83)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
    ... 8 more
This is how I enable Flink's built-in S3 plugins for the JobManager
and TaskManager:
docker run \
    --rm \
    --volume /root/:/root/ \
    --env JOB_MANAGER_RPC_ADDRESS="${JOB_MANAGER_RPC_ADDRESS}" \
    --env TASK_MANAGER_NUMBER_OF_TASK_SLOTS="${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" \
    --env ENABLE_BUILT_IN_PLUGINS="flink-s3-fs-hadoop-1.13.1.jar;flink-s3-fs-presto-1.13.1.jar" \
    --name=jobmanager \
    --network flink-network \
    --publish 8081:8081 \
    flink:scala_2.11-java11 jobmanager &

docker run \
    --rm \
    --env JOB_MANAGER_RPC_ADDRESS="${JOB_MANAGER_RPC_ADDRESS}" \
    --env TASK_MANAGER_NUMBER_OF_TASK_SLOTS="${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" \
    --env ENABLE_BUILT_IN_PLUGINS="flink-s3-fs-hadoop-1.13.1.jar;flink-s3-fs-presto-1.13.1.jar" \
    --name=taskmanager_0 \
    --network flink-network \
    flink:scala_2.11-java11 taskmanager &
This is how I define the dependencies in my pom.xml (I am working on
the Flink-Examples project from the Flink GitHub repo):
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${project.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.binary.version}</artifactId>
        <version>${project.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${project.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-avro</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-column</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-hadoop</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.3.1</version>
    </dependency>
</dependencies>
I can also see the plugins being loaded for the JobManager and
TaskManager:
Linking flink-s3-fs-hadoop-1.13.1.jar to plugin directory
Successfully enabled flink-s3-fs-hadoop-1.13.1.jar
Linking flink-s3-fs-presto-1.13.1.jar to plugin directory
Successfully enabled flink-s3-fs-presto-1.13.1.jar
Let me know if I am doing anything wrong.
Thanks for the help!