I'm trying to run the WordCount example against the local file system, but it 
either gives me a permissions error or fails to find the file. Input and output 
on S3 work just fine. What is the correct URI usage for the local file system 
and for HDFS?

I have installed Flink on EMR and am just using the flink run script to start 
the job:

% flink run -m yarn-cluster -yn 2 /usr/lib/flink/examples/streaming/WordCount.jar \
    --input file:///home/hadoop/LICENSE.txt

<snip>

The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program 
execution failed: Failed to submit job 8a9efe4f99c5cad5897c18146fb66309 
(Streaming WordCount)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
    at 
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
    at 
org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:94)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
    at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
    at 
org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
    at 
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to 
submit job 8a9efe4f99c5cad5897c18146fb66309 (Streaming WordCount)
    at 
org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1120)
    at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:381)
    at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at 
org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:153)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
    at 
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
    at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at 
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at 
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:107)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Creating the input splits 
caused an error: File file:/home/hadoop/LICENSE.txt does not exist or the user 
running Flink ('yarn') has insufficient permissions to access it.
    at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:172)
    at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:679)
    at 
org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1026)
    ... 25 more
Caused by: java.io.FileNotFoundException: File file:/home/hadoop/LICENSE.txt 
does not exist or the user running Flink ('yarn') has insufficient permissions 
to access it.
    at 
org.apache.flink.core.fs.local.LocalFileSystem.getFileStatus(LocalFileSystem.java:108)
    at 
org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:451)
    at 
org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
    at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
    ... 27 more
Shutting down YARN cluster

% ls -al LICENSE.txt
-rwxr--r-- 1 hadoop hadoop 15419 Aug 23 14:52 LICENSE.txt

Is this really a permissions issue? If so, how would I correct it, given that 
the permissions look correct for any other application?
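
For what it's worth, the file itself looks readable, but I haven't checked 
whether the 'yarn' user can traverse the directories above it, or whether the 
file needs to exist on every YARN node rather than just the master. This is 
roughly what I plan to check next (just a sketch; it assumes a 'yarn' user 
exists on the master node):

% ls -ld /home /home/hadoop
% sudo -u yarn cat /home/hadoop/LICENSE.txt > /dev/null && echo readable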

For HDFS, I tried this but got a protobuf exception:

% flink run -m yarn-cluster -yn 2 /usr/lib/flink/examples/streaming/WordCount.jar \
    --input hdfs://ec2-54-208-188-194.compute-1.amazonaws.com:50070/user/hadoop/LICENSE.txt

<snip>
------------------------------------------------------------
The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program 
execution failed: Failed to submit job ed49701a40c805d3b6e874568170fc74 
(Streaming WordCount)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
    at 
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
    at 
org.apache.flink.streaming.examples.wordcount.WordCount.main(WordCount.java:94)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
    at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
    at 
org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
    at 
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Failed to 
submit job ed49701a40c805d3b6e874568170fc74 (Streaming WordCount)
    at 
org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1120)
    at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:381)
    at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at 
org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:153)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
    at 
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
    at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at 
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at 
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at 
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:107)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Creating the input splits 
caused an error: Failed on local exception: java.io.IOException: 
org.apache.flink.hadoop.shaded.com.google.protobuf.InvalidProtocolBufferException:
 Protocol message end-group tag did not match expected tag.; Host Details : 
local host is: "ip-172-31-23-253/172.31.23.253"; destination host is: 
"ec2-54-208-188-194.compute-1.amazonaws.com":50070;
    at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:172)
    at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:679)
    at 
org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1026)
    ... 25 more
Caused by: java.io.IOException: Failed on local exception: java.io.IOException: 
org.apache.flink.hadoop.shaded.com.google.protobuf.InvalidProtocolBufferException:
 Protocol message end-group tag did not match expected tag.; Host Details : 
local host is: "ip-172-31-23-253/172.31.23.253"; destination host is: 
"ec2-54-208-188-194.compute-1.amazonaws.com":50070;
    at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:773)
    at org.apache.hadoop.ipc.Client.call(Client.java:1479)
    at org.apache.hadoop.ipc.Client.call(Client.java:1412)
    at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
    at com.sun.proxy.$Proxy11.getFileInfo(Unknown Source)
    at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:771)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
    at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy12.getFileInfo(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:2108)
    at 
org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
    at 
org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
    at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at 
org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1301)
    at 
org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.getFileStatus(HadoopFileSystem.java:351)
    at 
org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:451)
    at 
org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:57)
    at 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
    ... 27 more
Caused by: java.io.IOException: 
org.apache.flink.hadoop.shaded.com.google.protobuf.InvalidProtocolBufferException:
 Protocol message end-group tag did not match expected tag.
    at org.apache.hadoop.ipc.Client$Connection$1.run(Client.java:687)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at 
org.apache.hadoop.ipc.Client$Connection.handleSaslConnectionFailure(Client.java:650)
    at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:737)
    at org.apache.hadoop.ipc.Client$Connection.access$2900(Client.java:375)
    at org.apache.hadoop.ipc.Client.getConnection(Client.java:1528)
    at org.apache.hadoop.ipc.Client.call(Client.java:1451)
    ... 47 more
Caused by: 
org.apache.flink.hadoop.shaded.com.google.protobuf.InvalidProtocolBufferException:
 Protocol message end-group tag did not match expected tag.
    at 
org.apache.flink.hadoop.shaded.com.google.protobuf.InvalidProtocolBufferException.invalidEndTag(InvalidProtocolBufferException.java:94)
    at 
org.apache.flink.hadoop.shaded.com.google.protobuf.CodedInputStream.checkLastTagWas(CodedInputStream.java:124)
    at 
org.apache.flink.hadoop.shaded.com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:143)
    at 
org.apache.flink.hadoop.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:176)
    at 
org.apache.flink.hadoop.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:188)
    at 
org.apache.flink.hadoop.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:193)
    at 
org.apache.flink.hadoop.shaded.com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49)
    at 
org.apache.hadoop.ipc.protobuf.RpcHeaderProtos$RpcResponseHeaderProto.parseFrom(RpcHeaderProtos.java:3147)
    at 
org.apache.hadoop.ipc.ProtobufRpcEngine$RpcResponseMessageWrapper.parseHeaderFrom(ProtobufRpcEngine.java:441)
    at 
org.apache.hadoop.ipc.ProtobufRpcEngine$RpcResponseMessageWrapper.parseHeaderFrom(ProtobufRpcEngine.java:417)
    at 
org.apache.hadoop.ipc.ProtobufRpcEngine$RpcMessageWithHeader.readFields(ProtobufRpcEngine.java:337)
    at 
org.apache.hadoop.ipc.ProtobufRpcEngine$RpcResponseMessageWrapper.readFields(ProtobufRpcEngine.java:417)
    at 
org.apache.hadoop.security.SaslRpcClient.saslConnect(SaslRpcClient.java:370)
    at 
org.apache.hadoop.ipc.Client$Connection.setupSaslConnection(Client.java:560)
    at org.apache.hadoop.ipc.Client$Connection.access$1900(Client.java:375)
    at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:729)
    at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:725)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:724)
    ... 50 more
Shutting down YARN cluster
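
My guess is that 50070 is the NameNode's web UI port rather than its RPC port, 
which might explain the protobuf error, but I'm not sure what the correct 
endpoint is on EMR. This is roughly what I plan to try next (the hdfs:/// form 
relies on fs.defaultFS being set to the NameNode's RPC address, which I haven't 
confirmed):

% hdfs getconf -confKey fs.defaultFS
% flink run -m yarn-cluster -yn 2 /usr/lib/flink/examples/streaming/WordCount.jar \
    --input hdfs:///user/hadoop/LICENSE.txt

Does that look like the right shape for the HDFS URI, or does Flink expect 
something else here?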

Thanks,
Craig
