Thanks, I did that, and now I am getting an out-of-memory error. But I am not
sure where it occurs. It can't be on the Spark executor, as I have 28GB
allocated to it. It is not the driver, because I run that locally and
monitor it via jvisualvm. Unfortunately I can't JMX-monitor hadoop.
From the stack trace it seems to fail remotely, after
at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1969)
Maybe at the namenode. I will try to increase its memory.
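For reference, one place the NameNode heap can be raised is HADOOP_NAMENODE_OPTS in hadoop-env.sh; this is only a sketch, and the 8g figure below is an arbitrary example to tune for your cluster, not a recommendation:

```shell
# hadoop-env.sh -- prepend a larger max heap for the NameNode JVM.
# The 8g value is illustrative only; size it to your fsimage/listing load.
export HADOOP_NAMENODE_OPTS="-Xmx8g ${HADOOP_NAMENODE_OPTS}"
```

The NameNode keeps the entire namespace (and each directory listing it serves) on its heap, so with millions of small files its heap requirement grows with file count.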
java.io.IOException: com.google.protobuf.ServiceException: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at org.apache.hadoop.ipc.ProtobufHelper.getRemoteException(ProtobufHelper.java:47)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getListing(ClientNamenodeProtocolTranslatorPB.java:561)
    at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy10.getListing(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1969)
    at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:1952)
    at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:724)
    at org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:105)
    at org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:755)
    at org.apache.hadoop.hdfs.DistributedFileSystem$15.doCall(DistributedFileSystem.java:751)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:751)
    at org.apache.hadoop.fs.Globber.listStatus(Globber.java:69)
    at org.apache.hadoop.fs.Globber.glob(Globber.java:217)
    at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1644)
    at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:292)
    at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:264)
    at org.apache.spark.input.StreamFileInputFormat.setMinPartitions(PortableDataStream.scala:47)
    at org.apache.spark.rdd.BinaryFileRDD.getPartitions(BinaryFileRDD.scala:43)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
    at org.apache.spark.Partitioner$.defaultPartitioner(Partitioner.scala:65)
    at org.apache.spark.rdd.RDD.groupBy(RDD.scala:555)
    at com.stratified.crossref.CitationExtractorJob$.extractCitations(CitationExtractorJob.scala:78)
    at com.stratified.crossref.CitationExtractorJob$.execute(CitationExtractorJob.scala:32)
    at com.stratified.crossref.CitationExtractorJob$.main(CitationExtractorJob.scala:20)
    at com.stratified.crossref.CitationExtractorJob.main(CitationExtractorJob.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:480)
Caused by: com.google.protobuf.ServiceException: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:274)
    at com.sun.proxy.$Proxy9.getListing(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getListing(ClientNamenodeProtocolTranslatorPB.java:554)
    ... 41 more
Caused by: java.lang.OutOfMemoryError: GC overhead limit exceeded
    at com.google.protobuf.ByteString.copyFrom(ByteString.java:192)
    at com.google.protobuf.CodedInputStream.readBytes(CodedInputStream.java:324)
    at org.apache.hadoop.hdfs.protocol.proto.HdfsProtos$HdfsFileStatusProto.<init>(HdfsProtos.java:21261)
    at org.apache.hadoop.hdfs.protocol.proto.HdfsProtos$HdfsFileStatusProto.<init>(HdfsProtos.java:21172)
    at org.apache.hadoop.hdfs.protocol.proto.HdfsProtos$HdfsFileStatusProto$1.parsePartialFrom(HdfsProtos.java:21360)
    at org.apache.hadoop.hdfs.protocol.proto.HdfsProtos$HdfsFileStatusProto$1.parsePartialFrom(HdfsProtos.java:21355)
    at com.google.protobuf.CodedInputStream.readMessage(CodedInputStream.java:309)
    at org.apache.hadoop.hdfs.protocol.proto.HdfsProtos$DirectoryListingProto.<init>(HdfsProtos.java:24929)
    at org.apache.hadoop.hdfs.protocol.proto.HdfsProtos$DirectoryListingProto.<init>(HdfsProtos.java:24876)
    at org.apache.hadoop.hdfs.protocol.proto.HdfsProtos$DirectoryListingProto$1.parsePartialFrom(HdfsProtos.java:24970)
    at org.apache.hadoop.hdfs.protocol.proto.HdfsProtos$DirectoryListingProto$1.parsePartialFrom(HdfsProtos.java:24965)
    at com.google.protobuf.CodedInputStream.readMessage(CodedInputStream.java:309)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$GetListingResponseProto.<init>(ClientNamenodeProtocolProtos.java:26824)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$GetListingResponseProto.<init>(ClientNamenodeProtocolProtos.java:26771)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$GetListingResponseProto$1.parsePartialFrom(ClientNamenodeProtocolProtos.java:26862)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$GetListingResponseProto$1.parsePartialFrom(ClientNamenodeProtocolProtos.java:26857)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$GetListingResponseProto$Builder.mergeFrom(ClientNamenodeProtocolProtos.java:27167)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$GetListingResponseProto$Builder.mergeFrom(ClientNamenodeProtocolProtos.java:27050)
    at com.google.protobuf.AbstractMessage$Builder.mergeFrom(AbstractMessage.java:337)
    at com.google.protobuf.AbstractMessage$Builder.mergeFrom(AbstractMessage.java:267)
    at com.google.protobuf.AbstractMessageLite$Builder.mergeFrom(AbstractMessageLite.java:170)
    at com.google.protobuf.AbstractMessage$Builder.mergeFrom(AbstractMessage.java:882)
    at com.google.protobuf.AbstractMessage$Builder.mergeFrom(AbstractMessage.java:267)
    at com.google.protobuf.AbstractMessageLite$Builder.mergeFrom(AbstractMessageLite.java:161)
    at com.google.protobuf.AbstractMessage$Builder.mergeFrom(AbstractMessage.java:875)
    at com.google.protobuf.AbstractMessage$Builder.mergeFrom(AbstractMessage.java:267)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:264)
    at com.sun.proxy.$Proxy9.getListing(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getListing(ClientNamenodeProtocolTranslatorPB.java:554)
    at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
On 08/06/15 15:12, Ewan Leith wrote:
Try putting a * on the end of xmlDir, i.e.
xmlDir = hdfs:///abc/def/*
rather than
xmlDir = hdfs://abc/def
and see what happens. I don't know why, but that appears to be more reliable
for me with S3 as the filesystem.
I'm also using binaryFiles, but I've tried running the same command with
wholeTextFiles and had the same error.
Ewan
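The suggestion above can be sketched in plain Scala; the helper name `withGlob` is made up here for illustration, not part of any API:

```scala
// Hypothetical helper: append a trailing "/*" glob to a directory path
// before handing it to sc.binaryFiles, as suggested above.
def withGlob(dir: String): String =
  if (dir.endsWith("/*")) dir else dir.stripSuffix("/") + "/*"

// e.g. val xmls = sc.binaryFiles(withGlob("hdfs:///abc/def"))
println(withGlob("hdfs:///abc/def"))
```

With the glob, the path goes through the globStatus code path visible in the stack trace above, which may behave differently from listing the bare directory.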
-----Original Message-----
From: Kostas Kougios [mailto:kostas.koug...@googlemail.com]
Sent: 08 June 2015 15:02
To: user@spark.apache.org
Subject: spark timesout maybe due to binaryFiles() with more than 1 million
files in HDFS
I am reading millions of xml files via
val xmls = sc.binaryFiles(xmlDir)
The operation runs fine locally but on YARN it fails with:
client token: N/A
diagnostics: Application application_1433491939773_0012 failed 2 times due to ApplicationMaster for attempt appattempt_1433491939773_0012_000002 timed out. Failing the application.
ApplicationMaster host: N/A
ApplicationMaster RPC port: -1
queue: default
start time: 1433750951883
final status: FAILED
tracking URL: http://controller01:8088/cluster/app/application_1433491939773_0012
user: ariskk
Exception in thread "main" org.apache.spark.SparkException: Application finished with failed status
    at org.apache.spark.deploy.yarn.Client.run(Client.scala:622)
    at org.apache.spark.deploy.yarn.Client$.main(Client.scala:647)
    at org.apache.spark.deploy.yarn.Client.main(Client.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
In the hadoop userlogs I am frequently getting these messages:
15/06/08 09:15:38 WARN util.AkkaUtils: Error sending message [message = Heartbeat(1,[Lscala.Tuple2;@2b4f336b,BlockManagerId(1, controller01.stratified, 58510))] in 2 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:107)
    at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
    at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)
I run my Spark job via spark-submit, and it works for another HDFS directory
that contains only 37k files. Any ideas how to resolve this?
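For what it's worth, since in yarn-cluster mode the ApplicationMaster hosts the driver, giving it more memory via --driver-memory is one thing to try for the AM timeout. This is only a sketch: the memory values and the jar name below are made-up examples, not recommendations:

```shell
# Illustrative resubmission with a larger driver/AM heap.
# "citation-extractor.jar" and all sizes are hypothetical placeholders.
spark-submit \
  --master yarn-cluster \
  --driver-memory 8g \
  --executor-memory 28g \
  --class com.stratified.crossref.CitationExtractorJob \
  citation-extractor.jar
```

Building the partition list for a million-file binaryFiles() RDD happens on the driver, so an undersized AM heap can stall in GC long enough to miss its heartbeat and be timed out by YARN.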
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/spark-timesout-maybe-due-to-binaryFiles-with-more-than-1-million-files-in-HDFS-tp23208.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.