Hi Rohan,

Setting this up is currently not really straightforward :( I think we need to improve on this.
For supporting the S3 filesystem you did the right thing by placing the s3 jars into the plugins directory. Please note that these are loaded in a separate classloader and also contain a shaded version of Hadoop. Unfortunately, Parquet needs some Hadoop classes as well, but it cannot reuse the ones used by the filesystems, because those are not visible to the parent classloader. The shaded Hadoop filesystem is not really meant to be visible to the parent loader (you have already seen this by running into the ServiceLoader-related issues), so I'd discourage you from going down this path.

I think adding a hadoop-common dependency (you may even need hadoop-mapreduce-client-core instead) is the way to go here. You may just have to set up some exclusions so you don't get conflicts from transitive dependencies (commons-cli in your case, but there might be others).
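Roughly, something like the sketch below in your build file (this assumes an sbt build; the Hadoop version is just a placeholder, pick whatever matches your environment — with Maven the equivalent would be an <exclusion> on commons-cli):

// build.sbt (sketch) -- version 3.3.1 is only a placeholder
libraryDependencies += ("org.apache.hadoop" % "hadoop-common" % "3.3.1")
  .exclude("commons-cli", "commons-cli") // avoid clashing with the commons-cli version Flink's entry point expects

// if hadoop-common alone is not enough for the Parquet format, try instead:
// libraryDependencies += ("org.apache.hadoop" % "hadoop-mapreduce-client-core" % "3.3.1")
//   .exclude("commons-cli", "commons-cli")

If more transitive dependencies clash, you can exclude them in the same way.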
Best,
D.

On Wed, Dec 29, 2021 at 10:53 AM Rohan Kumar <rohankumar...@gmail.com> wrote:

> I am running flink 1.14.2
>
> Thanks
>
> On Wed, 29 Dec 2021 at 13:18, Rohan Kumar <rohankumar...@gmail.com> wrote:
>
>> Hello,
>>
>> I tried to read parquet data in S3 using the filesystem connector but got the below error. The jobmanager is not starting.
>> I tried the standalone-job in Docker.
>> I have already included flink-s3-fs-hadoop and flink-s3-fs-presto as plugins and they are working fine for checkpointing and Kubernetes HA. The issue is when I am reading files from S3 using the table API connector.
>>
>> Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration
>> at jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source) ~[?:?]
>> at jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown Source) ~[?:?]
>> at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
>> at org.apache.flink.formats.parquet.ParquetFileFormatFactory.getParquetConfiguration(ParquetFileFormatFactory.java:116) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
>> at org.apache.flink.formats.parquet.ParquetFileFormatFactory.access$000(ParquetFileFormatFactory.java:51) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
>> at org.apache.flink.formats.parquet.ParquetFileFormatFactory$1.createRuntimeDecoder(ParquetFileFormatFactory.java:79) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
>> at org.apache.flink.formats.parquet.ParquetFileFormatFactory$1.createRuntimeDecoder(ParquetFileFormatFactory.java:67) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
>> at org.apache.flink.table.filesystem.FileSystemTableSource.getScanRuntimeProvider(FileSystemTableSource.java:118) ~[flink-table_2.12-1.14.2.jar:1.14.2]
>> at org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:452) ~[flink-table_2.12-1.14.2.jar:1.14.2]
>> at org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:160) ~[flink-table_2.12-1.14.2.jar:1.14.2]
>> at org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:124) ~[flink-table_2.12-1.14.2.jar:1.14.2]
>> at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:85) ~[flink-table_2.12-1.14.2.jar:1.14.2]
>> at org.apache.calcite.rel.core.RelFactories$TableScanFactoryImpl.createScan(RelFactories.java:495) ~[flink-table_2.12-1.14.2.jar:1.14.2]
>> at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1099) ~[flink-table_2.12-1.14.2.jar:1.14.2]
>> at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1123) ~[flink-table_2.12-1.14.2.jar:1.14.2]
>> at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:351) ~[flink-table_2.12-1.14.2.jar:1.14.2]
>> at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:154) ~[flink-table_2.12-1.14.2.jar:1.14.2]
>> at org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:68) ~[flink-table_2.12-1.14.2.jar:1.14.2]
>> at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:151) ~[flink-table_2.12-1.14.2.jar:1.14.2]
>> at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:133) ~[flink-table_2.12-1.14.2.jar:1.14.2]
>> at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:92) ~[flink-table_2.12-1.14.2.jar:1.14.2]
>> at org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:68) ~[flink-table_2.12-1.14.2.jar:1.14.2]
>> at org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.scala:184) ~[flink-table_2.12-1.14.2.jar:1.14.2]
>> at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:258) ~[flink-table_2.12-1.14.2.jar:1.14.2]
>> at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:182) ~[flink-table_2.12-1.14.2.jar:1.14.2]
>> at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
>> at scala.collection.Iterator.foreach(Iterator.scala:943) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
>> at scala.collection.Iterator.foreach$(Iterator.scala:943) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
>> at scala.collection.IterableLike.foreach(IterableLike.scala:74) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
>> at scala.collection.IterableLike.foreach$(IterableLike.scala:73) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
>> at scala.collection.AbstractIterable.foreach(Iterable.scala:56) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
>> at scala.collection.TraversableLike.map(TraversableLike.scala:285) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
>> at scala.collection.TraversableLike.map$(TraversableLike.scala:278) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
>> at scala.collection.AbstractTraversable.map(Traversable.scala:108) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
>> at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:182) ~[flink-table_2.12-1.14.2.jar:1.14.2]
>> at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toStreamInternal(StreamTableEnvironmentImpl.scala:297) ~[flink-table_2.12-1.14.2.jar:1.14.2]
>> at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toStreamInternal(StreamTableEnvironmentImpl.scala:288) ~[flink-table_2.12-1.14.2.jar:1.14.2]
>> at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:213) ~[flink-table_2.12-1.14.2.jar:1.14.2]
>> at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:190) ~[flink-table_2.12-1.14.2.jar:1.14.2]
>> at org.apache.flink.table.api.bridge.scala.TableConversions.toDataStream(TableConversions.scala:52) ~[flink-table_2.12-1.14.2.jar:1.14.2]
>> at org.example.Job$.main(Job.scala:53) ~[?:?]
>> at org.example.Job.main(Job.scala) ~[?:?]
>> at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
>> at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
>> at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
>> at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>> at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:253) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>> ... 13 more
>>
>> Here's my code:
>>
>> val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
>> val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
>>
>> env.setRuntimeMode(RuntimeExecutionMode.BATCH)
>>
>> val source = tEnv.from(TableDescriptor
>>   .forConnector("filesystem")
>>   .option("path", "s3a://source/data")
>>   .option("format", "parquet")
>>   .schema(Schema.newBuilder()
>>     .column("InvoiceNo", DataTypes.BIGINT())
>>     .column("InvoiceDate", DataTypes.BIGINT())
>>     .column("Quantity", DataTypes.BIGINT())
>>     .column("UnitPrice", DataTypes.BIGINT())
>>     .column("Description", DataTypes.STRING())
>>     .column("CustomerID", DataTypes.STRING())
>>     .column("Country", DataTypes.STRING())
>>     .column("currentTs", DataTypes.TIMESTAMP(3))
>>     .build()
>>   )
>>   .build())
>>
>> source.toDataStream.print()
>>
>> env.execute()
>>
>> Then I added the hadoop-common dependency and got the below error, and the jobmanager didn't start.
>>
>> Exception in thread "main" java.lang.NoSuchMethodError: 'org.apache.commons.cli.Option$Builder org.apache.commons.cli.Option.builder(java.lang.String)'
>> at org.apache.flink.container.entrypoint.StandaloneApplicationClusterConfigurationParserFactory.<clinit>(StandaloneApplicationClusterConfigurationParserFactory.java:49)
>> at org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint.main(StandaloneApplicationClusterEntryPoint.java:57)
>>
>> Then I built the flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar from the Flink source and included it in the lib directory. The jobmanager started, but I got the below error and the job kept restarting.
>>
>> Caused by: java.lang.ClassNotFoundException: Class org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not found
>> at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2310) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>> at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2398) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>> at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2420) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>> at org.apache.hadoop.security.Groups.<init>(Groups.java:107) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>> at org.apache.hadoop.security.Groups.<init>(Groups.java:102) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>> at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:451) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>> at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:337) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>> at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:304) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>> at org.apache.hadoop.security.UserGroupInformation.doSubjectLogin(UserGroupInformation.java:1860) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>> at org.apache.hadoop.security.UserGroupInformation.createLoginUser(UserGroupInformation.java:718) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>> at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:668) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>> at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:579) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>> at org.apache.hadoop.fs.FileSystem$Cache$Key.<init>(FileSystem.java:3564) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>> at org.apache.hadoop.fs.FileSystem$Cache$Key.<init>(FileSystem.java:3554) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3391) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:485) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365) ~[flink-fs-hadoop-shaded-1.14-SNAPSHOT.jar:1.14-SNAPSHOT]
>> at org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(HadoopInputFile.java:38) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
>> at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:456) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
>> at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:112) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
>> at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:73) ~[flink-aws-assembly-0.1-SNAPSHOT-deps.jar:0.1-SNAPSHOT]
>> at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.checkSplitOrStartNext(FileSourceSplitReader.java:112) ~[flink-table_2.12-1.14.2.jar:1.14.2]
>> at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:65) ~[flink-table_2.12-1.14.2.jar:1.14.2]
>> at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-table_2.12-1.14.2.jar:1.14.2]
>> at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142) ~[flink-table_2.12-1.14.2.jar:1.14.2]
>> at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105) ~[flink-table_2.12-1.14.2.jar:1.14.2]
>> at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
>> at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
>> ... 1 more