Hadoop version 2.7.3

On Tue, Jun 20, 2017 at 11:12 PM, yohann jardin <yohannjar...@hotmail.com> wrote:
> Which version of Hadoop are you running on?
>
> Yohann Jardin
>
> On 6/21/2017 at 1:06 AM, N B wrote:
>
> Ok, some more info about this issue, to see if someone can shine a light on what could be going on. I turned on debug logging for org.apache.spark.streaming.scheduler in the driver process, and this is what gets thrown in the logs; it keeps getting thrown even after the downed HDFS node is restarted. Using Spark 2.1.1 and HDFS 2.7.3 here.
>
> 2017-06-20 22:38:11,302 WARN JobGenerator ReceivedBlockTracker.logWarning - Exception thrown while writing record: BatchCleanupEvent(ArrayBuffer()) to the WriteAheadLog.
> org.apache.spark.SparkException: Exception thrown in awaitResult:
>     at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
>     at org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:83)
>     at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:234)
>     at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.cleanupOldBatches(ReceivedBlockTracker.scala:171)
>     at org.apache.spark.streaming.scheduler.ReceiverTracker.cleanupOldBlocksAndBatches(ReceiverTracker.scala:233)
>     at org.apache.spark.streaming.scheduler.JobGenerator.clearCheckpointData(JobGenerator.scala:287)
>     at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:187)
>     at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:89)
>     at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.HadoopIllegalArgumentException): Missing storageIDs: It is likely that the HDFS client, who made this call, is running in an older version of Hadoop which does not support storageIDs. datanodeID.length=1, src=/vm/spark-checkpoint/receivedBlockMetadata/log-1497997390799-1497997450799, fileId=0, blk=BP-1450953312-10.0.0.9-1490120290209:blk_1081472520_7731872, clientName=DFSClient_NONMAPREDUCE_-23097586_1, clientMachine=10.0.0.17
>     at org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager.getDatanodeStorageInfos(DatanodeManager.java:514)
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalDatanode(FSNamesystem.java:3353)
>     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getAdditionalDatanode(NameNodeRpcServer.java:759)
>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getAdditionalDatanode(ClientNamenodeProtocolServerSideTranslatorPB.java:515)
>     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
>
>     at org.apache.hadoop.ipc.Client.call(Client.java:1347)
>     at org.apache.hadoop.ipc.Client.call(Client.java:1300)
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
>     at com.sun.proxy.$Proxy10.getAdditionalDatanode(Unknown Source)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
>     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>     at com.sun.proxy.$Proxy10.getAdditionalDatanode(Unknown Source)
>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getAdditionalDatanode(ClientNamenodeProtocolTranslatorPB.java:352)
>     at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:919)
>     at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1031)
>     at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:823)
>     at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:475)
>
> polygraph-engine 2017-06-20 22:38:11,302 WARN JobGenerator ReceivedBlockTracker.logWarning - Failed to acknowledge batch clean up in the Write Ahead Log.
>
> Thanks
> N B
>
> On Tue, Jun 20, 2017 at 10:24 AM, N B <nb.nos...@gmail.com> wrote:
>
>> BTW, this is running on Spark 2.1.1.
>>
>> I have been trying to debug this issue, and what I have found so far is that it is somehow related to the Spark WAL. The directory named <checkpoint dir in HDFS>/receivedBlockMetadata seems to stop getting written to after the point when an HDFS node is killed and restarted. I have a couple of questions around this:
>>
>> 1. Why is the Flume receiver even writing to the WAL? I have not enabled the 'spark.streaming.receiver.writeAheadLog.enable' property, and even after I set it explicitly to false in the driver (see the sketch after these questions), the WAL seems to be getting written to.
>>
>> 2. Why would the receiver receive metadata but not write to the WAL after an HDFS node is lost and restarted? The HDFS replication factor is at its default of 2.
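>>
>> (For reference, a minimal sketch of how that flag from question 1 gets set explicitly in the driver. The property name is the real Spark setting; the app name and the rest of the conf wiring are just placeholders for ours.)
>>
>> import org.apache.spark.SparkConf;
>>
>> // Driver-side configuration sketch: explicitly disabling the receiver WAL.
>> SparkConf conf = new SparkConf()
>>         .setAppName("flume-streaming-app")  // placeholder name
>>         .set("spark.streaming.receiver.writeAheadLog.enable", "false");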
>>
>> Thanks
>> N B
>>
>> On Mon, Jun 19, 2017 at 6:23 PM, N B <nb.nos...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> We are running a standalone Spark cluster for running a streaming application. The application consumes data from Flume using a Flume polling stream created as such:
>>>
>>> flumeStream = FlumeUtils.createPollingStream(streamingContext,
>>>         socketAddress.toArray(new InetSocketAddress[socketAddress.size()]),
>>>         StorageLevel.MEMORY_AND_DISK_SER(), 100, 5);
>>>
>>> The checkpoint directory is configured to be on an HDFS cluster, and the Spark workers have their SPARK_LOCAL_DIRS and SPARK_WORKER_DIR defined to be on their respective local filesystems.
>>>
>>> What we are seeing is some odd behavior that we are unable to explain. During normal operation, everything runs as expected, with Flume delivering events to Spark. However, while the application is running, if I kill one of the HDFS nodes (it does not matter which one), the Flume receiver in Spark stops producing any data for downstream processing.
>>>
>>> I enabled debug logging for org.apache.spark.streaming.flume on the Spark worker nodes and looked at the logs for the one that gets to run the Flume receiver. It keeps chugging along, receiving data from Flume as shown in the log sample below, but the resulting batches in the stream start receiving 0 records as soon as the HDFS node is killed, with no errors being produced to indicate any issue.
>>>
>>> 17/06/20 01:05:42 DEBUG FlumeBatchFetcher: Ack sent for sequence number: 09fa05f59050
>>> 17/06/20 01:05:44 DEBUG FlumeBatchFetcher: Received batch of 100 events with sequence number: 09fa05f59052
>>> 17/06/20 01:05:44 DEBUG FlumeBatchFetcher: Sending ack for sequence number: 09fa05f59052
>>> 17/06/20 01:05:44 DEBUG FlumeBatchFetcher: Ack sent for sequence number: 09fa05f59052
>>> 17/06/20 01:05:47 DEBUG FlumeBatchFetcher: Received batch of 100 events with sequence number: 09fa05f59054
>>> 17/06/20 01:05:47 DEBUG FlumeBatchFetcher: Sending ack for sequence number: 09fa05f59054
>>>
>>> The driver output for the application shows (printed via DStream.count().map().print()):
>>>
>>> -------------------------------------------
>>> Time: 1497920770000 ms
>>> -------------------------------------------
>>> Received 0 flume events.
>>>
>>> Any insights about where to look in order to find the root cause will be greatly appreciated.
>>>
>>> Thanks
>>> N B
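>>>
>>> P.S. For concreteness, a minimal, self-contained sketch of the driver wiring described above. The FlumeUtils.createPollingStream call, the StorageLevel, and the count().map().print() chain are as in our app; the class name, batch interval, Flume host/port, and checkpoint URI scheme are placeholders.
>>>
>>> import java.net.InetSocketAddress;
>>> import java.util.Arrays;
>>> import java.util.List;
>>>
>>> import org.apache.spark.SparkConf;
>>> import org.apache.spark.storage.StorageLevel;
>>> import org.apache.spark.streaming.Durations;
>>> import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
>>> import org.apache.spark.streaming.api.java.JavaStreamingContext;
>>> import org.apache.spark.streaming.flume.FlumeUtils;
>>> import org.apache.spark.streaming.flume.SparkFlumeEvent;
>>>
>>> public class FlumePollingDriver {
>>>     public static void main(String[] args) throws Exception {
>>>         SparkConf conf = new SparkConf().setAppName("flume-polling-app");
>>>         // Batch interval is a placeholder; the checkpoint dir matches the path in the trace above.
>>>         JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.seconds(10));
>>>         streamingContext.checkpoint("hdfs:///vm/spark-checkpoint");
>>>
>>>         // Addresses of the Flume agents running the Spark sink (host/port are placeholders).
>>>         List<InetSocketAddress> socketAddress = Arrays.asList(new InetSocketAddress("flume-host-1", 9988));
>>>
>>>         JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createPollingStream(
>>>                 streamingContext,
>>>                 socketAddress.toArray(new InetSocketAddress[socketAddress.size()]),
>>>                 StorageLevel.MEMORY_AND_DISK_SER(),
>>>                 100,  // maxBatchSize: events pulled from the sink per batch
>>>                 5);   // parallelism: concurrent connections reading from the sink
>>>
>>>         // Produces the "Received N flume events." lines shown in the driver output.
>>>         flumeStream.count().map(cnt -> "Received " + cnt + " flume events.").print();
>>>
>>>         streamingContext.start();
>>>         streamingContext.awaitTermination();
>>>     }
>>> }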