Is the operation slow every time or does it run normally if you repeat the operation within the same app?
Nick On Thu, Dec 18, 2014 at 8:56 AM, Jon Chase <jon.ch...@gmail.com> wrote: > I'm running a very simple Spark application that downloads files from S3, > does a bit of mapping, then uploads new files. Each file is roughly 2MB > and is gzip'd. I was running the same code on Amazon's EMR w/Spark and not > having any download speed issues (Amazon's EMR provides a custom > implementation of the s3n:// file system, FWIW). > > When I say exceedingly slow, I mean that it takes about 2 minutes to > download and process a 2MB file (this was taking ~2 seconds on the same > instance types in Amazon's EMR). When I download the same file from the > EC2 machine with wget or curl, it downloads in ~ 1 second. I've also done > other bandwidth checks for downloads from other external hosts - no speed > problems there. > > Tried this w/Spark 1.1.0 and 1.1.1. > > When I do a thread dump on a worker, I typically see this a lot: > > > > "Executor task launch worker-7" daemon prio=10 tid=0x00007fd174039000 > nid=0x59e9 runnable [0x00007fd1f7dfb000] > java.lang.Thread.State: RUNNABLE > at java.net.SocketInputStream.socketRead0(Native Method) > at java.net.SocketInputStream.read(SocketInputStream.java:152) > at java.net.SocketInputStream.read(SocketInputStream.java:122) > at sun.security.ssl.InputRecord.readFully(InputRecord.java:442) > at sun.security.ssl.InputRecord.read(InputRecord.java:480) > at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:927) > - locked <0x00000007e44dd140> (a java.lang.Object) > at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:884) > at sun.security.ssl.AppInputStream.read(AppInputStream.java:102) > - locked <0x00000007e44e1350> (a sun.security.ssl.AppInputStream) > at java.io.BufferedInputStream.fill(BufferedInputStream.java:235) > at java.io.BufferedInputStream.read(BufferedInputStream.java:254) > - locked <0x00000007e44ea800> (a java.io.BufferedInputStream) > at org.apache.commons.httpclient.HttpParser.readRawLine(HttpParser.java:78) > at org.apache.commons.httpclient.HttpParser.readLine(HttpParser.java:106) > at > org.apache.commons.httpclient.HttpConnection.readLine(HttpConnection.java:1116) > at > org.apache.commons.httpclient.MultiThreadedHttpConnectionManager$HttpConnectionAdapter.readLine(MultiThreadedHttpConnectionManager.java:1413) > at > org.apache.commons.httpclient.HttpMethodBase.readStatusLine(HttpMethodBase.java:1973) > at > org.apache.commons.httpclient.HttpMethodBase.readResponse(HttpMethodBase.java:1735) > at > org.apache.commons.httpclient.HttpMethodBase.execute(HttpMethodBase.java:1098) > at > org.apache.commons.httpclient.HttpMethodDirector.executeWithRetry(HttpMethodDirector.java:398) > at > org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:171) > at > org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397) > at > org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:323) > at > org.jets3t.service.impl.rest.httpclient.RestS3Service.performRequest(RestS3Service.java:342) > at > org.jets3t.service.impl.rest.httpclient.RestS3Service.performRestHead(RestS3Service.java:718) > at > org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectImpl(RestS3Service.java:1599) > at > org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectDetailsImpl(RestS3Service.java:1535) > at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1987) > at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1332) > at > org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:111) > at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82) > at > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59) > at org.apache.hadoop.fs.s3native.$Proxy6.retrieveMetadata(Unknown Source) > at > org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:330) > at > org.apache.hadoop.fs.s3native.NativeS3FileSystem.mkdir(NativeS3FileSystem.java:432) > at > org.apache.hadoop.fs.s3native.NativeS3FileSystem.mkdirs(NativeS3FileSystem.java:425) > at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1126) > at > org.apache.hadoop.mapred.FileOutputCommitter.getWorkPath(FileOutputCommitter.java:256) > at > org.apache.hadoop.mapred.FileOutputFormat.getTaskOutputPath(FileOutputFormat.java:244) > at > org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:126) > at > org.apache.hadoop.mapred.lib.MultipleTextOutputFormat.getBaseRecordWriter(MultipleTextOutputFormat.java:44) > at > org.apache.hadoop.mapred.lib.MultipleOutputFormat$1.write(MultipleOutputFormat.java:99) > at org.apache.spark.SparkHadoopWriter.write(SparkHadoopWriter.scala:94) > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:986) > at > org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) > at org.apache.spark.scheduler.Task.run(Task.scala:54) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > > > > > Here's my pseudo code: > > Launch cluster: > ./spark-ec2 -k x -i x --wait=480 -m m3.xlarge -t m3.xlarge -s 2 > --spot-price=.1 -r eu-west-1 --master-opts=-Dspark.eventLog.enabled=true > launch spark-ec2 > > > Run job (from the master): > ~/spark/bin/spark-submit --class com.MyClass --master > spark://ec2-xx-xx-xx-xx:7077 --deploy-mode client --driver-memory 6G > --executor-memory 2G --conf spark.eventLog.enabled=true ~/my.jar --in-path > s3n://bucket/path/prefix-* --out-path s3n://bucket/outpath > > > Job (Java pseudo): > > rdd= ctx.textFile(cmd.inPath) > .map(return parser.parse(line)) > .filter(return targetDate.equals(logLine.timestamp)) > .keyBy(return p.partitionKeyFor(logLine)) > > > FileOutputFormat.setCompressOutput(jobConf, true); > FileOutputFormat.setOutputCompressorClass(jobConf, > GzipCodec.class); > > > rdd.saveAsHadoopDataset(jobConf); > > > Unfortunately, I haven't been able to get debugging turned up - I'm using > slf4j/logback w/the commons-logging and log4j bridges. Any pointers for > getting that turned up to DEBUG would be helpful too. > > > I've tried everything I can think of and am at my wit's end - any > troubleshooting suggestions would be greatly appreciated! > > >