Hi All,
I’m trying to write streaming processed data in HDFS (Hadoop 2). The buffer is
flushed and closed after each writing. The following errors occurred when
opening the same file to append. I know for sure the error is caused by closing
the file. Any idea?
Here is the code to write HDFS
def appendToFile(id: String, text: String): Unit = {
println("Request to write " + text.getBytes().length + " bytes,
MAX_BUF_SIZE: " + LogConstants.MAX_BUF_SIZE)
println("+++ Write to file id = " + id)
if (bufMap == null) {
init
}
var fsout: FSDataOutputStream = null
val filename = LogConstants.FILE_PATH + id
try {
fsout = getFSDOS(id, filename)
println("Write " + text.getBytes().length + "of bytes in Text to [" +
filename + "]")
fsout.writeBytes(text)
fsout.flush()
// fsout.sync()
// } catch {
// case e: InterruptedException =>
} finally {
if (fsout != null)
fsout.close()
}
}
Here are the errors observed:
+++ Write to file id = 0
Wrote 129820 bytes
+++ Write to file id = 0
14/11/05 18:01:35 ERROR Executor: Exception in task ID 998
14/11/05 18:01:35 ERROR Executor: Exception in task ID 998
org.apache.hadoop.ipc.RemoteException(java.lang.ArrayIndexOutOfBoundsException):
0
at
org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager.getDatanodeStorageInfos(DatanodeManager.java:467)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.updatePipelineInternal(FSNamesystem.java:5969)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.updatePipeline(FSNamesystem.java:5932)
at
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.updatePipeline(NameNodeRpcServer.java:651)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.updatePipeline(ClientNamenodeProtocolServerSideTranslatorPB.java:889)
at
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1026)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1986)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1982)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1980)
at org.apache.hadoop.ipc.Client.call(Client.java:1347)
at org.apache.hadoop.ipc.Client.call(Client.java:1300)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
at com.sun.proxy.$Proxy11.updatePipeline(Unknown Source)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy11.updatePipeline(Unknown Source)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.updatePipeline(ClientNamenodeProtocolTranslatorPB.java:791)
at
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1047)
at
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:520)
14/11/05 18:01:36 ERROR TaskSetManager: Task 53.0:7 failed 1 times; aborting job
14/11/05 18:01:36 ERROR JobScheduler: Error running job streaming job
1415239295000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 53.0:7
failed 1 times, most recent failure: Exception failure in TID 998 on host
localhost:
org.apache.hadoop.ipc.RemoteException(java.lang.ArrayIndexOutOfBoundsException):
0
at
org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager.getDatanodeStorageInfos(DatanodeManager.java:467)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.updatePipelineInternal(FSNamesystem.java:5969)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.updatePipeline(FSNamesystem.java:5932)
at
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.updatePipeline(NameNodeRpcServer.java:651)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.updatePipeline(ClientNamenodeProtocolServerSideTranslatorPB.java:889)
at
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1026)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1986)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1982)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1980)
org.apache.hadoop.ipc.Client.call(Client.java:1347)
org.apache.hadoop.ipc.Client.call(Client.java:1300)
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
com.sun.proxy.$Proxy11.updatePipeline(Unknown Source)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
com.sun.proxy.$Proxy11.updatePipeline(Unknown Source)
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.updatePipeline(ClientNamenodeProtocolTranslatorPB.java:791)
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1047)
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:520)
Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to
stage failure: Task 53.0:7 failed 1 times, most recent failure: Exception
failure in TID 998 on host localhost:
org.apache.hadoop.ipc.RemoteException(java.lang.ArrayIndexOutOfBoundsException):
0
at
org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager.getDatanodeStorageInfos(DatanodeManager.java:467)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.updatePipelineInternal(FSNamesystem.java:5969)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.updatePipeline(FSNamesystem.java:5932)
at
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.updatePipeline(NameNodeRpcServer.java:651)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.updatePipeline(ClientNamenodeProtocolServerSideTranslatorPB.java:889)
at
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1026)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1986)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1982)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1980)
org.apache.hadoop.ipc.Client.call(Client.java:1347)
org.apache.hadoop.ipc.Client.call(Client.java:1300)
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
com.sun.proxy.$Proxy11.updatePipeline(Unknown Source)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
com.sun.proxy.$Proxy11.updatePipeline(Unknown Source)
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.updatePipeline(ClientNamenodeProtocolTranslatorPB.java:791)
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1047)
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:520)
Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
^C14/11/05 18:01:38 ERROR DFSClient: Failed to close file
/user/training/apollo/logs/0