Hi!

We're working on a project where data is written to S3 from within a Flink application. When running the integration tests locally / in IntelliJ (using MiniClusterWithClientResource), all the dependencies are resolved correctly and the program executes as expected.
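For context, the integration tests spin up the local cluster roughly like this (a simplified sketch; the builder values are just the ones we happen to use):

---------------------------------
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
import org.apache.flink.test.util.MiniClusterWithClientResource

// Local mini cluster used by the integration tests; here all Hadoop/S3 classes resolve fine
val flinkCluster = new MiniClusterWithClientResource(
  new MiniClusterResourceConfiguration.Builder()
    .setNumberTaskManagers(1)
    .setNumberSlotsPerTaskManager(1)
    .build())

flinkCluster.before() // started before the suite runs
// ... execute the pipeline under test ...
flinkCluster.after()  // stopped afterwards
---------------------------------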
However, when the fat JAR is submitted to a Flink setup running on Docker, we get the following exception:

---------------------------------
java.lang.NoClassDefFoundError: org/apache/hadoop/fs/Path
---------------------------------

It refers to the usage of that class in a RichSinkFunction while building an AvroParquetWriter:

---------------------------------
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetFileWriter

import scala.util.Try

// ...
Try {
  // Write the buffered elements as a Parquet file to the final path
  val writer = AvroParquetWriter
    .builder[GenericRecord](new Path(finalFilePath))
    .withSchema(new Schema.Parser().parse(schema))
    .withDataModel(GenericData.get)
    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
    .build()

  elements.foreach(element => writer.write(element))
  writer.close()
}
// ...
---------------------------------

Since we have "flink-s3-fs-hadoop" in the plugins folder, and it is therefore loaded dynamically when the task/job manager(s) start up (we are also keeping Flink's default inverted class loading strategy), shouldn't the Hadoop dependencies be loaded parent-first, based on classloader.parent-first-patterns.default?

We also tried putting "flink-shaded-hadoop-2-uber-2.8.3-10.0.jar" in Flink's /lib folder, but then we got this error instead:

---------------------------------
java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
---------------------------------

The only way we have been able to make the application work as expected is to include the "hadoop-aws" dependency with compile scope, but that gives us a fat JAR with transitive Hadoop dependencies that we would like to avoid.

What would be the most appropriate way to take advantage of the cluster's "flink-s3-fs-hadoop" plugin while avoiding shipping any Hadoop-related library in our application JAR?
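For reference, the compile-scope workaround we would like to drop looks roughly like this in build.sbt (using the hadoopVersion defined below):

---------------------------------
// Current workaround: pulls Hadoop classes (org.apache.hadoop.fs.Path, the S3A
// filesystem, and friends) into the fat JAR, which is exactly what we want to avoid
val hadoopAws = "org.apache.hadoop" % "hadoop-aws" % hadoopVersion
---------------------------------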
The dependencies we're using in the build.sbt file:

---------------------------------
lazy val dependencies = new {
  val flinkVersion = "1.10.0"
  val parquetAvroVersion = "1.10.1"
  val hadoopVersion = "3.2.1"
  val circeVersion = "0.12.3"
  val rogachVersion = "3.3.1"
  val loggingVersion = "3.7.2"
  val scalatestVersion = "3.0.5"
  val mockitoVersion = "1.10.0"
  val kafkaVersion = "2.2.0"
  val scalajVersion = "2.4.2"
  val snakeYamlVersion = "1.25"
  val slf4jVersion = "1.7.30"
  val beanUtilsVersion = "1.9.4"
  val shadedHadoopVersion = "2.8.3-10.0"

  // Core libraries provided at runtime
  val flink = "org.apache.flink" %% "flink-scala" % flinkVersion % "provided"
  val flinkStreaming = "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided"
  val flinks3Hadoop = "org.apache.flink" % "flink-s3-fs-hadoop" % flinkVersion % "provided"

  // Application specific dependencies
  val flinkConnectorKafka = "org.apache.flink" %% "flink-connector-kafka" % flinkVersion
  val flinkStateBackendRocksDb = "org.apache.flink" %% "flink-statebackend-rocksdb" % flinkVersion
  val flinkParquet = "org.apache.flink" %% "flink-parquet" % flinkVersion
  val flinkDropwizard = "org.apache.flink" % "flink-metrics-dropwizard" % flinkVersion
  val parquetAvro = "org.apache.parquet" % "parquet-avro" % parquetAvroVersion
  val circeCore = "io.circe" %% "circe-core" % circeVersion
  val circeParser = "io.circe" %% "circe-parser" % circeVersion
  val circeGeneric = "io.circe" %% "circe-generic" % circeVersion
  val scallop = "org.rogach" %% "scallop" % rogachVersion
  val logging = "com.typesafe.scala-logging" %% "scala-logging" % loggingVersion
  val snakeYaml = "org.yaml" % "snakeyaml" % snakeYamlVersion
  val slf4j = "org.slf4j" % "slf4j-log4j12" % slf4jVersion
  val beanUtils = "commons-beanutils" % "commons-beanutils" % beanUtilsVersion

  // Test libraries
  val scalatest = "org.scalatest" %% "scalatest" % scalatestVersion % "test"
  val mockito = "org.mockito" %% "mockito-scala" % mockitoVersion % "test"
  val flinkTestUtils = "org.apache.flink" %% "flink-test-utils" % flinkVersion % "test"
  val kafkaStreams = "org.apache.kafka" % "kafka-streams" % kafkaVersion % "test"
  val kafkaClients = "org.apache.kafka" % "kafka-clients" % kafkaVersion % "test"
  val kafka = "org.apache.kafka" %% "kafka" % kafkaVersion % "test"
  val hadoopClient = "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "test"

  // Test classifiers only
  val flinkRuntimeTest = "org.apache.flink" %% "flink-runtime" % flinkVersion % "test" classifier "tests"
  val kafkaTest = "org.apache.kafka" %% "kafka" % kafkaVersion % "test" classifier "test"
  val kafkaStreamsTest = "org.apache.kafka" % "kafka-streams" % kafkaVersion % "test" classifier "test"
  val kafkaClientsTest = "org.apache.kafka" % "kafka-clients" % kafkaVersion % "test" classifier "test"
}
---------------------------------

This is the Dockerfile:

---------------------------------
FROM flink:1.10.0-scala_2.12

RUN cp /opt/flink/opt/flink-metrics-prometheus-1.10.0.jar /opt/flink/lib

RUN mkdir /opt/flink/plugins/flink-s3-fs-presto /opt/flink/plugins/flink-s3-fs-hadoop
RUN cp /opt/flink/opt/flink-s3-fs-presto-1.10.0.jar /opt/flink/plugins/flink-s3-fs-presto/
RUN cp /opt/flink/opt/flink-s3-fs-hadoop-1.10.0.jar /opt/flink/plugins/flink-s3-fs-hadoop/
RUN chown -R flink:flink /opt/flink/plugins/flink-s3-fs-presto/
RUN chown -R flink:flink /opt/flink/plugins/flink-s3-fs-hadoop/
---------------------------------

--
Best regards,
Ricardo Cardante.