Hadoop version 2.7.3
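
The NameNode message in the quoted trace below says the client may be "running in an older version of Hadoop", so it may be worth confirming which Hadoop client jars the Spark driver actually loads, independent of the 2.7.3 cluster version. A minimal sketch, assuming hadoop-common's org.apache.hadoop.util.VersionInfo.getVersion() is on the driver classpath; ClasspathVersionProbe and callStatic are hypothetical names, and reflection is used only so the snippet compiles without any Hadoop jars:

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Optional;

/** Hypothetical helper: calls a no-arg static method by name, with no compile-time dependency. */
final class ClasspathVersionProbe {

    private ClasspathVersionProbe() {}

    /**
     * Reflectively calls a public static no-arg method and returns its result as a string.
     * Returns Optional.empty() if the class or method is missing, is not static, or the call fails.
     */
    static Optional<String> callStatic(String className, String methodName) {
        try {
            Class<?> cls = Class.forName(className);
            Method method = cls.getMethod(methodName);
            if (!Modifier.isStatic(method.getModifiers())) {
                return Optional.empty();
            }
            Object result = method.invoke(null);
            return Optional.ofNullable(result).map(Object::toString);
        } catch (ReflectiveOperationException | LinkageError e) {
            // Class not on the classpath, method not found, or the call itself failed.
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // Assumption: the JVM is started with the same classpath as the Spark driver.
        Optional<String> version =
                callStatic("org.apache.hadoop.util.VersionInfo", "getVersion");
        System.out.println("Hadoop client version: "
                + version.orElse("not found on classpath"));
    }
}
```

Running this with the driver's classpath (rather than the cluster's) is the point: the two can differ when Spark ships its own Hadoop jars.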

On Tue, Jun 20, 2017 at 11:12 PM, yohann jardin <yohannjar...@hotmail.com>
wrote:

> Which version of Hadoop are you running on?
>
> *Yohann Jardin*
> On 6/21/2017 at 1:06 AM, N B wrote:
>
> OK, some more information about this issue, in case someone can shed light
> on what could be going on. I turned on debug logging for
> org.apache.spark.streaming.scheduler in the driver process, and this is
> what gets thrown in the logs. It keeps being thrown even after the downed
> HDFS node is restarted. Using Spark 2.1.1 and HDFS 2.7.3 here.
>
> 2017-06-20 22:38:11,302 WARN JobGenerator ReceivedBlockTracker.logWarning
> - Exception thrown while writing record: BatchCleanupEvent(ArrayBuffer())
> to the WriteAheadLog.
> org.apache.spark.SparkException: Exception thrown in awaitResult:
> at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
> at org.apache.spark.streaming.util.BatchedWriteAheadLog.
> write(BatchedWriteAheadLog.scala:83)
> at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(
> ReceivedBlockTracker.scala:234)
> at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.
> cleanupOldBatches(ReceivedBlockTracker.scala:171)
> at org.apache.spark.streaming.scheduler.ReceiverTracker.
> cleanupOldBlocksAndBatches(ReceiverTracker.scala:233)
> at org.apache.spark.streaming.scheduler.JobGenerator.clearCheckpointData(
> JobGenerator.scala:287)
> at org.apache.spark.streaming.scheduler.JobGenerator.org$
> apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.
> scala:187)
> at org.apache.spark.streaming.scheduler.JobGenerator$$anon$
> 1.onReceive(JobGenerator.scala:89)
> at org.apache.spark.streaming.scheduler.JobGenerator$$anon$
> 1.onReceive(JobGenerator.scala:88)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.
> HadoopIllegalArgumentException): Missing storageIDs: It is likely that
> the HDFS client, who made this call, is running in an older version of
> Hadoop which does not support storageIDs. datanodeID.length=1,
> src=/vm/spark-checkpoint/receivedBlockMetadata/log-1497997390799-1497997450799,
> fileId=0, blk=BP-1450953312-10.0.0.9-1490120290209:blk_1081472520_7731872,
> clientName=DFSClient_NONMAPREDUCE_-23097586_1, clientMachine=10.0.0.17
> at org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager.
> getDatanodeStorageInfos(DatanodeManager.java:514)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> getAdditionalDatanode(FSNamesystem.java:3353)
> at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.
> getAdditionalDatanode(NameNodeRpcServer.java:759)
> at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSi
> deTranslatorPB.getAdditionalDatanode(ClientNamenodeProtocolServerSi
> deTranslatorPB.java:515)
> at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$
> ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.
> java)
> at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(
> ProtobufRpcEngine.java:616)
> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(
> UserGroupInformation.java:1698)
> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
>
> at org.apache.hadoop.ipc.Client.call(Client.java:1347)
> at org.apache.hadoop.ipc.Client.call(Client.java:1300)
> at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.
> invoke(ProtobufRpcEngine.java:206)
> at com.sun.proxy.$Proxy10.getAdditionalDatanode(Unknown Source)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(
> RetryInvocationHandler.java:186)
> at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(
> RetryInvocationHandler.java:102)
> at com.sun.proxy.$Proxy10.getAdditionalDatanode(Unknown Source)
> at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslat
> orPB.getAdditionalDatanode(ClientNamenodeProtocolTranslatorPB.java:352)
> at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.
> addDatanode2ExistingPipeline(DFSOutputStream.java:919)
> at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.
> setupPipelineForAppendOrRecovery(DFSOutputStream.java:1031)
> at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.
> processDatanodeError(DFSOutputStream.java:823)
> at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.
> run(DFSOutputStream.java:475)
> polygraph-engine 2017-06-20 22:38:11,302 WARN JobGenerator
> ReceivedBlockTracker.logWarning - Failed to acknowledge batch clean up in
> the Write Ahead Log.
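
For reading the src= path in the trace above: a minimal sketch that splits a receivedBlockMetadata segment name into its two numeric parts, assuming (not confirmed in this thread) that they are start and end times in epoch milliseconds. WalSegmentName is a hypothetical helper, not a Spark class:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: parses names of the form log-<start>-<end>. */
final class WalSegmentName {

    private static final Pattern NAME = Pattern.compile("log-(\\d+)-(\\d+)");

    final Instant start;
    final Instant end;

    private WalSegmentName(long startMs, long endMs) {
        // Assumption: both numbers are epoch milliseconds.
        this.start = Instant.ofEpochMilli(startMs);
        this.end = Instant.ofEpochMilli(endMs);
    }

    /** Accepts either a bare file name or a full path ending in one. */
    static WalSegmentName parse(String pathOrName) {
        String name = pathOrName.substring(pathOrName.lastIndexOf('/') + 1);
        Matcher m = NAME.matcher(name);
        if (!m.matches()) {
            throw new IllegalArgumentException("Not a log-<start>-<end> name: " + name);
        }
        return new WalSegmentName(Long.parseLong(m.group(1)), Long.parseLong(m.group(2)));
    }

    /** Length of the interval between the two timestamps in the name. */
    Duration span() {
        return Duration.between(start, end);
    }

    public static void main(String[] args) {
        WalSegmentName segment = parse(
                "/vm/spark-checkpoint/receivedBlockMetadata/log-1497997390799-1497997450799");
        System.out.println(segment.start + " .. " + segment.end
                + " (" + segment.span().getSeconds() + " s)");
    }
}
```

Applied to a listing of the receivedBlockMetadata directory, this could help pin down the last interval for which a segment exists relative to when the HDFS node was killed.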
>
> Thanks
> N B
>
>
> On Tue, Jun 20, 2017 at 10:24 AM, N B <nb.nos...@gmail.com> wrote:
>
>> BTW, this is running on Spark 2.1.1.
>>
>> I have been trying to debug this issue, and what I have found so far is
>> that it is somehow related to the Spark WAL. The directory named
>> <checkpoint dir in HDFS>/receivedBlockMetadata seems to stop getting
>> written to once an HDFS node has been killed and restarted. I have a
>> couple of questions about this:
>>
>> 1. Why is the flume receiver even writing to the WAL? I have not enabled
>> the 'spark.streaming.receiver.writeAheadLog.enable' property and even
>> after I set it explicitly to false in the driver, the WAL seems to be
>> getting written to.
>>
>> 2. Why would the receiver receive metadata but not write to the WAL after
>> an HDFS node is lost and restarted? HDFS replication factor is at its
>> default of 2.
>>
>> Thanks
>> N B
>>
>>
>> On Mon, Jun 19, 2017 at 6:23 PM, N B <nb.nos...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> We are running a Standalone Spark Cluster for a streaming application.
>>> The application consumes data from Flume using a Flume Polling stream
>>> created as follows:
>>>
>>> flumeStream = FlumeUtils.createPollingStream(streamingContext,
>>>     socketAddress.toArray(new InetSocketAddress[socketAddress.size()]),
>>>     StorageLevel.MEMORY_AND_DISK_SER(), 100, 5);
>>>
>>> The checkpoint directory is configured to be on an HDFS cluster and
>>> Spark workers have their SPARK_LOCAL_DIRS and SPARK_WORKER_DIR defined to
>>> be on their respective local filesystems.
>>>
>>> We are seeing some odd behavior that we are unable to explain. During
>>> normal operation, everything runs as expected, with Flume delivering events
>>> to Spark. However, if I kill one of the HDFS nodes while the application is
>>> running (it does not matter which one), the Flume Receiver in Spark stops
>>> delivering any data to the downstream processing.
>>>
>>> I enabled debug logging for org.apache.spark.streaming.flume on the Spark
>>> worker nodes and looked at the logs for the worker that runs the Flume
>>> Receiver. It keeps chugging along, receiving data from Flume as shown in
>>> the log sample below, but the resulting batches in the stream start
>>> receiving 0 records as soon as the HDFS node is killed, with no errors
>>> produced to indicate any issue.
>>>
>>> 17/06/20 01:05:42 DEBUG FlumeBatchFetcher: Ack sent for sequence number: 09fa05f59050
>>> 17/06/20 01:05:44 DEBUG FlumeBatchFetcher: Received batch of 100 events with sequence number: 09fa05f59052
>>> 17/06/20 01:05:44 DEBUG FlumeBatchFetcher: Sending ack for sequence number: 09fa05f59052
>>> 17/06/20 01:05:44 DEBUG FlumeBatchFetcher: Ack sent for sequence number: 09fa05f59052
>>> 17/06/20 01:05:47 DEBUG FlumeBatchFetcher: Received batch of 100 events with sequence number: 09fa05f59054
>>> 17/06/20 01:05:47 DEBUG FlumeBatchFetcher: Sending ack for sequence number: 09fa05f59054
>>>
>>> The driver output for the application shows (printed via
>>> Dstream.count().map().print()):
>>>
>>> -------------------------------------------
>>> Time: 1497920770000 ms
>>> -------------------------------------------
>>> Received 0     flume events.
>>>
>>>
>>> Any insights about where to look in order to find the root cause will be
>>> greatly appreciated.
>>>
>>> Thanks
>>> N B
>>>
>>>
>>
>
>
