Hi!

We're working on a project where data is written to S3 from within a Flink 
application.
When running the integration tests locally in IntelliJ (using 
MiniClusterWithClientResource), all the dependencies are correctly resolved and 
the program executes as expected. However, when the fat JAR is submitted to a 
Flink setup running on Docker, we get the following exception:

---------------------------------
java.lang.NoClassDefFoundError: org/apache/hadoop/fs/Path
---------------------------------

The exception refers to the usage of that class in a RichSinkFunction while 
building an AvroParquetWriter:

---------------------------------
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetFileWriter
import scala.util.Try
// ...
Try {
  val writer = AvroParquetWriter
    .builder[GenericRecord](new Path(finalFilePath))
    .withSchema(new Schema.Parser().parse(schema))
    .withDataModel(GenericData.get)
    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
    .build()

  elements.foreach(element => writer.write(element))
  writer.close()
}
// ...
---------------------------------

Since we have "flink-s3-fs-hadoop" in the plugins folder, and it is therefore 
dynamically loaded on task/job manager startup (we are also keeping Flink's 
default inverted class loading strategy), shouldn't Hadoop dependencies be 
loaded parent-first, based on classloader.parent-first-patterns.default?
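
To understand where (or whether) the class gets resolved on the cluster, we are 
considering a small diagnostic in the sink (hypothetical, not in our current 
code):

---------------------------------
// Hypothetical diagnostic: check at runtime which classloader resolves
// org.apache.hadoop.fs.Path, or confirm it cannot be resolved at all.
val pathClass = Class.forName("org.apache.hadoop.fs.Path")
println(s"org.apache.hadoop.fs.Path loaded by: ${pathClass.getClassLoader}")
---------------------------------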

We also tried putting "flink-shaded-hadoop-2-uber-2.8.3-10.0.jar" in Flink's 
/lib folder, but then we got this error instead:

---------------------------------
java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
---------------------------------

The only way we have been able to make the application work as expected is to 
include the "hadoop-aws" dependency with compile scope, but that bundles Hadoop 
libraries and their transitive dependencies into the fat JAR, which we would 
like to avoid.
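
For reference, the workaround boils down to something like this in build.sbt 
(a sketch; the `hadoopAws` name is just illustrative):

---------------------------------
// Workaround we'd like to avoid: bundle hadoop-aws (and its transitive
// Hadoop dependencies) into the fat JAR instead of relying on the
// cluster's flink-s3-fs-hadoop plugin.
val hadoopAws = "org.apache.hadoop" % "hadoop-aws" % hadoopVersion // compile scope
---------------------------------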

What would be the most appropriate way to take advantage of the cluster's 
"flink-s3-fs-hadoop" plugin and avoid shipping any Hadoop-related library in 
our application JAR?
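
Ideally we imagine something along these lines, i.e. compiling against the 
Hadoop filesystem API without bundling it and relying on the classes being 
available on the cluster (hypothetical sketch; `hadoopCommon` is just 
illustrative):

---------------------------------
// Hypothetical: make org.apache.hadoop.fs.Path available at compile time only,
// expecting the cluster / plugin to provide Hadoop classes at runtime.
val hadoopCommon = "org.apache.hadoop" % "hadoop-common" % hadoopVersion % "provided"
---------------------------------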

These are the dependencies we're using in the build.sbt file:
---------------------------------
lazy val dependencies =
  new {
    val flinkVersion = "1.10.0"
    val parquetAvroVersion = "1.10.1"
    val hadoopVersion = "3.2.1"
    val circeVersion = "0.12.3"
    val rogachVersion = "3.3.1"
    val loggingVersion = "3.7.2"
    val scalatestVersion = "3.0.5"
    val mockitoVersion = "1.10.0"
    val kafkaVersion = "2.2.0"
    val scalajVersion = "2.4.2"
    val snakeYamlVersion = "1.25"
    val slf4jVersion = "1.7.30"
    val beanUtilsVersion = "1.9.4"
    val shadedHadoopVersion = "2.8.3-10.0"

    // Core libraries provided at runtime
    val flink = "org.apache.flink" %% "flink-scala" % flinkVersion % "provided"
    val flinkStreaming = "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided"
    val flinks3Hadoop = "org.apache.flink" % "flink-s3-fs-hadoop" % flinkVersion % "provided"

    // Application specific dependencies.
    val flinkConnectorKafka = "org.apache.flink" %% "flink-connector-kafka" % flinkVersion
    val flinkStateBackendRocksDb = "org.apache.flink" %% "flink-statebackend-rocksdb" % flinkVersion
    val flinkParquet = "org.apache.flink" %% "flink-parquet" % flinkVersion
    val flinkDropwizard = "org.apache.flink" % "flink-metrics-dropwizard" % flinkVersion
    val parquetAvro = "org.apache.parquet" % "parquet-avro" % parquetAvroVersion
    val circeCore = "io.circe" %% "circe-core" % circeVersion
    val circeParser = "io.circe" %% "circe-parser" % circeVersion
    val circeGeneric = "io.circe" %% "circe-generic" % circeVersion
    val scallop = "org.rogach" %% "scallop" % rogachVersion
    val logging = "com.typesafe.scala-logging" %% "scala-logging" % loggingVersion
    val snakeYaml = "org.yaml" % "snakeyaml" % snakeYamlVersion
    val slf4j = "org.slf4j" % "slf4j-log4j12" % slf4jVersion
    val beanUtils = "commons-beanutils" % "commons-beanutils" % beanUtilsVersion

    // Test libraries
    val scalatest = "org.scalatest" %% "scalatest" % scalatestVersion % "test"
    val mockito = "org.mockito" %% "mockito-scala" % mockitoVersion % "test"
    val flinkTestUtils = "org.apache.flink" %% "flink-test-utils" % flinkVersion % "test"
    val kafkaStreams = "org.apache.kafka" % "kafka-streams" % kafkaVersion % "test"
    val kafkaClients = "org.apache.kafka" % "kafka-clients" % kafkaVersion % "test"
    val kafka = "org.apache.kafka" %% "kafka" % kafkaVersion % "test"
    val hadoopClient = "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "test"

    // Test classifiers only
    val flinkRuntimeTest = "org.apache.flink" %% "flink-runtime" % flinkVersion % "test" classifier "tests"
    val kafkaTest = "org.apache.kafka" %% "kafka" % kafkaVersion % "test" classifier "test"
    val kafkaStreamsTest = "org.apache.kafka" % "kafka-streams" % kafkaVersion % "test" classifier "test"
    val kafkaClientsTest = "org.apache.kafka" % "kafka-clients" % kafkaVersion % "test" classifier "test"
  }
---------------------------------

This is the Dockerfile:
---------------------------------
FROM flink:1.10.0-scala_2.12
RUN cp /opt/flink/opt/flink-metrics-prometheus-1.10.0.jar /opt/flink/lib
RUN mkdir /opt/flink/plugins/flink-s3-fs-presto /opt/flink/plugins/flink-s3-fs-hadoop
RUN cp /opt/flink/opt/flink-s3-fs-presto-1.10.0.jar /opt/flink/plugins/flink-s3-fs-presto/
RUN cp /opt/flink/opt/flink-s3-fs-hadoop-1.10.0.jar /opt/flink/plugins/flink-s3-fs-hadoop/
RUN chown -R flink:flink /opt/flink/plugins/flink-s3-fs-presto/
RUN chown -R flink:flink /opt/flink/plugins/flink-s3-fs-hadoop/
---------------------------------

--
Best regards,
Ricardo Cardante.
