Hi Arnaud,

I suspect "HdfsTools" is something internal from your company? Is it doing any Kerberos-related operations?
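One thing that might help to narrow this down: dump the login state of the client right after the HdfsTools call. Below is a small debugging sketch that only uses the standard Hadoop UserGroupInformation API (the class name "UgiDebug" is made up, and this assumes Hadoop 2.x):

import java.io.IOException;
import org.apache.hadoop.security.UserGroupInformation;

// Debugging sketch: print the identity and credentials the process is running with.
public final class UgiDebug {
    public static void dump() throws IOException {
        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
        System.out.println("security enabled : " + UserGroupInformation.isSecurityEnabled());
        System.out.println("user             : " + ugi.getUserName());
        System.out.println("auth method      : " + ugi.getAuthenticationMethod());
        System.out.println("kerberos creds   : " + ugi.hasKerberosCredentials());
        System.out.println("tokens           : " + ugi.getTokens());
    }
}

If this reports auth method SIMPLE or no Kerberos credentials when you run against YARN, then the keytab login from HdfsTools is not taking effect in that context.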
Is the local cluster mode also reading files from the secured HDFS cluster? Flink takes care of sending the authentication tokens from the client to the JobManager and to the TaskManagers, and for HDFS it should use these user settings as well. I don't know whether the HCatalog / Hadoop compatibility code is also doing some Kerberos operations which are interfering with our efforts.

From the logs, you can see:

> Secure Hadoop environment setup detected. Running in secure context.
> 15:04:18,005 INFO  org.apache.hadoop.security.UserGroupInformation - Login successful for user alinz using keytab file /usr/users/alinz/alinz.keytab

Is the user "alinz" authorized to access the files in HDFS?

I have to admit that I haven't seen this issue before. If possible, could you privately send me the full log of the application, obtained with "yarn logs -applicationId <ID>"?

Regarding your idea of getting a token in configure(): there is a rough, untested sketch of that at the bottom of this mail.

On Thu, Aug 20, 2015 at 4:08 PM, LINZ, Arnaud <al...@bouyguestelecom.fr> wrote:

> Hello,
>
> My application handles some HDFS files as input and output, both in the jobs and in the driver application.
>
> It works in local cluster mode, but when I submit it to a YARN client and try to use a HadoopInputFormat (that comes from an HCatalog request), I get the following error: "Delegation Token can be issued only with kerberos or web authentication" (full stack trace below).
>
> Here is the code which I believe causes the error (it is not clear from the stack trace, as the nearest point in my code is execEnv.execute()):
>
> public synchronized DataSet<T> readTable(String dbName, String tableName,
>         String filter, ExecutionEnvironment cluster,
>         final HiveBeanFactory<T> factory) throws IOException {
>
>     // Log in to Kerberos if needed (via
>     // UserGroupInformation.loginUserFromKeytab(getKerberosPrincipal(), getKerberosKeytab()))
>     HdfsTools.getFileSystem();
>
>     // Create the M/R job and configure it
>     final Job job = Job.getInstance();
>     job.setJobName("Flink source for Hive Table " + dbName + "." + tableName);
>
>     // Create the source
>     @SuppressWarnings({ "unchecked", "rawtypes" })
>     final HadoopInputFormat<NullWritable, DefaultHCatRecord> inputFormat =
>         new HadoopInputFormat<NullWritable, DefaultHCatRecord>(
>             (InputFormat) HCatInputFormat.setInput(job, dbName, tableName, filter),
>             NullWritable.class,
>             DefaultHCatRecord.class,
>             job);
>
>     final HCatSchema inputSchema =
>         HCatInputFormat.getTableSchema(job.getConfiguration());
>
>     @SuppressWarnings("serial")
>     final DataSet<T> dataSet = cluster
>         // Read the table
>         .createInput(inputFormat)
>         // Map to the bean (the key is useless)
>         .flatMap(new FlatMapFunction<Tuple2<NullWritable, DefaultHCatRecord>, T>() {
>             @Override
>             public void flatMap(Tuple2<NullWritable, DefaultHCatRecord> value,
>                     Collector<T> out) throws Exception {
>                 final T record = factory.fromHive(value.f1, inputSchema);
>                 if (record != null) {
>                     out.collect(record);
>                 }
>             }
>         }).returns(beanClass);
>
>     return dataSet;
> }
>
> Maybe I need to explicitly get a token on each node in the initialization of HadoopInputFormat (overriding configure())?
> That would be difficult, since the keyfile is on the driver's local drive…
>
> StackTrace:
>
> Found YARN properties file /usr/lib/flink/bin/../conf/.yarn-properties
> Using JobManager address from YARN properties bt1svlmw.bpa.bouyguestelecom.fr/172.19.115.52:50494
> Secure Hadoop environment setup detected. Running in secure context.
> 2015:08:20 15:04:17 (main) - INFO - com.bouygtel.kuberasdk.main.Application.mainMethod - Début traitement
> 15:04:18,005 INFO  org.apache.hadoop.security.UserGroupInformation - Login successful for user alinz using keytab file /usr/users/alinz/alinz.keytab
> 15:04:20,139 WARN  org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
>
> Error : Execution Kubera KO : java.lang.IllegalStateException: Error while executing Flink application
> com.bouygtel.kuberasdk.main.ApplicationBatch.execCluster(ApplicationBatch.java:84)
> com.bouygtel.kubera.main.segment.ApplicationGeoSegment.batchExec(ApplicationGeoSegment.java:68)
> com.bouygtel.kuberasdk.main.ApplicationBatch.exec(ApplicationBatch.java:51)
> com.bouygtel.kuberasdk.main.Application.mainMethod(Application.java:81)
> com.bouygtel.kubera.main.segment.MainGeoSmooth.main(MainGeoSmooth.java:44)
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> java.lang.reflect.Method.invoke(Method.java:606)
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
> org.apache.flink.client.program.Client.run(Client.java:315)
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
> org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
> org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:873)
> org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:870)
> org.apache.flink.runtime.security.SecurityUtils$1.run(SecurityUtils.java:50)
> java.security.AccessController.doPrivileged(Native Method)
> javax.security.auth.Subject.doAs(Subject.java:415)
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
> org.apache.flink.runtime.security.SecurityUtils.runSecured(SecurityUtils.java:47)
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:870)
> org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
>
> Caused by: org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Failed to submit job dddaf104260eb0f56ff336727ceeb49e (KUBERA-GEO-BRUT2SEGMENT)
> org.apache.flink.client.program.Client.run(Client.java:413)
> org.apache.flink.client.program.Client.run(Client.java:356)
> org.apache.flink.client.program.Client.run(Client.java:349)
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
> com.bouygtel.kuberasdk.main.ApplicationBatch.execCluster(ApplicationBatch.java:80)
> com.bouygtel.kubera.main.segment.ApplicationGeoSegment.batchExec(ApplicationGeoSegment.java:68)
> com.bouygtel.kuberasdk.main.ApplicationBatch.exec(ApplicationBatch.java:51)
> com.bouygtel.kuberasdk.main.Application.mainMethod(Application.java:81)
> com.bouygtel.kubera.main.segment.MainGeoSmooth.main(MainGeoSmooth.java:44)
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> java.lang.reflect.Method.invoke(Method.java:606)
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
> org.apache.flink.client.program.Client.run(Client.java:315)
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:584)
> org.apache.flink.client.CliFrontend.run(CliFrontend.java:290)
> org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:873)
> org.apache.flink.client.CliFrontend$2.run(CliFrontend.java:870)
> org.apache.flink.runtime.security.SecurityUtils$1.run(SecurityUtils.java:50)
> java.security.AccessController.doPrivileged(Native Method)
> javax.security.auth.Subject.doAs(Subject.java:415)
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
> org.apache.flink.runtime.security.SecurityUtils.runSecured(SecurityUtils.java:47)
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:870)
> org.apache.flink.client.CliFrontend.main(CliFrontend.java:922)
>
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to submit job dddaf104260eb0f56ff336727ceeb49e (KUBERA-GEO-BRUT2SEGMENT)
> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:594)
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100)
> scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
> akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> akka.actor.ActorCell.invoke(ActorCell.scala:487)
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> akka.dispatch.Mailbox.run(Mailbox.scala:221)
> akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Delegation Token can be issued only with kerberos or web authentication
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getDelegationToken(FSNamesystem.java:7609)
>     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getDelegationToken(NameNodeRpcServer.java:522)
>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:977)
>     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)
>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:962)
>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2039)
>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2035)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:415)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2033)
>
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:162)
> org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:469)
> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:534)
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100)
> scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
> akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> akka.actor.ActorCell.invoke(ActorCell.scala:487)
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> akka.dispatch.Mailbox.run(Mailbox.scala:221)
> akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): Delegation Token can be issued only with kerberos or web authentication
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getDelegationToken(FSNamesystem.java:7609)
>     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getDelegationToken(NameNodeRpcServer.java:522)
>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:977)
>     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)
>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:962)
>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2039)
>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2035)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:415)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2033)
>
> org.apache.hadoop.ipc.Client.call(Client.java:1468)
> org.apache.hadoop.ipc.Client.call(Client.java:1399)
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
> com.sun.proxy.$Proxy14.getDelegationToken(Unknown Source)
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getDelegationToken(ClientNamenodeProtocolTranslatorPB.java:909)
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> java.lang.reflect.Method.invoke(Method.java:606)
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> com.sun.proxy.$Proxy15.getDelegationToken(Unknown Source)
> org.apache.hadoop.hdfs.DFSClient.getDelegationToken(DFSClient.java:1029)
> org.apache.hadoop.hdfs.DistributedFileSystem.getDelegationToken(DistributedFileSystem.java:1355)
> org.apache.hadoop.fs.FileSystem.collectDelegationTokens(FileSystem.java:529)
> org.apache.hadoop.fs.FileSystem.addDelegationTokens(FileSystem.java:507)
> org.apache.hadoop.hdfs.DistributedFileSystem.addDelegationTokens(DistributedFileSystem.java:2041)
> org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:121)
> org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:100)
> org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodes(TokenCache.java:80)
> org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:205)
> org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313)
> org.apache.hive.hcatalog.mapreduce.HCatBaseInputFormat.getSplits(HCatBaseInputFormat.java:157)
> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:140)
> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:51)
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:146)
> org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:469)
> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:534)
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:190)
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100)
> scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
> akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> akka.actor.ActorCell.invoke(ActorCell.scala:487)
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> akka.dispatch.Mailbox.run(Mailbox.scala:221)
> akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> Do you have any clue?
>
> Best regards,
> Arnaud
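Regarding the configure() idea: below is a very rough, untested sketch of what overriding configure() could look like. It subclasses Flink's mapreduce HadoopInputFormat and performs a fresh keytab login before delegating. The principal is a placeholder, and, as you said, the keytab file would have to exist at the same path on every node, which is exactly the difficult part:

import java.io.IOException;

import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.security.UserGroupInformation;

// Untested sketch: re-login from a keytab wherever the input format is configured.
public class KerberizedHadoopInputFormat<K, V> extends HadoopInputFormat<K, V> {

    public KerberizedHadoopInputFormat(org.apache.hadoop.mapreduce.InputFormat<K, V> wrapped,
            Class<K> keyClass, Class<V> valueClass, Job job) throws IOException {
        super(wrapped, keyClass, valueClass, job);
    }

    @Override
    public void configure(org.apache.flink.configuration.Configuration parameters) {
        try {
            // Placeholder principal; the keytab would have to be readable at this
            // path on the JobManager and on every TaskManager node.
            UserGroupInformation.loginUserFromKeytab(
                    "alinz@YOUR.REALM", "/usr/users/alinz/alinz.keytab");
        } catch (IOException e) {
            throw new RuntimeException("Kerberos login from keytab failed", e);
        }
        super.configure(parameters);
    }
}

I'm not sure configure() is called early enough on the JobManager side, before the input splits are created, so treat this only as a starting point, not as a confirmed fix.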