Thank you all for the replies!

I did as @Maminspapin suggested and indeed the previous error
disappeared, but now the exception is
```
java.io.IOException: Cannot instantiate file system for URI:
hdfs://node-1:9000/flink
//...
Caused by: java.lang.NumberFormatException: For input string: "30s"
// this is thrown by the flink-shaded-hadoop library
```
I first thought this was related to my windowing, which uses a slide
interval of 30 seconds, but removing the window produces the same error.
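
Digging through the Hadoop defaults, I suspect the "30s" comes from a
Hadoop 3 time-unit value (hdfs-default.xml sets
`dfs.client.datanode-restart.timeout` to `30s`, if I read it correctly),
which the older shaded Hadoop client apparently feeds straight into
`Integer.parseInt`. To narrow it down, I'm thinking of listing all
time-suffixed values the client would see, roughly like this (just a
sketch; it assumes the Hadoop 3 jars and conf directory are on the
classpath, and the matching pattern is my guess):
```java
import org.apache.hadoop.hdfs.HdfsConfiguration;

import java.util.Map;

// Sketch: print every config value of the form "<digits>s" that an older
// Hadoop 2 client might pass straight to Integer.parseInt().
public class FindTimeSuffixedValues {
    public static void main(String[] args) {
        // Loads hdfs-default.xml and hdfs-site.xml from the classpath.
        HdfsConfiguration conf = new HdfsConfiguration();
        for (Map.Entry<String, String> entry : conf) {
            if (entry.getValue().matches("\\d+s")) {
                System.out.println(entry.getKey() + " = " + entry.getValue());
            }
        }
    }
}
```
If one of those keys turns out to be the culprit, overriding it with a
plain number in hdfs-site.xml might be a workaround.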

I also added the suggested dependency to the Maven POM, but that had no effect.

Since I use Hadoop 3.2.1, I also tried
https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-3-uber
but with that dependency the cluster doesn't even start (`TaskManager
initialization failed`).

@Robert, the Hadoop distribution ships roughly 100 HDFS-related jars.
`hadoop-hdfs-client-3.2.1.jar` is one of them and should contain
`DistributedFileSystem.class`, which I confirmed by running `jar tvf
hadoop-3.2.1/share/hadoop/hdfs/hadoop-hdfs-client-3.2.1.jar | grep
DistributedFileSystem`. How can I verify that the class is actually
accessible at runtime?
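
Would a check like the following be meaningful? (A minimal sketch; I'd
run it with the exact classpath the JobManager logs at startup.)
```java
// Minimal sketch: try to load the class with a given classpath and
// print which jar it was actually loaded from.
public class CheckHdfsClass {
    public static void main(String[] args) throws Exception {
        Class<?> c = Class.forName("org.apache.hadoop.hdfs.DistributedFileSystem");
        System.out.println("Loaded " + c.getName() + " from "
                + c.getProtectionDomain().getCodeSource().getLocation());
    }
}
```
For example: `java -cp "$HADOOP_CLASSPATH:." CheckHdfsClass`. If that
throws a `ClassNotFoundException`, the jar is apparently not on the
effective classpath despite the wildcard entries.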

Cheers,
Matthias

On 3/26/21 10:20 AM, Robert Metzger wrote:
> Hey Matthias,
>
> Maybe the classpath contains hadoop libraries, but not the HDFS
> libraries? The "DistributedFileSystem" class needs to be accessible to
> the classloader. Can you check if that class is available?
>
> Best,
> Robert
>
> On Thu, Mar 25, 2021 at 11:10 AM Matthias Seiler
> <matthias.sei...@campus.tu-berlin.de> wrote:
>
>     Hello everybody,
>
>     I set up a Flink (1.12.1) and Hadoop (3.2.1) cluster on two
>     machines.
>     The job should store the checkpoints on HDFS like so:
>     ```java
>     StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.getExecutionEnvironment();
>     env.enableCheckpointing(15000, CheckpointingMode.EXACTLY_ONCE);
>     env.setStateBackend(new FsStateBackend("hdfs://node-1:9000/flink"));
>     ```
>
>     Unfortunately, the JobManager throws
>     ```
>     org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
>     Could not
>     find a file system implementation for scheme 'hdfs'. The scheme is not
>     directly supported by Flink and no Hadoop file system to support this
>     scheme could be loaded. For a full list of supported file systems,
>     please see
>     https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
>     // ...
>     Caused by:
>     org.apache.flink.core.fs.UnsupportedFileSystemSchemeException:
>     Hadoop is
>     not in the classpath/dependencies.
>     ```
>     and I don't understand why.
>
>     `echo $HADOOP_CLASSPATH` returns the paths of the Hadoop libraries
>     (with wildcards). Flink's JobManager prints the classpath, which
>     includes specific jars from these Hadoop libraries. Moreover, Flink
>     creates the state directories on HDFS, but writes no content to them.
>
>     Thank you for any advice,
>     Matthias
>
