Also, could this have to do with the fact that there is a "/" in the path to the S3 resource?

Thanks!
Ognen
On Sat, Jan 18, 2014 at 1:49 PM, Ognen Duzlevski <[email protected]> wrote:

> I am trying to run a simple job on a 3-machine Spark cluster. All three
> machines are Amazon instances within the VPC (xlarge size instances). All I
> am doing is reading a (rather large, about 20 GB) file from an S3 bucket
> and doing some basic filtering on each line.
>
> Here is how I start the spark shell:
>
> sparkuser@spark-master:~$ MASTER=spark://10.10.0.200:7077 spark-shell
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file:/home/sparkuser/spark/tools/target/scala-2.9.3/spark-tools-assembly-0.8.1-incubating.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/home/sparkuser/spark/assembly/target/scala-2.9.3/spark-assembly-0.8.1-incubating-hadoop1.0.4.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /___/ .__/\_,_/_/ /_/\_\   version 0.8.1
>       /_/
>
> Using Scala version 2.9.3 (OpenJDK 64-Bit Server VM, Java 1.6.0_27)
> Initializing interpreter...
> 14/01/18 13:29:53 WARN Utils: Your hostname, spark-master resolves to a loopback address: 127.0.0.1; using 10.10.0.200 instead (on interface eth0)
> 14/01/18 13:29:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
> Creating SparkContext...
> Spark context available as sc.
> Type in expressions to have them evaluated.
> Type :help for more information.
>
> Here is the relevant part:
>
> scala> val f = sc.textFile("s3n://data-pipeline/large_data/2013-11-30.json")
> f: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:12
>
> scala> val events = f.filter(_.split(",")(0).split(":")(1).replace("\"","") == "Sign Up").map(line => (line.split(",")(2).split(":")(1).replace("\"",""),0)).cache
> events: org.apache.spark.rdd.RDD[(java.lang.String, Int)] = MappedRDD[3] at map at <console>:14
>
> scala> events.count
> 14/01/18 13:30:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> 14/01/18 13:30:13 WARN LoadSnappy: Snappy native library not loaded
>
> About five minutes later the failure happens (see below). A few questions:
> 1. For Spark to talk to S3, do I actually have to have a Hadoop installation somewhere, or does Spark do all the talking itself (once provided with the AWS credentials)?
> 2. Is there something I am missing in terms of open ports, pings and other checks that I may be blocking through the instance security group rules?
> Thanks!
> Ognen
>
> 14/01/18 13:35:12 ERROR Client$ClientActor: Master removed our application: FAILED; stopping client
> 14/01/18 13:35:12 WARN SparkDeploySchedulerBackend: Disconnected from Spark cluster! Waiting for reconnection...
> 14/01/18 13:35:12 ERROR ClusterScheduler: Lost an executor 1 (already removed): remote Akka client shutdown
> 14/01/18 13:36:01 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(1, 10.10.0.200, 33179, 0) with no recent heart beats: 56653ms exceeds 45000ms
> 14/01/18 13:36:14 WARN RestUtils: Retried connection 6 times, which exceeds the maximum retry count of 5
> org.apache.commons.httpclient.ConnectTimeoutException: The host did not accept the connection within timeout of 60000 ms
>         at org.apache.commons.httpclient.protocol.ReflectionSocketFactory.createSocket(ReflectionSocketFactory.java:155)
>         at org.apache.commons.httpclient.protocol.SSLProtocolSocketFactory.createSocket(SSLProtocolSocketFactory.java:130)
>         at org.apache.commons.httpclient.HttpConnection.open(HttpConnection.java:707)
>         at org.apache.commons.httpclient.MultiThreadedHttpConnectionManager$HttpConnectionAdapter.open(MultiThreadedHttpConnectionManager.java:1361)
>         at org.apache.commons.httpclient.HttpMethodDirector.executeWithRetry(HttpMethodDirector.java:387)
>         at org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:171)
>         at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397)
>         at org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:323)
>         at org.jets3t.service.impl.rest.httpclient.RestS3Service.performRequest(RestS3Service.java:342)
>         at org.jets3t.service.impl.rest.httpclient.RestS3Service.performRestHead(RestS3Service.java:718)
>         at org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectImpl(RestS3Service.java:1599)
>         at org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectDetailsImpl(RestS3Service.java:1535)
>         at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1987)
>         at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1332)
>         at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:111)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:622)
>         at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82)
>         at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59)
>         at org.apache.hadoop.fs.s3native.$Proxy8.retrieveMetadata(Unknown Source)
>         at org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:326)
>         at org.apache.hadoop.fs.FileSystem.getFileStatus(FileSystem.java:1337)
>         at org.apache.hadoop.fs.FileSystem.globStatusInternal(FileSystem.java:1045)
>         at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:987)
>         at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:177)
>         at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:208)
>         at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:141)
>         at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:201)
>         at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:199)
>         at scala.Option.getOrElse(Option.scala:108)
>         at org.apache.spark.rdd.RDD.partitions(RDD.scala:199)
>         at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:26)
>         at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:201)
>         at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:199)
>         at scala.Option.getOrElse(Option.scala:108)
>         at org.apache.spark.rdd.RDD.partitions(RDD.scala:199)
>         at org.apache.spark.rdd.FilteredRDD.getPartitions(FilteredRDD.scala:27)
>         at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:201)
>         at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:199)
>         at scala.Option.getOrElse(Option.scala:108)
>         at org.apache.spark.rdd.RDD.partitions(RDD.scala:199)
>         at org.apache.spark.rdd.FilteredRDD.getPartitions(FilteredRDD.scala:27)
>         at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:201)
>         at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:199)
>         at scala.Option.getOrElse(Option.scala:108)
>         at org.apache.spark.rdd.RDD.partitions(RDD.scala:199)
>         at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:26)
>         at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:201)
>         at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:199)
>         at scala.Option.getOrElse(Option.scala:108)
>         at org.apache.spark.rdd.RDD.partitions(RDD.scala:199)
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:886)
>         at org.apache.spark.rdd.RDD.count(RDD.scala:698)
>         at <init>(<console>:17)
>         at <init>(<console>:22)
>         at <init>(<console>:24)
>         at <init>(<console>:26)
>         at <init>(<console>:28)
>         at .<init>(<console>:32)
>         at .<clinit>(<console>)
>         at .<init>(<console>:11)
>         at .<clinit>(<console>)
>         at $export(<console>)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:622)
>         at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:629)
>         at org.apache.spark.repl.SparkIMain$Request$$anonfun$10.apply(SparkIMain.scala:897)
>         at scala.tools.nsc.interpreter.Line$$anonfun$1.apply$mcV$sp(Line.scala:43)
>         at scala.tools.nsc.io.package$$anon$2.run(package.scala:25)
>         at java.lang.Thread.run(Thread.java:701)
> Caused by: java.net.SocketTimeoutException: connect timed out
>         at java.net.PlainSocketImpl.socketConnect(Native Method)
>         at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:327)
>         at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:193)
>         at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:180)
>         at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:385)
>         at java.net.Socket.connect(Socket.java:546)
>         at sun.security.ssl.SSLSocketImpl.connect(SSLSocketImpl.java:590)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:622)
>         at org.apache.commons.httpclient.protocol.ReflectionSocketFactory.createSocket(ReflectionSocketFactory.java:140)
>         ... 68 more
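
For context on question 1 above: the quoted stack trace goes through org.apache.hadoop.fs.s3native.NativeS3FileSystem and JetS3t, i.e. the Hadoop client classes bundled in the Spark assembly are what is talking to S3 here. A minimal sketch of supplying the credentials through the SparkContext's Hadoop configuration; the fs.s3n.* property names are the standard Hadoop ones, the master URL matches the quoted session, and everything else is a placeholder:

import org.apache.spark.SparkContext

// Sketch only: placeholder app name, credentials and bucket/key.
val sc = new SparkContext("spark://10.10.0.200:7077", "s3n-read-sketch")

// NativeS3FileSystem (the s3n:// handler) reads these two Hadoop properties
// for authentication; the AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment
// variables may also be picked up when the context is created.
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "YOUR_ACCESS_KEY_ID")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "YOUR_SECRET_ACCESS_KEY")

// In an s3n:// URI the part right after "s3n://" is the bucket and the rest of
// the path is the object key, so a "/" inside the key ("large_data/...") should
// be ordinary.
val f = sc.textFile("s3n://data-pipeline/large_data/2013-11-30.json")
println(f.count())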

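On the parsing side, the filter/map in the quoted session pulls the event type out of the first comma-separated field and the key of the (String, Int) pair out of the third. A small standalone sketch of the same string surgery on one hypothetical line; the field layout and names below are assumptions, not taken from the real 2013-11-30.json data:

// Hypothetical input line; the real file's layout may differ.
val line = """{"event":"Sign Up","date":"2013-11-30","userid":"abc123","country":"US"}"""

// Field 0 is everything up to the first comma; take the value after its first
// colon and strip the quotes: {"event":"Sign Up" -> "Sign Up" -> Sign Up
val eventType = line.split(",")(0).split(":")(1).replace("\"", "")

// Field 2 gets the same treatment and becomes the key of the pair.
val userId = line.split(",")(2).split(":")(1).replace("\"", "")

println((eventType, userId))   // prints (Sign Up,abc123)

This only behaves if no value contains an embedded comma or colon, which is worth keeping in mind for a 20 GB file of real JSON.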