Which Hadoop release are you using? Can you check the HDFS audit log to see who deleted spark/ck/hdfsaudit/receivedData/0/log-1430139541443-1430139601443, and when?
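If the audit log is large, a small script can narrow it down. This is a minimal sketch, assuming audit logging is enabled on the NameNode and its lines follow the usual `FSNamesystem.audit` layout (tab-separated `key=value` fields such as `ugi`, `ip`, `cmd`, `src`). Treating `rename` as a possible removal (a move to `.Trash` is logged that way) and matching parent directories are my own choices, and the helper names are hypothetical. Because the path in the error is relative, HDFS resolves it under the submitting user's home directory, so the script matches on the path suffix.

```python
import sys
from typing import Dict, Iterable, Iterator

# Path from the error message. It is relative, so HDFS resolves it against
# the submitting user's home directory (/user/<name>); the audit log records
# the absolute path, hence the suffix matching below.
TARGET = "spark/ck/hdfsaudit/receivedData/0/log-1430139541443-1430139601443"

# Commands that can make a path disappear; a move to .Trash is logged as rename.
REMOVING_CMDS = {"delete", "rename"}


def parse_audit_line(line: str) -> Dict[str, str]:
    """Parse one audit line into a dict of its key=value fields.

    Assumes the usual FSNamesystem.audit layout, e.g.
    '<date> <time> INFO FSNamesystem.audit: allowed=true<TAB>ugi=...<TAB>cmd=delete<TAB>src=...'
    Returns an empty dict for lines that are not audit records.
    """
    head, sep, body = line.partition("FSNamesystem.audit:")
    if not sep:
        return {}
    # The first two whitespace-separated tokens are the date and the time.
    record = {"timestamp": " ".join(head.split()[:2])}
    for token in body.strip().split("\t"):
        key, eq, value = token.partition("=")
        if eq:
            record[key.strip()] = value.strip()
    return record


def touches_target(src: str, target: str = TARGET) -> bool:
    """True if src is the target file or one of its parent directories.

    Parent matching compares trailing path components only, so it can
    over-report (a delete of an unrelated /tmp/spark also matches).
    """
    parts = target.split("/")
    for i in range(1, len(parts) + 1):
        prefix = "/".join(parts[:i])
        if src == prefix or src.endswith("/" + prefix):
            return True
    return False


def find_removals(lines: Iterable[str], target: str = TARGET) -> Iterator[Dict[str, str]]:
    """Yield audit records in which a delete or rename hit the target path."""
    for line in lines:
        record = parse_audit_line(line)
        if record.get("cmd") in REMOVING_CMDS and touches_target(record.get("src", ""), target):
            yield record


if __name__ == "__main__" and len(sys.argv) > 1:
    # Usage (hypothetical script name): python find_removals.py /path/to/hdfs-audit.log
    with open(sys.argv[1], encoding="utf-8", errors="replace") as log:
        for rec in find_removals(log):
            print(rec["timestamp"], rec.get("ugi"), rec.get("ip"), rec["cmd"], rec.get("src"))
```

The audit log's location depends on your log4j configuration; it is often `hdfs-audit.log` in the NameNode's log directory. The `ugi` and `ip` fields of any hit tell you which user and host issued the delete.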
Cheers

On Mon, Apr 27, 2015 at 6:21 AM, Sea <261810...@qq.com> wrote:

> Hi, all:
> I use the function updateStateByKey in Spark Streaming. I need to store the
> states for one minute, so I set "spark.cleaner.ttl" to 120; the batch duration
> is 2 seconds. But it throws an exception:
>
> Caused by:
> org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File
> does not exist:
> spark/ck/hdfsaudit/receivedData/0/log-1430139541443-1430139601443
>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:61)
>     at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:51)
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsUpdateTimes(FSNamesystem.java:1499)
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1448)
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1428)
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1402)
>     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:468)
>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:269)
>     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java:59566)
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2048)
>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:396)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2042)
>
>     at org.apache.hadoop.ipc.Client.call(Client.java:1347)
>     at org.apache.hadoop.ipc.Client.call(Client.java:1300)
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
>     at com.sun.proxy.$Proxy14.getBlockLocations(Unknown Source)
>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:188)
>     at sun.reflect.GeneratedMethodAccessor21.invoke(Unknown Source)
>
> Why?
>
> My code is:
>
> ssc = StreamingContext(sc, 2)
> kvs = KafkaUtils.createStream(ssc, zkQuorum, group, {topic: 1})
> kvs.window(60, 2).map(lambda x: analyzeMessage(x[1])) \
>     .filter(lambda x: x[1] != None).updateStateByKey(updateStateFunc) \
>     .filter(lambda x: x[1]['isExisted'] != 1) \
>     .foreachRDD(lambda rdd: rdd.foreachPartition(insertIntoDb))
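The missing file's name looks like it encodes the time range its records cover. Assuming the receiver write-ahead log names its files `log-<start>-<end>` with both values in epoch milliseconds (an assumption, not something confirmed in this thread), a small sketch can turn the name into a UTC window to search the audit log around. The function name is hypothetical.

```python
import re
from datetime import datetime, timedelta, timezone

# Assumption: write-ahead-log files are named log-<start>-<end>,
# with both values given in epoch milliseconds.
LOG_NAME = re.compile(r"log-(\d+)-(\d+)$")
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def ms_to_utc(ms: int) -> datetime:
    """Convert epoch milliseconds to an aware UTC datetime (exact integer math)."""
    return EPOCH + timedelta(milliseconds=ms)


def log_file_window(path: str):
    """Return (start, end) as UTC datetimes decoded from a log file name."""
    match = LOG_NAME.search(path)
    if match is None:
        raise ValueError(f"not a log-<start>-<end> file name: {path}")
    start_ms, end_ms = (int(g) for g in match.groups())
    return ms_to_utc(start_ms), ms_to_utc(end_ms)


if __name__ == "__main__":
    path = "spark/ck/hdfsaudit/receivedData/0/log-1430139541443-1430139601443"
    start, end = log_file_window(path)
    print(start.isoformat(), end.isoformat(), (end - start).total_seconds())
```

For this file the window works out to 2015-04-27 12:59:01.443 through 13:00:01.443 UTC, a 60-second span. Audit log timestamps are usually written in the NameNode's local time, so convert before comparing.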