Re: spark streaming actor receiver doesn't play well with kryoserializer
This looks like a bug to me. It happens because we serialize the code that starts the receiver and send it across, and since the Akka library's classes have not been registered with Kryo, it does not work. I have not tried it myself, but including something like chill-akka (https://github.com/xitrum-framework/chill-akka) might help. I am not well versed in how Kryo works internally; maybe someone else can shed some light on this. Prashant Sharma

On Sat, Jul 26, 2014 at 6:26 AM, Alan Ngai a...@opsclarity.com wrote: The stack trace was from running the Actor count sample directly, without a Spark cluster, so I guess the logs would be from both? I enabled more logging and got this stack trace:

14/07/25 17:55:26 [INFO] SecurityManager: Changing view acls to: alan
14/07/25 17:55:26 [INFO] SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(alan)
14/07/25 17:55:26 [DEBUG] AkkaUtils: In createActorSystem, requireCookie is: off
14/07/25 17:55:26 [INFO] Slf4jLogger: Slf4jLogger started
14/07/25 17:55:27 [INFO] Remoting: Starting remoting
14/07/25 17:55:27 [INFO] Remoting: Remoting started; listening on addresses :[akka.tcp://spark@leungshwingchun:52156]
14/07/25 17:55:27 [INFO] Remoting: Remoting now listens on addresses: [akka.tcp://spark@leungshwingchun:52156]
14/07/25 17:55:27 [INFO] SparkEnv: Registering MapOutputTracker
14/07/25 17:55:27 [INFO] SparkEnv: Registering BlockManagerMaster
14/07/25 17:55:27 [DEBUG] DiskBlockManager: Creating local directories at root dirs '/var/folders/fq/fzcyqkcx7rgbycg4kr8z3m18gn/T/'
14/07/25 17:55:27 [INFO] DiskBlockManager: Created local directory at /var/folders/fq/fzcyqkcx7rgbycg4kr8z3m18gn/T/spark-local-20140725175527-32f2
14/07/25 17:55:27 [INFO] MemoryStore: MemoryStore started with capacity 297.0 MB.
14/07/25 17:55:27 [INFO] ConnectionManager: Bound socket to port 52157 with id = ConnectionManagerId(leungshwingchun,52157)
14/07/25 17:55:27 [INFO] BlockManagerMaster: Trying to register BlockManager
14/07/25 17:55:27 [INFO] BlockManagerInfo: Registering block manager leungshwingchun:52157 with 297.0 MB RAM
14/07/25 17:55:27 [INFO] BlockManagerMaster: Registered BlockManager
14/07/25 17:55:27 [INFO] HttpServer: Starting HTTP Server
14/07/25 17:55:27 [DEBUG] HttpServer: HttpServer is not using security
14/07/25 17:55:27 [INFO] HttpBroadcast: Broadcast server started at http://192.168.1.233:52158
14/07/25 17:55:27 [INFO] HttpFileServer: HTTP File server directory is /var/folders/fq/fzcyqkcx7rgbycg4kr8z3m18gn/T/spark-5254ba11-037b-4761-b92a-3b18d42762de
14/07/25 17:55:27 [INFO] HttpServer: Starting HTTP Server
14/07/25 17:55:27 [DEBUG] HttpServer: HttpServer is not using security
14/07/25 17:55:27 [DEBUG] HttpFileServer: HTTP file server started at: http://192.168.1.233:52159
14/07/25 17:55:27 [INFO] SparkUI: Started SparkUI at http://leungshwingchun:4040
14/07/25 17:55:27 [DEBUG] MutableMetricsFactory: field org.apache.hadoop.metrics2.lib.MutableRate org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginSuccess with annotation @org.apache.hadoop.metrics2.annotation.Metric(valueName=Time, about=, value=[Rate of successful kerberos logins and latency (milliseconds)], always=false, type=DEFAULT, sampleName=Ops)
14/07/25 17:55:27 [DEBUG] MutableMetricsFactory: field org.apache.hadoop.metrics2.lib.MutableRate org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginFailure with annotation @org.apache.hadoop.metrics2.annotation.Metric(valueName=Time, about=, value=[Rate of failed kerberos logins and latency (milliseconds)], always=false, type=DEFAULT, sampleName=Ops)
14/07/25 17:55:27 [DEBUG] MetricsSystemImpl: UgiMetrics, User and group related metrics
2014-07-25 17:55:27.796 java[79107:1703] Unable to load realm info from SCDynamicStore
14/07/25 17:55:27 [DEBUG] KerberosName: Kerberos krb5 configuration not found, setting default realm to empty
14/07/25 17:55:27 [DEBUG] Groups: Creating new Groups object
14/07/25 17:55:27 [DEBUG] NativeCodeLoader: Trying to load the custom-built native-hadoop library...
14/07/25 17:55:27 [DEBUG] NativeCodeLoader: Failed to load native-hadoop with error: java.lang.UnsatisfiedLinkError: no hadoop in java.library.path
14/07/25 17:55:27 [DEBUG] NativeCodeLoader: java.library.path=
14/07/25 17:55:27 [WARN] NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/07/25 17:55:27 [DEBUG] JniBasedUnixGroupsMappingWithFallback: Falling back to shell based
14/07/25 17:55:27 [DEBUG] JniBasedUnixGroupsMappingWithFallback: Group mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping
14/07/25 17:55:27 [DEBUG] Groups: Group mapping impl=org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback; cacheTimeout=30
14/07/25 17:55:28 [INFO] SparkContext: Added JAR
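A sketch of the chill-akka suggestion above, for anyone who wants to try it: the idea would be to switch to Kryo and point Spark at a custom registrator that registers Akka's classes. The registrator class name and exactly which chill-akka serializers to register are assumptions on my part; check the chill-akka README for your version.

```scala
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// Hypothetical registrator: which chill-akka serializers to register,
// and how, is an assumption -- consult the chill-akka README.
class AkkaAwareRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    // e.g. register akka.actor.ActorRef with the serializer chill-akka provides,
    // so the closure that starts the receiver survives Kryo serialization.
  }
}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "AkkaAwareRegistrator")
```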
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
Hi All, The data size of my task is about 30 MB. It runs smoothly in local mode; however, when I submit it to the cluster, it throws the error in the subject line (please see below for the complete output). Actually, my output is almost the same as http://stackoverflow.com/questions/24080891/spark-program-hangs-at-job-finished-toarray-workers-throw-java-util-concurren. I also call toArray on my data, which was the cause in his case. However, how come it runs OK locally but not on the cluster? The memory of each worker is over 60g, and my run command is:

$SPARK_HOME/bin/spark-class org.apache.spark.deploy.Client launch spark://10.196.135.101:7077 $jar_path $programname -Dspark.master=spark://10.196.135.101:7077 -Dspark.cores.max=300 -Dspark.executor.memory=20g -spark.jars=$jar_path -Dspark.default.parallelism=100 -Dspark.hadoop.hadoop.job.ugi=$username,$groupname -Dspark.app.name=$appname $in_path $scala_out_path

Looking for help, and thanks a lot! Below please find the complete output:

14/07/31 15:06:53 WARN Configuration: DEPRECATED: hadoop-site.xml found in the classpath. Usage of hadoop-site.xml is deprecated.
Instead use core-site.xml, mapred-site.xml and hdfs-site.xml to override properties of core-default.xml, mapred-default.xml and hdfs-default.xml respectively
14/07/31 15:06:53 INFO SecurityManager: Changing view acls to: spark
14/07/31 15:06:53 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(spark)
14/07/31 15:06:53 INFO Slf4jLogger: Slf4jLogger started
14/07/31 15:06:53 INFO Remoting: Starting remoting
14/07/31 15:06:54 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutor@tdw-10-215-140-22:39446]
14/07/31 15:06:54 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkExecutor@tdw-10-215-140-22:39446]
14/07/31 15:06:54 INFO CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://spark@tdw-10-196-135-106:38502/user/CoarseGrainedScheduler
14/07/31 15:06:54 INFO WorkerWatcher: Connecting to worker akka.tcp://sparkWorker@tdw-10-215-140-22:34755/user/Worker
14/07/31 15:06:54 INFO WorkerWatcher: Successfully connected to akka.tcp://sparkWorker@tdw-10-215-140-22:34755/user/Worker
14/07/31 15:06:56 INFO CoarseGrainedExecutorBackend: Successfully registered with driver
14/07/31 15:06:56 INFO SecurityManager: Changing view acls to: spark
14/07/31 15:06:56 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(spark)
14/07/31 15:06:56 INFO Slf4jLogger: Slf4jLogger started
14/07/31 15:06:56 INFO Remoting: Starting remoting
14/07/31 15:06:56 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@tdw-10-215-140-22:56708]
14/07/31 15:06:56 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@tdw-10-215-140-22:56708]
14/07/31 15:06:56 INFO SparkEnv: Connecting to MapOutputTracker: akka.tcp://spark@tdw-10-196-135-106:38502/user/MapOutputTracker
14/07/31 15:06:58 INFO SparkEnv: Connecting to BlockManagerMaster: akka.tcp://spark@tdw-10-196-135-106:38502/user/BlockManagerMaster
14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at /data1/sparkenv/local/spark-local-20140731150659-3f12
14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at /data2/sparkenv/local/spark-local-20140731150659-1602
14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at /data3/sparkenv/local/spark-local-20140731150659-d213
14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at /data4/sparkenv/local/spark-local-20140731150659-f42e
14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at /data5/sparkenv/local/spark-local-20140731150659-63d0
14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at /data6/sparkenv/local/spark-local-20140731150659-9003
14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at /data7/sparkenv/local/spark-local-20140731150659-f260
14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at /data8/sparkenv/local/spark-local-20140731150659-6334
14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at /data9/sparkenv/local/spark-local-20140731150659-3af4
14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at /data10/sparkenv/local/spark-local-20140731150659-133d
14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at /data11/sparkenv/local/spark-local-20140731150659-ed08
14/07/31 15:06:59 INFO MemoryStore: MemoryStore started with capacity 11.5 GB.
14/07/31 15:06:59 INFO ConnectionManager: Bound socket to port 35127 with id = ConnectionManagerId(tdw-10-215-140-22,35127)
14/07/31 15:06:59 INFO BlockManagerMaster: Trying to register BlockManager
14/07/31 15:07:00 INFO BlockManagerMaster: Registered BlockManager
14/07/31 15:07:00 INFO HttpFileServer: HTTP File server directory is /tmp/spark-0914d215-dd22-4d5e-9ec0-724937dbfd8b
14/07/31 15:07:00 INFO
Re: Ports required for running spark
Hi Haiyang, you are right, YARN takes over the resource management, but I constantly get a ConnectionRefused exception on the mentioned port. So I suppose some Spark-internal communication is done via this port, but I don't know what exactly, or how I can change it. Thank you, Konstantin Kudryavtsev On Thu, Jul 31, 2014 at 2:53 PM, Haiyang Fu haiyangfu...@gmail.com wrote: Hi Konstantin, Would you please post some more details? Error info or exception from the log, and in what situation? When you run a Spark job in yarn-cluster mode, YARN takes over all the resource management. On Thu, Jul 31, 2014 at 6:17 PM, Konstantin Kudryavtsev kudryavtsev.konstan...@gmail.com wrote: Hi Larry, I'm afraid this is standalone mode; I'm interested in YARN. Also, I don't see the troublesome port 33007, which I believe is related to Akka. Thank you, Konstantin Kudryavtsev On Thu, Jul 31, 2014 at 1:11 PM, Larry Xiao xia...@sjtu.edu.cn wrote: Hi Konstantin, I think you can find it at https://spark.apache.org/docs/latest/spark-standalone.html#configuring-ports-for-network-security and you can specify ports for the master or workers in conf/spark-env.sh Larry On 7/31/14, 6:04 PM, Konstantin Kudryavtsev wrote: Hi there, I'm trying to run Spark on a YARN cluster and am facing an issue where some ports are closed, particularly port 33007 (I suppose it's used by Akka). Could you please provide me with a list of all ports required for Spark? Also, is it possible to set up these ports? Thank you in advance, Konstantin Kudryavtsev
configuration needed to run twitter(25GB) dataset
We have a 6-node cluster where each node has 64 GB of memory. Here is the command:

./bin/spark-submit --class org.apache.spark.examples.graphx.LiveJournalPageRank examples/target/scala-2.10/spark-examples-1.0.1-hadoop1.0.4.jar hdfs://dataset/twitter --tol=0.01 --numEPart=144 --numIter=10

But it ran out of memory. I also tried 2D and 1D partitioning. And I also tried Giraph under the same configuration; it runs for 10 iterations and then runs out of memory as well. Actually, I don't know whether the command is right. Should numEPart equal the number of nodes, or the number of nodes * cores? I think that if numEPart is smaller, it will require less memory, just like PowerGraph. Thanks in advance!
Re: Ports required for running spark
Hi Konstantin, Could you please post your first container's stderr log here, which is always the AM log? As far as I know, ports other than 8020, 8080, 8081, 50070, and 50071 are all random socket ports determined by each job, so 33007 may just be a temporary port for data transfer. The deeper reason for the 33007 ConnectionRefused may be something else. Would you please check whether it is always 33007 when you submit your job again? haiy On Thu, Jul 31, 2014 at 8:17 PM, Konstantin Kudryavtsev kudryavtsev.konstan...@gmail.com wrote: Hi Haiyang, you are right, YARN takes over the resource management, but I constantly got a ConnectionRefused exception on the mentioned port. So I suppose some Spark-internal communication is done via this port, but I don't know what exactly, or how I can change it. Thank you, Konstantin Kudryavtsev [earlier quoted messages trimmed]
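For anyone following this thread: the random ports can be pinned down through configuration so a firewall can allow them. A sketch, with the caveat that these property names are taken from the 1.x docs and some of them only exist from Spark 1.1 onward, so treat availability on your version as an assumption:

```scala
import org.apache.spark.SparkConf

// Sketch: pin Spark's otherwise-random ports to fixed values.
// Port numbers here are arbitrary examples.
val conf = new SparkConf()
  .set("spark.driver.port",       "38000") // driver <-> executor Akka traffic
  .set("spark.fileserver.port",   "38001") // jar/file distribution
  .set("spark.broadcast.port",    "38002") // HTTP broadcast server
  .set("spark.blockManager.port", "38003") // block transfers (1.1+)
```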
How to share a NonSerializable variable among tasks in the same worker node?
As shown here: 2 - Why Is My Spark Job so Slow and Only Using a Single Thread? http://engineering.sharethrough.com/blog/2013/09/13/top-3-troubleshooting-tips-to-keep-you-sparking/

object JSONParser {
  def parse(raw: String): String = ...
}

object MyFirstSparkJob {
  def main(args: Array[String]) {
    val sc = new SparkContext()
    val lines = sc.textFileStream("beacons.txt")
    lines.map(line => JSONParser.parse(line))
    lines.foreach(line => println(line))
    ssc.start()
  }
}

It says the parser instance "is now a singleton created in the scope of our driver program", which I thought was in the scope of the executor. Am I wrong, and if so, why? What if the parser is not serializable, and I want to share it among tasks on the same worker node?
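The usual answer to the question above is a lazily initialized singleton: a `lazy val` in a top-level object is constructed on first access inside each JVM (so once per executor), and the closure only captures the object's name, never the instance, so nothing non-serializable is shipped. A minimal sketch, where `JSONParser` is a stand-in (my assumption) for whatever class you cannot serialize:

```scala
// Stand-in for a non-serializable helper.
class JSONParser {
  def parse(raw: String): String = raw.trim
}

object ParserHolder {
  // Created on first access within each JVM (driver or executor);
  // never serialized with the task closure.
  lazy val parser: JSONParser = new JSONParser
}

// In a Spark job you would then write, for example:
//   lines.mapPartitions(iter => iter.map(ParserHolder.parser.parse))
```

All tasks running in the same worker JVM share the one instance, which also means the class must be thread-safe if tasks run concurrently.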
java.lang.OutOfMemoryError: Java heap space
Hi everyone, I have the following configuration. I am currently running my app in local mode.

val conf = new SparkConf().setMaster("local[2]").setAppName("ApproxStrMatch").set("spark.executor.memory", "3g").set("spark.storage.memoryFraction", "0.1")

I am getting the following error. I tried setting spark.executor.memory and the memory fraction setting, however my UI does not show the increase and I still get these errors. I am loading a TSV file from HDFS (around 5 GB). Does this mean I should update these settings and add more memory, or is it something else? The Spark master has 24 GB of physical memory and the workers have 16 GB, but we are running other services (CDH 5.1) on these nodes as well.

14/07/31 09:48:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
14/07/31 09:48:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
14/07/31 09:48:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 6 ms
14/07/31 09:48:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 6 ms
14/07/31 09:48:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/07/31 09:48:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/07/31 09:48:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
14/07/31 09:48:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
14/07/31 09:48:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 1 ms
14/07/31 09:48:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 1 ms
14/07/31 09:48:17 ERROR Executor: Exception in task ID 5
java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOf(Arrays.java:2271)
        at java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:178)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
14/07/31 09:48:17 ERROR ExecutorUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-3,5,main]
java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOf(Arrays.java:2271)
        at java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:178)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
14/07/31 09:48:17 WARN TaskSetManager: Lost TID 5 (task 1.0:0)
14/07/31 09:48:17 WARN TaskSetManager: Loss was due to java.lang.OutOfMemoryError
java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOf(Arrays.java:2271)
        at java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:178)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
14/07/31 09:48:17 ERROR TaskSetManager: Task 1.0:0 failed 1 times; aborting job
14/07/31 09:48:17 INFO TaskSchedulerImpl: Cancelling stage 1
14/07/31 09:48:17 INFO DAGScheduler: Failed to run collect at ComputeScores.scala:76
14/07/31 09:48:17 INFO Executor: Executor is trying to kill task 6
14/07/31 09:48:17 INFO TaskSchedulerImpl: Stage 1 was cancelled
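One thing worth noting about the configuration above: in local mode the executor runs inside the driver JVM, so (as I understand it) spark.executor.memory has no effect there; the driver's own heap is the limit, which would explain why the UI shows no increase. A sketch of the same configuration with that caveat spelled out:

```scala
import org.apache.spark.SparkConf

// Same settings as above; these take effect on real (cluster) executors.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("ApproxStrMatch")
  .set("spark.executor.memory", "3g")          // ignored in local mode
  .set("spark.storage.memoryFraction", "0.1")

// For local mode, grow the JVM heap itself when launching instead, e.g.
// by passing -Xmx6g (or the driver-memory option your launch script supports)
// to the java process that runs the driver.
```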
Re: Shark/Spark running on EC2 can read from S3 bucket but cannot write to it - Wrong FS
I am running Spark 0.9.1 and Shark 0.9.1. Sorry I didn't include that. On Thu, Jul 31, 2014 at 9:50 AM, William Cox william@distilnetworks.com wrote: *The Shark-specific group appears to be in moderation pause, so I'm asking here.* I'm running Shark/Spark on EC2. I am using Shark to query data from an S3 bucket and then write the results back to an S3 bucket. The data is read fine, but when I write I get an error:

14/07/31 16:42:30 INFO scheduler.TaskSetManager: Loss was due to java.lang.IllegalArgumentException: Wrong FS: s3n://id:key@shadoop/tmp/hive-root/hive_2014-07-31_16-39-29_825_6436105804053790400/_tmp.-ext-1, expected: hdfs://ecmachine.compute-1.amazonaws.com:9000 [duplicate 3]

Is there some setting I can change to allow it to write to an S3 file system? I've tried all sorts of different queries to write to S3. This particular one was:

INSERT OVERWRITE DIRECTORY 's3n://id:key@shadoop/bucket' SELECT * FROM table;

Thanks for your help! -William
Re: the EC2 setup script often will not allow me to SSH into my machines. Ideas?
Ah, thanks for the help! That worked great. On Wed, Jul 30, 2014 at 10:31 AM, Zongheng Yang zonghen...@gmail.com wrote: To add to this: for this many (>= 20) machines I usually use at least --wait 600. On Wed, Jul 30, 2014 at 9:10 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: William, The error you are seeing is misleading. There is no need to terminate the cluster and start over. Just re-run your launch command, but with the additional --resume option tacked on the end. As Akhil explained, this happens because AWS is not starting up the instances as quickly as the script is expecting. You can increase the wait time to mitigate this problem. Nick On Wed, Jul 30, 2014 at 11:51 AM, Akhil Das ak...@sigmoidanalytics.com wrote: You need to increase the wait time (-w); the default is 120 seconds, you may set it to a higher number like 300-400. The problem is that EC2 takes some time to initiate the machine (which is more than 120 seconds sometimes). Thanks Best Regards On Wed, Jul 30, 2014 at 8:52 PM, William Cox william@distilnetworks.com wrote: TL;DR 50% of the time I can't SSH into either my master or slave nodes and have to terminate all the machines and restart the EC2 cluster setup process. Hello, I'm trying to set up a Spark cluster on Amazon EC2. I am finding the setup script to be delicate and unpredictable in terms of reliably allowing SSH logins to all of the slaves and the master. For instance (I'm running Spark 0.9.1-hadoop1, since I intend to use Shark), I call this command to provision a 32-slave cluster using spot instances:

$ ./spark-ec2 --spot-price=0.1 --zone=us-east-1e -k key -i ~/key.pem -s 32 --instance-type=m1.medium launch cluster_name

After waiting for the instances to provision I get the following output:

All 32 slaves granted
Launched master in us-east-1e, regid = r-f8444a89
Waiting for instances to start up...
Waiting 120 more seconds...
Generating cluster's SSH key on master...
ssh: connect to host ecMASTER.compute-1.amazonaws.com port 22: Connection refused Error executing remote command, retrying after 30 seconds: Command '['ssh', '-o', 'StrictHostKeyChecking=no', '-i', '/Users/user/key.pem', '-t', '-t', u'r...@ecmaster.compute-1.amazonaws.com', \n [ -f ~/.ssh/id_rsa ] ||\n(ssh-keygen -q -t rsa -N '' -f ~/.ssh/id_rsa \n cat ~/.ssh/id_rsa.pub ~/.ssh/authorized_keys)\n]' returned non-zero exit status 255 I have removed the key and machine names with 'MASTER' and 'key'. I wait a few more cycles of the error message and finally, after 3 attempts, the script quits with this message: ssh: connect to host ecMASTER.compute-1.amazonaws.com port 22: Connection refused Error: Failed to SSH to remote host ecMASTER.compute-1.amazonaws.com. Please check that you have provided the correct --identity-file and --key-pair parameters and try again. So, YES, the .pem file is correct - I am currently running a smaller cluster and can provision other machines on EC2 and use that file. Secondly, the node it can't seem to connect to is the MASTER node. I have also gone into the EC2 console and verified that all the machines are using the key that corresponds to key.pem. I have tried this command 2x and on a friends machine with no success. However, I was able to provision a 15 machine cluster using m1.larges. Now I PAUSE for some period of time - 2-3 minutes (to write this email) - and I call the same command with the --resume flag. This time it logs into the master node just fine and begins to give the slaves SSH keys, and it fails on a certain slave. 
ssh: connect to host ec2-54-237-6-95.compute-1.amazonaws.com port 22: Connection refused
Error 255 while executing remote command, retrying after 30 seconds
ssh: connect to host ec2-54-237-6-95.compute-1.amazonaws.com port 22: Connection refused
Error 255 while executing remote command, retrying after 30 seconds
ssh: connect to host ec2-54-237-6-95.compute-1.amazonaws.com port 22: Connection refused
Error 255 while executing remote command, retrying after 30 seconds
ssh: connect to host ec2-54-237-6-95.compute-1.amazonaws.com port 22: Connection refused
Traceback (most recent call last):
  File "./spark_ec2.py", line 806, in <module>
    main()
  File "./spark_ec2.py", line 799, in main
    real_main()
  File "./spark_ec2.py", line 684, in real_main
    setup_cluster(conn, master_nodes, slave_nodes, opts, True)
  File "./spark_ec2.py", line 423, in setup_cluster
    ssh_write(slave.public_dns_name, opts, ['tar', 'x'], dot_ssh_tar)
  File "./spark_ec2.py", line 640, in ssh_write
    raise RuntimeError("ssh_write failed with error %s" % proc.returncode)
RuntimeError: ssh_write failed with error 255

So I log into the EC2 console and TERMINATE that specific machine, and re-resume. Now it
Inconsistent Spark SQL behavior when column names contain dots
I’m working with a dataset where each row is stored as a single-line flat JSON object. I want to leverage Spark SQL to run relational queries on this data. Many of the object keys in this dataset have dots in them, e.g.:

{ "key.number1": "value1", "key.number2": "value2" ... }

I can successfully load the data as an RDD in Spark and construct a Spark SQL table using the jsonRDD function. If I print the schema of the table, I see that Spark SQL infers the full object key, dot included, as the column name:

sqlTable.printSchema()
root
 |-- key.number1: StringType
 |-- key.number2: StringType
...

However, when I try to use one of these column names in a query, it seems that the Spark SQL parser always assumes I’m trying to reference a nested attribute. The same thing happens when using HiveQL. If there’s a way to escape the dot in the column name, I haven’t found it:

sqlContext.sql("SELECT key.number1 FROM TABLE sql_table LIMIT 1").first
== Query Plan ==
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: 'key.number1, tree Project ['key.number1] ...

This is not a critical issue by any means; it’s simple enough to use map() to transform the dots to underscores after loading the JSON data as text. I just wanted to reach out to the community for some guidance as to whether or not this issue warrants a bug report. To me, this behavior seems to be inconsistent: you can create a table with column names containing dots, but AFAICT you cannot include such columns in a query. Also, I’d greatly appreciate it if anybody has any pointers as to where in the source I should be looking if I wanted to patch this issue in my local branch. I’ve taken a glance at some of the Spark SQL Catalyst code but I’m afraid I’m too much of a Scala novice to make much headway here. For reference, I’m using Spark 1.0.1. Thanks for your input. Adam
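The map() workaround mentioned above can be sketched as a plain string transform applied to the raw lines before jsonRDD sees them. This is a naive sketch: it assumes flat, single-line objects whose string values contain no colons or escaped quotes, which would confuse the regex.

```scala
// Rewrite dotted JSON keys to underscores, e.g.
//   {"key.number1": "value1"}  ->  {"key_number1": "value1"}
// Matches any quoted string immediately followed by a colon (i.e. a key)
// and replaces dots inside it.
def sanitizeKeys(json: String): String =
  """"([^"]+)"\s*:""".r.replaceAllIn(json,
    m => "\"" + m.group(1).replace('.', '_') + "\":")

// In the job you would then do something like:
//   val cleaned = rawLines.map(sanitizeKeys)
//   val table = sqlContext.jsonRDD(cleaned)
```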
store spark streaming dstream in hdfs or cassandra
Hi, I was wondering what is the best way to store off DStreams in HDFS or Cassandra. Could somebody provide an example? Thanks, Ali -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/store-spark-streaming-dstream-in-hdfs-or-cassandra-tp11064.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
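A sketch of the two usual approaches, not a complete program: `lines` is assumed to be a DStream[String], and the Cassandra client below is a hypothetical stand-in for whatever driver you actually use.

```scala
// HDFS: built in. Writes one directory of part-files per batch interval,
// named <prefix>-<batchTime>.<suffix>.
lines.saveAsTextFiles("hdfs://namenode:8020/streams/beacons", "txt")

// External stores (Cassandra etc.): foreachRDD hands you each batch as an
// RDD; foreachPartition amortizes one connection per partition rather than
// one per record.
lines.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    val session = CassandraClient.connect("cassandra-host") // hypothetical API
    records.foreach(r => session.insert("events", r))       // hypothetical API
    session.close()
  }
}
```

The connection is created inside foreachPartition deliberately: it runs on the worker, so the (non-serializable) client never has to travel with the closure.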
Re: Inconsistent Spark SQL behavior when column names contain dots
I still see the same “Unresolved attributes” error when using hql + backticks. Here’s a code snippet that replicates this behavior:

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
val sampleRDD = sc.parallelize(Array("""{"key.one": "value1", "key.two": "value2"}"""))
val sampleTable = hiveContext.jsonRDD(sampleRDD)
sampleTable.registerAsTable("sample_table")
hiveContext.hql("SELECT `key.one` FROM sample_table")

From: Michael Armbrust mich...@databricks.com Reply-To: user@spark.apache.org Date: Thursday, July 31, 2014 at 11:20 AM To: user@spark.apache.org Subject: Re: Inconsistent Spark SQL behavior when column names contain dots Ideally you'd use backticks to reference columns that contain weird characters. I don't believe this works in the sql parser, but I'm curious whether using the hql parser in HiveContext would work for you. If you wanted to add support for this in the sql parser, I'd check out SqlParser.scala, though it is likely we will abandon that code in the next release for something more complete. [earlier quoted message trimmed]
Re: Inconsistent Spark SQL behavior when column names contain dots
I have created https://issues.apache.org/jira/browse/SPARK-2775 to track it. On Thu, Jul 31, 2014 at 11:47 AM, Budde, Adam bu...@amazon.com wrote: I still see the same “Unresolved attributes” error when using hql + backticks. Here’s a code snippet that replicates this behavior:

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
val sampleRDD = sc.parallelize(Array("""{"key.one": "value1", "key.two": "value2"}"""))
val sampleTable = hiveContext.jsonRDD(sampleRDD)
sampleTable.registerAsTable("sample_table")
hiveContext.hql("SELECT `key.one` FROM sample_table")

[earlier quoted messages trimmed]
If I print the schema of the table, I see that Spark SQL infers the full object key, dot included, as the column name: sqlTable.printSchema() root |-- key.number1: StringType |-- key.number2: StringType … However, when I try to use one of these column names in a query, it seems that the Spark SQL parser always assumes I’m trying to reference a nested attribute. The same thing happens when using HiveQL. If there’s a way to escape the dot in the column name, I haven’t found it: sqlContext.sql("SELECT key.number1 FROM TABLE sql_table LIMIT 1").first == Query Plan == org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: 'key.number1, tree Project ['key.number1] … This is not a critical issue by any means; it’s simple enough to use map() to transform the dots to underscores after loading the JSON data as text. I just wanted to reach out to the community for some guidance as to whether or not this issue warrants a bug report. To me, this behavior seems inconsistent: you can create a table with column names containing dots, but AFAICT you cannot include such columns in a query. Also, I’d greatly appreciate it if anybody has pointers as to where in the source I should be looking if I wanted to patch this issue in my local branch. I’ve taken a glance at some of the Spark SQL Catalyst code but I’m afraid I’m too much of a Scala novice to make much headway here. For reference, I’m using Spark 1.0.1. Thanks for your input. Adam
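The map()-based workaround Adam mentions can be sketched in a few lines of plain Python (field names are the hypothetical ones from his example; the same transform would run inside an RDD map() over the raw JSON lines before calling jsonRDD):

```python
import json

def underscore_keys(line):
    """Rewrite the top-level keys of one JSON line, replacing dots
    with underscores so the resulting column names are queryable."""
    obj = json.loads(line)
    return json.dumps({k.replace(".", "_"): v for k, v in obj.items()})

line = '{"key.number1": "value1", "key.number2": "value2"}'
print(underscore_keys(line))  # {"key_number1": "value1", "key_number2": "value2"}
```

Note this only rewrites top-level keys; nested objects would need a recursive variant.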
Re: Number of partitions and Number of concurrent tasks
Ok, I set the number of spark worker instances to 2 (below is my startup command). But, this essentially had the effect of increasing my number of workers from 3 to 6 (which was good) but it also reduced my number of cores per worker from 8 to 4 (which was not so good). In the end, I would still only be able to concurrently process 24 partitions in parallel. I'm starting a stand-alone cluster using the spark provided ec2 scripts . I tried setting the env variable for SPARK_WORKER_CORES in the spark_ec2.py but this had no effect. So, it's not clear if I could even set the SPARK_WORKER_CORES with the ec2 scripts. Anyway, not sure if there is anything else I can try but at least wanted to document what I did try and the net effect. I'm open to any suggestions/advice. ./spark-ec2 -k key -i key.pem --hadoop-major-version=2 launch -s 3 -t m3.2xlarge -w 3600 --spot-price=.08 -z us-east-1e --worker-instances=2 my-cluster From: Daniel Siegmann daniel.siegm...@velos.io To: Darin McBeath ddmcbe...@yahoo.com Cc: Daniel Siegmann daniel.siegm...@velos.io; user@spark.apache.org user@spark.apache.org Sent: Thursday, July 31, 2014 10:04 AM Subject: Re: Number of partitions and Number of concurrent tasks I haven't configured this myself. I'd start with setting SPARK_WORKER_CORES to a higher value, since that's a bit simpler than adding more workers. This defaults to all available cores according to the documentation, so I'm not sure if you can actually set it higher. If not, you can get around this by adding more worker instances; I believe simply setting SPARK_WORKER_INSTANCES to 2 would be sufficient. I don't think you have to set the cores if you have more workers - it will default to 8 cores per worker (in your case). But maybe 16 cores per node will be too many. You'll have to test. Keep in mind that more workers means more memory and such too, so you may need to tweak some other settings downward in this case. 
On a side note: I've read some people found performance was better when they had more workers with less memory each, instead of a single worker with tons of memory, because it cut down on garbage collection time. But I can't speak to that myself. In any case, if you increase the number of cores available in your cluster (whether per worker, or adding more workers per node, or of course adding more nodes) you should see more tasks running concurrently. Whether this will actually be faster probably depends mainly on whether the CPUs in your nodes were really being fully utilized with the current number of cores. On Wed, Jul 30, 2014 at 8:30 PM, Darin McBeath ddmcbe...@yahoo.com wrote: Thanks. So to make sure I understand. Since I'm using a 'stand-alone' cluster, I would set SPARK_WORKER_INSTANCES to something like 2 (instead of the default value of 1). Is that correct? But, it also sounds like I need to explicitly set a value for SPARK_WORKER_CORES (based on what the documentation states). What would I want that value to be based on my configuration below? Or, would I leave that alone? From: Daniel Siegmann daniel.siegm...@velos.io To: user@spark.apache.org; Darin McBeath ddmcbe...@yahoo.com Sent: Wednesday, July 30, 2014 5:58 PM Subject: Re: Number of partitions and Number of concurrent tasks This is correct behavior. Each core can execute exactly one task at a time, with each task corresponding to a partition. If your cluster only has 24 cores, you can only run at most 24 tasks at once. You could run multiple workers per node to get more executors. That would give you more cores in the cluster. But however many cores you have, each core will run only one task at a time. On Wed, Jul 30, 2014 at 3:56 PM, Darin McBeath ddmcbe...@yahoo.com wrote: I have a cluster with 3 nodes (each with 8 cores) using Spark 1.0.1. I have an RDD[String] which I've repartitioned so it has 100 partitions (hoping to increase the parallelism).
When I do a transformation (such as filter) on this RDD, I can't seem to get more than 24 tasks (my total number of cores across the 3 nodes) going at one point in time. By tasks, I mean the number of tasks that appear under the Application UI. I tried explicitly setting the spark.default.parallelism to 48 (hoping I would get 48 tasks concurrently running) and verified this in the Application UI for the running application but this had no effect. Perhaps, this is ignored for a 'filter' and the default is the total number of cores available. I'm fairly new with Spark so maybe I'm just missing or misunderstanding something fundamental. Any help would be appreciated. Thanks. Darin. -- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001 E: daniel.siegm...@velos.iow: www.velos.io -- Daniel Siegmann, Software Developer Velos Accelerating Machine
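The arithmetic being discussed in this thread reduces to a single product; the numbers below are the ones Darin reported, taken as given:

```python
def max_concurrent_tasks(nodes, workers_per_node, cores_per_worker):
    """Each core runs exactly one task at a time, so the ceiling on
    concurrently running tasks is simply the total core count."""
    return nodes * workers_per_node * cores_per_worker

# Original setup: 3 nodes, 1 worker each, 8 cores per worker
print(max_concurrent_tasks(3, 1, 8))   # 24

# After --worker-instances=2, cores per worker dropped from 8 to 4,
# so doubling the workers did not change the total
print(max_concurrent_tasks(3, 2, 4))   # 24
```

This is why repartitioning to 100 partitions or raising spark.default.parallelism cannot by itself push concurrency past 24: the extra partitions simply wait in the queue until a core frees up.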
Re: store spark streaming dstream in hdfs or cassandra
Off the top of my head, you can use the ForEachDStream to which you pass in the code that writes to Hadoop, and then register that as an output stream, so the function you pass in is periodically executed and causes the data to be written to HDFS. If you are ok with the data being in text format - simply use the saveAsTextFiles method on the DStream. salemi wrote: Hi, I was wondering what is the best way to store off dstreams in hdfs or cassandra. Could somebody provide an example? Thanks, Ali -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/store-spark-streaming-dstream-in-hdfs-or-cassandra-tp11064.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
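To make the saveAsTextFiles behaviour concrete: Spark Streaming writes one output per batch, named with the batch time, roughly prefix-TIME_IN_MS.suffix. A stdlib-only sketch of that naming scheme (the batch records and the prefix are invented for illustration):

```python
import os
import tempfile

def save_batch(prefix, time_ms, records, suffix="txt"):
    """Mimic Spark Streaming's per-batch output naming:
    one file per batch, named <prefix>-<batch time in ms>.<suffix>."""
    path = "%s-%d.%s" % (prefix, time_ms, suffix)
    with open(path, "w") as f:
        f.write("\n".join(records))
    return path

out_dir = tempfile.mkdtemp()
p = save_batch(os.path.join(out_dir, "events"), 1406840400000, ["a", "b"])
print(os.path.basename(p))  # events-1406840400000.txt
```

In real Spark the "file" is actually a directory of part-files written by the executors, but the time-stamped naming per batch interval is the same idea.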
SchemaRDD select expression
I'm looking to write a select statement to get a distinct count on userId grouped by keyword column on a parquet file SchemaRDD equivalent of: SELECT keyword, count(distinct(userId)) from table group by keyword How to write it using the chained select().groupBy() operations? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SchemaRDD-select-expression-tp11069.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: spark.shuffle.consolidateFiles seems not working
I got the number from the Hadoop admin. It's 1M actually. I suspect the consolidation didn't work as expected? Any other reason? On Thu, Jul 31, 2014 at 11:01 AM, Shao, Saisai saisai.s...@intel.com wrote: I don’t think it’s a bug of consolidated shuffle, it’s a Linux configuration problem. The default open-files limit in Linux is 1024; when your open-file count exceeds 1024 you will get the error you mentioned below. So you can raise the open-files limit by: ulimit -n xxx or write into /etc/security/limits.conf in Ubuntu. Shuffle consolidation can reduce the total shuffle file number, but the number of concurrently opened files is the same as basic hash-based shuffle. Thanks Jerry *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] *Sent:* Thursday, July 31, 2014 10:34 AM *To:* user@spark.apache.org *Cc:* xia...@sjtu.edu.cn *Subject:* Re: spark.shuffle.consolidateFiles seems not working Ok... but my question is why spark.shuffle.consolidateFiles isn't working (or is it?). Is this a bug? On Wed, Jul 30, 2014 at 4:29 PM, Larry Xiao xia...@sjtu.edu.cn wrote: Hi Jianshi, I've met a similar situation before. And my solution was 'ulimit'; you can use -a to see your current settings -n to set the open files limit (and other limits also) And I set -n to 10240. I see spark.shuffle.consolidateFiles helps by reusing open files. (so I don't know to what extent it helps) Hope it helps. Larry On 7/30/14, 4:01 PM, Jianshi Huang wrote: I'm using Spark 1.0.1 on Yarn-Client mode. SortByKey always reports a FileNotFoundException with messages saying "too many open files". I already set spark.shuffle.consolidateFiles to true: conf.set("spark.shuffle.consolidateFiles", "true") But it seems not to be working. What are the other possible reasons? How to fix it?
Jianshi -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/ -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
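Jerry's point about consolidation can be put into numbers. Hash-based shuffle writes one file per (map task × reduce partition); with consolidation, files are reused across map tasks that run on the same core, so the total drops to roughly (cores × reduce partitions), while the files open at any one instant stay the same. A back-of-envelope sketch (the task and core counts below are purely illustrative):

```python
def shuffle_files(map_tasks, reduce_partitions, cores, consolidate):
    """Approximate total shuffle files written by one executor's map tasks."""
    if consolidate:
        # files are reused across map tasks scheduled on the same core
        return cores * reduce_partitions
    return map_tasks * reduce_partitions

# e.g. 1000 map tasks, 1000 reduce partitions, 16 cores
print(shuffle_files(1000, 1000, 16, consolidate=False))  # 1000000
print(shuffle_files(1000, 1000, 16, consolidate=True))   # 16000
```

This is why consolidation helps with inode exhaustion and disk seeks but, as Jerry says, cannot fix a "too many open files" error on its own: that ceiling is set by ulimit.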
RDD operation examples with data?
Hi, I'm learning Spark and I am confused about when to use the many different operations on RDDs. Does anyone have any examples which show example inputs and resulting outputs for the various RDD operations, and if the operation takes a Function, a simple example of the code? For example, something like this for flatMap One row - the quick brown fox Passed to: JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String s) { return Arrays.asList(SPACE.split(s)); } }); When completed: words would contain the quick brown fox (Yes this one is pretty obvious but some of the others aren't). If such examples don't exist, is there a shared wiki or someplace we could start building one? Thanks, Chris
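Chris's flatMap example has an exact analogue in plain Python that may help build intuition: map produces one output element per input, while flatMap flattens the per-input collections into a single sequence (sample sentence as in the question):

```python
def flat_map(f, xs):
    """Apply f to each element and concatenate the resulting lists,
    mirroring the semantics of RDD.flatMap."""
    return [y for x in xs for y in f(x)]

lines = ["the quick brown fox"]
words = flat_map(lambda s: s.split(" "), lines)
print(words)  # ['the', 'quick', 'brown', 'fox']
```

With map instead of flat_map the result would be a list of lists ([['the', 'quick', 'brown', 'fox']]); flattening is the whole difference.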
Re: How do you debug a PythonException?
So if I try this again but in the Scala shell (as opposed to the Python one), this is what I get: scala val a = sc.textFile(s3n://some-path/*.json, minPartitions=sc.defaultParallelism * 3).cache() a: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at console:12 scala a.map(_.length).max14/07/31 20:09:04 WARN LoadSnappy: Snappy native library is available14/07/31 20:10:41 WARN TaskSetManager: Lost TID 22 (task 0.0:22)14/07/31 20:10:41 WARN TaskSetManager: Loss was due to java.lang.OutOfMemoryError java.lang.OutOfMemoryError: GC overhead limit exceeded at java.util.Arrays.copyOfRange(Arrays.java:2694) at java.lang.String.init(String.java:203) at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561) at java.nio.CharBuffer.toString(CharBuffer.java:1201) at org.apache.hadoop.io.Text.decode(Text.java:350) at org.apache.hadoop.io.Text.decode(Text.java:327) at org.apache.hadoop.io.Text.toString(Text.java:254) at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:458) at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:458) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:107) at org.apache.spark.rdd.RDD.iterator(RDD.scala:227) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) at org.apache.spark.scheduler.Task.run(Task.scala:51) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745)14/07/31 20:10:42 ERROR TaskSchedulerImpl: Lost executor 19 on ip-10-13-142-142.ec2.internal: OutOfMemoryError So I guess I need to fiddle with some memory configs? I’m surprised that just checking input line length could trigger this. Nick On Wed, Jul 30, 2014 at 8:58 PM, Davies Liu dav...@databricks.com wrote: The exception in Python means that the worker try to read command from JVM, but it reach the end of socket (socket had been closed). So it's possible that there another exception happened in JVM. Could you change the log level of log4j, then check is there any problem inside JVM? Davies On Wed, Jul 30, 2014 at 9:12 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Any clues? This looks like a bug, but I can't report it without more precise information. On Tue, Jul 29, 2014 at 9:56 PM, Nick Chammas nicholas.cham...@gmail.com wrote: I’m in the PySpark shell and I’m trying to do this: a = sc.textFile('s3n://path-to-handful-of-very-large-files-totalling-1tb/*.json', minPartitions=sc.defaultParallelism * 3).cache() a.map(lambda x: len(x)).max() My job dies with the following: 14/07/30 01:46:28 WARN TaskSetManager: Loss was due to org.apache.spark.api.python.PythonException org.apache.spark.api.python.PythonException: Traceback (most recent call last): File /root/spark/python/pyspark/worker.py, line 73, in main command = pickleSer._read_with_length(infile) File /root/spark/python/pyspark/serializers.py, line 142, in _read_with_length length = read_int(stream) File /root/spark/python/pyspark/serializers.py, line 337, in read_int raise EOFError EOFError at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:115) at org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:145) at 
org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) at org.apache.spark.scheduler.Task.run(Task.scala:51) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 14/07/30 01:46:29 ERROR TaskSchedulerImpl: Lost executor 19 on ip-10-190-171-217.ec2.internal: remote Akka client disassociated How do I debug this? I’m using 1.0.2-rc1 deployed to EC2. Nick
Re: RDD operation examples with data?
I would check out the source examples on Spark's Github: https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples And, Zhen He put together a great web page with summaries and examples of each function: http://apache-spark-user-list.1001560.n3.nabble.com/A-new-resource-for-getting-examples-of-Spark-RDD-API-calls-td5529.html Hope this helps! Jacob On Thu, Jul 31, 2014 at 3:00 PM, Chris Curtin curtin.ch...@gmail.com wrote: Hi, I'm learning Spark and I am confused about when to use the many different operations on RDDs. Does anyone have any examples which show example inputs and resulting outputs for the various RDD operations, and if the operation takes a Function, a simple example of the code? For example, something like this for flatMap One row - the quick brown fox Passed to: JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String s) { return Arrays.asList(SPACE.split(s)); } }); When completed: words would contain the quick brown fox (Yes this one is pretty obvious but some of the others aren't). If such examples don't exist, is there a shared wiki or someplace we could start building one? Thanks, Chris
Re: SchemaRDD select expression
countDistinct is recently added and is in 1.0.2. If you are using that or the master branch, you could try something like: r.select('keyword, countDistinct('userId)).groupBy('keyword) On Thu, Jul 31, 2014 at 12:27 PM, buntu buntu...@gmail.com wrote: I'm looking to write a select statement to get a distinct count on userId grouped by keyword column on a parquet file SchemaRDD equivalent of: SELECT keyword, count(distinct(userId)) from table group by keyword How to write it using the chained select().groupBy() operations? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SchemaRDD-select-expression-tp11069.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
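The relational semantics being asked for here, COUNT(DISTINCT userId) grouped by keyword, can be stated in a few lines of plain Python, independent of any Spark API (the rows are made up for illustration):

```python
from collections import defaultdict

rows = [("spark", "u1"), ("spark", "u2"), ("spark", "u1"), ("hive", "u3")]

# group userIds into a set per keyword; the set deduplicates,
# which is exactly what DISTINCT does
distinct_users = defaultdict(set)
for keyword, user_id in rows:
    distinct_users[keyword].add(user_id)

counts = {k: len(v) for k, v in distinct_users.items()}
print(counts)  # {'spark': 2, 'hive': 1}
```

Whatever DSL or SQL string you end up using, this set-per-group aggregation is the computation underneath.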
Re: store spark streaming dstream in hdfs or cassandra
To read/write from/to Cassandra I recommend using the Spark-Cassandra connector at [1] Using it, saving a Spark Streaming RDD to Cassandra is fairly easy: sparkConfig.set("spark.cassandra.connection.host", cassandraHost) val sc = new SparkContext(sparkConfig) val ssc = new StreamingContext(sc, Seconds(x)) ... stream.foreachRDD{ rdd => {rdd.saveToCassandra("keyspace", "table"); ()}} ... The most recent version has additional support for creating the streamingContext with the cassandra config, effectively merging the 2nd and 3rd lines above. -kr, Gerard. [1] https://github.com/datastax/spark-cassandra-connector/ On Thu, Jul 31, 2014 at 9:12 PM, Hari Shreedharan hshreedha...@cloudera.com wrote: Off the top of my head, you can use the ForEachDStream to which you pass in the code that writes to Hadoop, and then register that as an output stream, so the function you pass in is periodically executed and causes the data to be written to HDFS. If you are ok with the data being in text format - simply use the saveAsTextFiles method on the DStream. salemi wrote: Hi, I was wondering what is the best way to store off dstreams in hdfs or cassandra. Could somebody provide an example? Thanks, Ali -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/store-spark-streaming-dstream-in-hdfs-or-cassandra-tp11064.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: How do you debug a PythonException?
Davies, That was it. Removing the call to cache() let the job run successfully, but this challenges my understanding of how Spark handles caching data. I thought it was safe to cache data sets larger than the cluster could hold in memory. What Spark would do is cache as much as it could and leave the rest for access from disk. Is that not correct? Nick On Thu, Jul 31, 2014 at 5:04 PM, Davies Liu dav...@databricks.com wrote: Maybe it's because you try to cache all the data in memory, but the heap of the JVM is not big enough. If you remove the .cache(), is the problem still there? On Thu, Jul 31, 2014 at 1:33 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Hmm, looking at this stack trace a bit more carefully, it looks like the code in the Hadoop API for reading data from the source choked. Is that correct? Perhaps there is a missing newline (or two, or more) that makes 1 line of data too much to read in at once? I'm just guessing here. Gonna try to track this down real quick. Btw, I'm seeing this on 1.0.1 as well, so it's not a regression in 1.0.2-rc1 or anything like that.
Nick On Thu, Jul 31, 2014 at 4:18 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: So if I try this again but in the Scala shell (as opposed to the Python one), this is what I get: scala val a = sc.textFile(s3n://some-path/*.json, minPartitions=sc.defaultParallelism * 3).cache() a: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at console:12 scala a.map(_.length).max 14/07/31 20:09:04 WARN LoadSnappy: Snappy native library is available 14/07/31 20:10:41 WARN TaskSetManager: Lost TID 22 (task 0.0:22) 14/07/31 20:10:41 WARN TaskSetManager: Loss was due to java.lang.OutOfMemoryError java.lang.OutOfMemoryError: GC overhead limit exceeded at java.util.Arrays.copyOfRange(Arrays.java:2694) at java.lang.String.init(String.java:203) at java.nio.HeapCharBuffer.toString(HeapCharBuffer.java:561) at java.nio.CharBuffer.toString(CharBuffer.java:1201) at org.apache.hadoop.io.Text.decode(Text.java:350) at org.apache.hadoop.io.Text.decode(Text.java:327) at org.apache.hadoop.io.Text.toString(Text.java:254) at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:458) at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:458) at scala.collection.Iterator$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:107) at org.apache.spark.rdd.RDD.iterator(RDD.scala:227) at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) at org.apache.spark.scheduler.Task.run(Task.scala:51) at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 14/07/31 20:10:42 ERROR TaskSchedulerImpl: Lost executor 19 on ip-10-13-142-142.ec2.internal: OutOfMemoryError So I guess I need to fiddle with some memory configs? I’m surprised that just checking input line length could trigger this. Nick On Wed, Jul 30, 2014 at 8:58 PM, Davies Liu dav...@databricks.com wrote: The exception in Python means that the worker try to read command from JVM, but it reach the end of socket (socket had been closed). So it's possible that there another exception happened in JVM. Could you change the log level of log4j, then check is there any problem inside JVM? Davies On Wed, Jul 30, 2014 at 9:12 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Any clues? This looks like a bug, but I can't report it without more precise information. On Tue, Jul 29, 2014 at 9:56 PM, Nick Chammas nicholas.cham...@gmail.com wrote: I’m in the PySpark shell and I’m trying to do this: a = sc.textFile('s3n://path-to-handful-of-very-large-files-totalling-1tb/*.json', minPartitions=sc.defaultParallelism * 3).cache() a.map(lambda x: len(x)).max() My job dies with the following: 14/07/30 01:46:28 WARN TaskSetManager: Loss was due to org.apache.spark.api.python.PythonException org.apache.spark.api.python.PythonException: Traceback (most recent call last): File
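On Nick's surprise that "just checking input line length" triggers an OOM: computing max(len(line)) needs only one streaming pass, so the memory pressure comes from cache() materializing the whole dataset, not from the computation itself. A stdlib illustration of the streaming version (the sample file contents are invented):

```python
import tempfile

# write a small sample file standing in for the JSON input
with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as f:
    f.write('{"a": 1}\n{"b": 22}\n{"c": 333}\n')
    path = f.name

# one pass, constant memory: no line is retained after its length is taken
with open(path) as f:
    longest = max(len(line.rstrip("\n")) for line in f)
print(longest)  # 10
```

Spark executes the uncached a.map(_.length).max the same way, iterating each partition once; it is the cache() that forces every decoded line to stay resident on the heap.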
Re: SchemaRDD select expression
Looking at what this patch [1] has to do to achieve it, I am not sure if you can do the same thing in 1.0.0 using DSL only. Just curious, why don't you use the hql() / sql() methods and pass a query string in? [1] https://github.com/apache/spark/pull/1211/files On Thu, Jul 31, 2014 at 2:20 PM, Buntu Dev buntu...@gmail.com wrote: Thanks Zongheng for the pointer. Is there a way to achieve the same in 1.0.0 ? On Thu, Jul 31, 2014 at 1:43 PM, Zongheng Yang zonghen...@gmail.com wrote: countDistinct is recently added and is in 1.0.2. If you are using that or the master branch, you could try something like: r.select('keyword, countDistinct('userId)).groupBy('keyword) On Thu, Jul 31, 2014 at 12:27 PM, buntu buntu...@gmail.com wrote: I'm looking to write a select statement to get a distinct count on userId grouped by keyword column on a parquet file SchemaRDD equivalent of: SELECT keyword, count(distinct(userId)) from table group by keyword How to write it using the chained select().groupBy() operations? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SchemaRDD-select-expression-tp11069.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: SchemaRDD select expression
I was not sure if registerAsTable() and then query against that table have additional performance impact and if DSL eliminates that. On Thu, Jul 31, 2014 at 2:33 PM, Zongheng Yang zonghen...@gmail.com wrote: Looking at what this patch [1] has to do to achieve it, I am not sure if you can do the same thing in 1.0.0 using DSL only. Just curious, why don't you use the hql() / sql() methods and pass a query string in? [1] https://github.com/apache/spark/pull/1211/files On Thu, Jul 31, 2014 at 2:20 PM, Buntu Dev buntu...@gmail.com wrote: Thanks Zongheng for the pointer. Is there a way to achieve the same in 1.0.0 ? On Thu, Jul 31, 2014 at 1:43 PM, Zongheng Yang zonghen...@gmail.com wrote: countDistinct is recently added and is in 1.0.2. If you are using that or the master branch, you could try something like: r.select('keyword, countDistinct('userId)).groupBy('keyword) On Thu, Jul 31, 2014 at 12:27 PM, buntu buntu...@gmail.com wrote: I'm looking to write a select statement to get a distinct count on userId grouped by keyword column on a parquet file SchemaRDD equivalent of: SELECT keyword, count(distinct(userId)) from table group by keyword How to write it using the chained select().groupBy() operations? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SchemaRDD-select-expression-tp11069.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: SchemaRDD select expression
The performance should be the same using the DSL or SQL strings. On Thu, Jul 31, 2014 at 2:36 PM, Buntu Dev buntu...@gmail.com wrote: I was not sure if registerAsTable() and then query against that table have additional performance impact and if DSL eliminates that. On Thu, Jul 31, 2014 at 2:33 PM, Zongheng Yang zonghen...@gmail.com wrote: Looking at what this patch [1] has to do to achieve it, I am not sure if you can do the same thing in 1.0.0 using DSL only. Just curious, why don't you use the hql() / sql() methods and pass a query string in? [1] https://github.com/apache/spark/pull/1211/files On Thu, Jul 31, 2014 at 2:20 PM, Buntu Dev buntu...@gmail.com wrote: Thanks Zongheng for the pointer. Is there a way to achieve the same in 1.0.0 ? On Thu, Jul 31, 2014 at 1:43 PM, Zongheng Yang zonghen...@gmail.com wrote: countDistinct is recently added and is in 1.0.2. If you are using that or the master branch, you could try something like: r.select('keyword, countDistinct('userId)).groupBy('keyword) On Thu, Jul 31, 2014 at 12:27 PM, buntu buntu...@gmail.com wrote: I'm looking to write a select statement to get a distinct count on userId grouped by keyword column on a parquet file SchemaRDD equivalent of: SELECT keyword, count(distinct(userId)) from table group by keyword How to write it using the chained select().groupBy() operations? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SchemaRDD-select-expression-tp11069.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: SchemaRDD select expression
Thanks Michael for confirming! On Thu, Jul 31, 2014 at 2:43 PM, Michael Armbrust mich...@databricks.com wrote: The performance should be the same using the DSL or SQL strings. On Thu, Jul 31, 2014 at 2:36 PM, Buntu Dev buntu...@gmail.com wrote: I was not sure if registerAsTable() and then query against that table have additional performance impact and if DSL eliminates that. On Thu, Jul 31, 2014 at 2:33 PM, Zongheng Yang zonghen...@gmail.com wrote: Looking at what this patch [1] has to do to achieve it, I am not sure if you can do the same thing in 1.0.0 using DSL only. Just curious, why don't you use the hql() / sql() methods and pass a query string in? [1] https://github.com/apache/spark/pull/1211/files On Thu, Jul 31, 2014 at 2:20 PM, Buntu Dev buntu...@gmail.com wrote: Thanks Zongheng for the pointer. Is there a way to achieve the same in 1.0.0 ? On Thu, Jul 31, 2014 at 1:43 PM, Zongheng Yang zonghen...@gmail.com wrote: countDistinct is recently added and is in 1.0.2. If you are using that or the master branch, you could try something like: r.select('keyword, countDistinct('userId)).groupBy('keyword) On Thu, Jul 31, 2014 at 12:27 PM, buntu buntu...@gmail.com wrote: I'm looking to write a select statement to get a distinct count on userId grouped by keyword column on a parquet file SchemaRDD equivalent of: SELECT keyword, count(distinct(userId)) from table group by keyword How to write it using the chained select().groupBy() operations? Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SchemaRDD-select-expression-tp11069.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Installing Spark 0.9.1 on EMR Cluster
Hi Rahul, I am not sure about bootstrapping while creating, but we have downloaded the tarball, extracted it, and configured it accordingly, and it worked fine. I believe you would want to write a custom script which does all these things and add it as a bootstrap action. Thanks, Sai On Jul 31, 2014 2:42 PM, Rahul Bhojwani rahulbhojwani2...@gmail.com wrote: I wanted to install Spark version 0.9.1 on an Amazon EMR Cluster. Can anyone give the install script which I can pass as the custom bootstrap action while creating a cluster? Thanks -- Rahul K Bhojwani about.me/rahul_bhojwani http://about.me/rahul_bhojwani
Issue with Spark on EC2 using spark-ec2 script
Hey all, I was able to spawn up a cluster, but when I'm trying to submit a simple jar via spark-submit it fails to run. I am trying to run the simple Standalone Application from the quickstart. Oddly enough, I could get another application running through the spark-shell. What am I doing wrong here? :( http://spark.apache.org/docs/latest/quick-start.html * Here's my setup: * $ ls project simple.sbt src target $ ls -R src src: main src/main: scala src/main/scala: SimpleApp.scala $ cat src/main/scala/SimpleApp.scala package main.scala /* SimpleApp.scala */ import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf object SimpleApp { def main(args: Array[String]) { val logFile = "/tmp/README.md" val conf = new SparkConf().setAppName("Simple Application") val sc = new SparkContext(conf) val logData = sc.textFile(logFile, 2).cache() val numAs = logData.filter(line => line.contains("a")).count() val numBs = logData.filter(line => line.contains("b")).count() println("Lines with a: %s, Lines with b: %s".format(numAs, numBs)) } } $ cat simple.sbt name := "Simple Project" version := "1.0" scalaVersion := "2.10.4" libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.1" resolvers += "Akka Repository" at "http://repo.akka.io/releases/" * Here's how I run the job: * $ /root/spark/bin/spark-submit --class main.scala.SimpleApp --master local[4] ./target/scala-2.10/simple-project_2.10-1.0.jar *Here is the error: * 14/07/31 16:23:56 INFO scheduler.DAGScheduler: Failed to run count at SimpleApp.scala:14 Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0:1 failed 1 times, most recent failure: Exception failure in TID 1 on host localhost: java.io.IOException: No such file or directory java.io.UnixFileSystem.createFileExclusively(Native Method) java.io.File.createNewFile(File.java:1006) java.io.File.createTempFile(File.java:1989) org.apache.spark.util.Utils$.fetchFile(Utils.scala:326)
org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:332)
org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$6.apply(Executor.scala:330)
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:330)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:168)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
makeLinkRDDs in MLlib ALS
It seems to me that makeLinkRDDs works by taking advantage of the fact that partition IDs happen to coincide with what we get from userPartitioner, since the HashPartitioner in *val grouped = ratingsByUserBlock.partitionBy(new HashPartitioner(numUserBlocks))* actually preserves the keys from the ALSPartitioner. That is, the blockId in *val links = grouped.mapPartitionsWithIndex((blockId, elements) => { ...* happens to be the same as the keys in *grouped*. I feel this is rather fragile: if we were not using a HashPartitioner, the blockId would not be the same as the one from the ALSPartitioner, which could lead to annoying problems. Correct me if I'm missing something. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/makeLinkRDDs-in-MLlib-ALS-tp11089.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
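The coincidence described above can be demonstrated without Spark by re-implementing the hash-partitioning rule. This is a hedged sketch: SimpleHashPartitioner and nonNegativeMod below mirror the behavior of Spark's HashPartitioner for illustration, but are not Spark's API.

```scala
// Minimal stand-in for hash partitioning: partition = key.hashCode mod n,
// forced non-negative. For Int block ids 0..n-1, hashCode == value, so the
// partition id coincides with the key -- the property the thread says
// makeLinkRDDs relies on.
def nonNegativeMod(x: Int, mod: Int): Int = {
  val raw = x % mod
  if (raw < 0) raw + mod else raw
}

class SimpleHashPartitioner(val numPartitions: Int) {
  def getPartition(key: Any): Int = nonNegativeMod(key.hashCode, numPartitions)
}

val numUserBlocks = 4
val hp = new SimpleHashPartitioner(numUserBlocks)
// Holds for a hash partitioner over Int keys 0..n-1:
val identityHolds = (0 until numUserBlocks).forall(id => hp.getPartition(id) == id)

// Nothing forces it for an arbitrary partitioner; e.g. one that shifts
// keys by 1 breaks the alignment between blockId and key.
val shifted = (k: Int) => nonNegativeMod(k + 1, numUserBlocks)
val shiftedHolds = (0 until numUserBlocks).forall(id => shifted(id) == id)
```

With shiftedHolds false, the index handed to mapPartitionsWithIndex would no longer match the key the ALSPartitioner assigned, which is exactly the fragility raised above.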
Spark job finishes then command shell is blocked/hangs?
Hi, My Spark job finishes with this output: 14/07/31 16:33:25 INFO SparkContext: Job finished: count at RetrieveData.scala:18, took 0.013189 s However, the command line doesn't return to the prompt and instead just hangs. This is my first time running a Spark job - is this normal? If not, how do I fix/debug this? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-finishes-then-command-shell-is-blocked-hangs-tp11095.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Installing Spark 0.9.1 on EMR Cluster
Have you tried the --spark-version flag of spark-ec2? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Installing-Spark-0-9-1-on-EMR-Cluster-tp11084p11096.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Example standalone app error!
When are you guys getting the error? When the SparkContext is created, or when it is being shut down? If this error is thrown when the SparkContext is created, then one possible reason may be conflicting versions of Akka. Spark depends on a version of Akka that is different from the one bundled with Scala, and launching a Spark app using the scala command (instead of java) can cause issues. TD

On Thu, Jul 31, 2014 at 6:30 AM, Alex Minnaar aminn...@verticalscope.com wrote: I am eager to solve this problem, so if anyone has any suggestions I would be glad to hear them. Thanks, Alex

From: Andrew Or and...@databricks.com Sent: Tuesday, July 29, 2014 4:53 PM To: user@spark.apache.org Subject: Re: Example standalone app error! Hi Alex, Very strange. This error occurs when someone tries to call an abstract method. I have run into this before and resolved it with an sbt clean followed by an assembly, so maybe you could give that a try. Let me know if that fixes it, Andrew

2014-07-29 13:01 GMT-07:00 Alex Minnaar aminn...@verticalscope.com: I am trying to run an example Spark standalone app with the following code

import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

object SparkGensimLDA extends App {
  val ssc = new StreamingContext("local", "testApp", Seconds(5))
  val lines = ssc.textFileStream("/.../spark_example/")
  val words = lines.flatMap(_.split(" "))
  val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
  wordCounts.print()
  ssc.start()
  ssc.awaitTermination()
}

However I am getting the following error:

15:35:40.170 [spark-akka.actor.default-dispatcher-2] ERROR akka.actor.ActorSystemImpl - Uncaught fatal error from thread [spark-akka.actor.default-dispatcher-3] shutting down ActorSystem [spark]
java.lang.AbstractMethodError: null
at akka.actor.ActorCell.create(ActorCell.scala:580) ~[akka-actor_2.10-2.3.2.jar:na]
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456) ~[akka-actor_2.10-2.3.2.jar:na]
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478)
~[akka-actor_2.10-2.3.2.jar:na]
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263) ~[akka-actor_2.10-2.3.2.jar:na]
at akka.dispatch.Mailbox.run(Mailbox.scala:219) ~[akka-actor_2.10-2.3.2.jar:na]
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) [akka-actor_2.10-2.3.2.jar:na]
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [scala-library-2.10.4.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [scala-library-2.10.4.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.10.4.jar:na]
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.10.4.jar:na]
15:35:40.171 [spark-akka.actor.default-dispatcher-2] ERROR akka.actor.ActorSystemImpl - Uncaught fatal error from thread [spark-akka.actor.default-dispatcher-4] shutting down ActorSystem [spark]
java.lang.AbstractMethodError: null
at akka.actor.ActorCell.create(ActorCell.scala:580) ~[akka-actor_2.10-2.3.2.jar:na]
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456) ~[akka-actor_2.10-2.3.2.jar:na]
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478) ~[akka-actor_2.10-2.3.2.jar:na]
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263) ~[akka-actor_2.10-2.3.2.jar:na]
at akka.dispatch.Mailbox.run(Mailbox.scala:219) ~[akka-actor_2.10-2.3.2.jar:na]
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) [akka-actor_2.10-2.3.2.jar:na]
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [scala-library-2.10.4.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [scala-library-2.10.4.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.10.4.jar:na]
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.10.4.jar:na]
15:35:40.175 [main] DEBUG o.a.spark.storage.DiskBlockManager - Creating local directories at root dirs '/var/folders/6y/h1f088_j007_d11kpwb1jg6mgp/T/'
15:35:40.176 [spark-akka.actor.default-dispatcher-4] ERROR akka.actor.ActorSystemImpl - Uncaught fatal error from thread [spark-akka.actor.default-dispatcher-2] shutting down ActorSystem [spark]
java.lang.AbstractMethodError: org.apache.spark.storage.BlockManagerMasterActor.aroundPostStop()V
at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210) ~[akka-actor_2.10-2.3.2.jar:na]
at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172) ~[akka-actor_2.10-2.3.2.jar:na]
at akka.actor.ActorCell.terminate(ActorCell.scala:369) ~[akka-actor_2.10-2.3.2.jar:na]
at
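A common trigger for an AbstractMethodError at actor creation is mixing binary-incompatible Akka jars on the classpath. As a hedged sketch (the version numbers are illustrative for the Spark 1.0.x / Scala 2.10 setup in this thread, not verified against Alex's exact build), letting Spark bring in its own Akka transitively, and launching via spark-submit rather than the bare scala command, avoids the conflict:

```scala
// build.sbt -- let spark-core/spark-streaming pull in their own Akka;
// do not add an explicit akka-actor dependency alongside them.
name := "spark-gensim-lda"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % "1.0.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.0.1" % "provided"
)
```

Then package with sbt and run via $SPARK_HOME/bin/spark-submit, so the runtime classpath comes from the Spark distribution rather than from the local Scala installation.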
Re: spark.scheduler.pool seems not working in spark streaming
Whoa! That worked! I was half afraid it wouldn't, since I hadn't tried it myself. TD

On Wed, Jul 30, 2014 at 8:32 PM, liuwei stupi...@126.com wrote: Hi, Tathagata Das: I followed your advice and solved this problem, thank you very much!

On 31 July 2014, at 3:07 AM, Tathagata Das tathagata.das1...@gmail.com wrote: This is because setLocalProperty makes all Spark jobs submitted from the current thread belong to the given pool. However, in Spark Streaming, all the jobs are actually launched in the background from a different thread, so this setting does not work. However, there is a workaround: if you are doing any kind of output operation on DStreams, like DStream.foreachRDD(), you can set the property inside it: dstream.foreachRDD(rdd => rdd.sparkContext.setLocalProperty(...))

On Wed, Jul 30, 2014 at 1:43 AM, liuwei stupi...@126.com wrote: In my Spark Streaming program, I set the scheduler pool as follows:

val myFairSchedulerFile = "xxx.xml"
val myStreamingPool = "xxx"
System.setProperty("spark.scheduler.allocation.file", myFairSchedulerFile)
val conf = new SparkConf()
val ssc = new StreamingContext(conf, batchInterval)
ssc.sparkContext.setLocalProperty("spark.scheduler.pool", myStreamingPool)
....
ssc.start()
ssc.awaitTermination()

I submitted my Spark Streaming job to my Spark cluster, and I found the stage's pool name is "default"; it seems ssc.sparkContext.setLocalProperty("spark.scheduler.pool", myStreamingPool) does not work.
Re: spark.scheduler.pool seems not working in spark streaming
I filed a JIRA for this task for future reference. https://issues.apache.org/jira/browse/SPARK-2780

On Thu, Jul 31, 2014 at 5:37 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Whoa! That worked! I was half afraid it wouldn't, since I hadn't tried it myself. TD

On Wed, Jul 30, 2014 at 8:32 PM, liuwei stupi...@126.com wrote: Hi, Tathagata Das: I followed your advice and solved this problem, thank you very much!

On 31 July 2014, at 3:07 AM, Tathagata Das tathagata.das1...@gmail.com wrote: This is because setLocalProperty makes all Spark jobs submitted from the current thread belong to the given pool. However, in Spark Streaming, all the jobs are actually launched in the background from a different thread, so this setting does not work. However, there is a workaround: if you are doing any kind of output operation on DStreams, like DStream.foreachRDD(), you can set the property inside it: dstream.foreachRDD(rdd => rdd.sparkContext.setLocalProperty(...))

On Wed, Jul 30, 2014 at 1:43 AM, liuwei stupi...@126.com wrote: In my Spark Streaming program, I set the scheduler pool as follows:

val myFairSchedulerFile = "xxx.xml"
val myStreamingPool = "xxx"
System.setProperty("spark.scheduler.allocation.file", myFairSchedulerFile)
val conf = new SparkConf()
val ssc = new StreamingContext(conf, batchInterval)
ssc.sparkContext.setLocalProperty("spark.scheduler.pool", myStreamingPool)
....
ssc.start()
ssc.awaitTermination()

I submitted my Spark Streaming job to my Spark cluster, and I found the stage's pool name is "default"; it seems ssc.sparkContext.setLocalProperty("spark.scheduler.pool", myStreamingPool) does not work.
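The thread affinity TD describes can be illustrated without Spark: local properties behave like thread-local state, so a value set on the main thread is invisible to work submitted from a background thread. Below is a minimal sketch using a plain ThreadLocal for illustration only; Spark's actual implementation differs in details such as inheritance across child threads.

```scala
// Stand-in for the "spark.scheduler.pool" local property:
// thread-local state with a "default" fallback.
val pool = new ThreadLocal[String] {
  override def initialValue(): String = "default"
}

pool.set("myStreamingPool") // set on the main thread, as in the driver code above

var seenInBackground: String = null
val t = new Thread(new Runnable {
  // Streaming jobs are launched from a different thread, which only
  // sees the thread-local's default value.
  def run(): Unit = { seenInBackground = pool.get() }
})
t.start()
t.join()
// The main thread sees "myStreamingPool"; the background thread saw
// "default", matching the "default" pool name observed in the UI.
```

This is also why setting the property inside foreachRDD works: that closure runs on the same thread that submits the job, so the thread-local value is visible when the job is scheduled.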
Re: sbt package failed: wrong libraryDependencies for spark-streaming?
Hi Tathagata, I didn't mean to say this was an error. According to the other thread I linked, right now there shouldn't be any conflicts, so I wanted to use streaming in the shell for easy testing. I thought I had to create my own project in which I'd add streaming as a dependency, but if I can add it in the config that's even simpler and gets rid of my sbt problem. I'll try that. Simon -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/sbt-package-failed-wrong-libraryDependencies-for-spark-streaming-tp11103p11106.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
spark-submit registers the driver twice
Hi All, I am using the spark-submit command to submit my jar to a standalone cluster with two executors. When I use spark-submit it deploys the application twice, and I see two application entries in the master UI. The master logs, shown below, also indicate that submit tries to deploy the application twice, and the deployment of the second application fails. The error I see is: 14/07/31 17:13:34 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory. On the driver side I have increased the memory for the executor. Could you please help me understand what is going wrong here? Thanks, Ali

14/07/31 17:09:32 INFO Master: Registering app KafkaMessageReceiver
14/07/31 17:09:32 INFO Master: Registered app KafkaMessageReceiver with ID app-20140731170932-0016
14/07/31 17:09:32 INFO Master: Launching executor app-20140731170932-0016/0 on worker worker-20140731192616-dev1.dr.com-46317
14/07/31 17:09:32 INFO Master: Launching executor app-20140731170932-0016/1 on worker worker-20140731162612-dev.dr.com-58975
14/07/31 17:09:33 INFO Master: Registering app KafkaMessageReceiver
14/07/31 17:09:33 INFO Master: Registered app KafkaMessageReceiver with ID app-20140731170933-0017

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-submit-registers-the-driver-twice-tp2.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: sbt package failed: wrong libraryDependencies for spark-streaming?
Hey Simon, The stuff you are trying to show (logs, contents of spark-env.sh, etc.) is missing from the email. At least I am not able to see it (viewing through gmail). Are you pasting screenshots? Those might get blocked out somehow! TD On Thu, Jul 31, 2014 at 6:55 PM, durin m...@simon-schaefer.net wrote: I've added the following to my spark-env.sh: I can now execute without an error in the shell. However, I will get an error when doing this: What am I missing? Do I have to import another jar? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/sbt-package-failed-wrong-libraryDependencies-for-spark-streaming-tp11103p11108.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Issue with Spark on EC2 using spark-ec2 script
Hey Dean! Thanks! Did you try running this in a local environment or in one generated by the spark-ec2 script? The environment I am running on is a 4-data-node, 1-master Spark cluster generated by the spark-ec2 script. I haven't modified anything in the environment except for adding data to the ephemeral HDFS. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Issue-with-Spark-on-EC2-using-spark-ec2-script-tp11088p7.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: SQLCtx cacheTable
cacheTable uses a special columnar caching technique that is optimized for SchemaRDDs. It is something similar to MEMORY_ONLY_SER, but not quite. You can specify the persistence level on the SchemaRDD itself and register that as a temporary table; however, it is likely you will not get as good performance. On Thu, Jul 31, 2014 at 6:16 AM, Gurvinder Singh gurvinder.si...@uninett.no wrote: Hi, I am wondering how I can specify the persistence level in cacheTable, as it takes only the table name as a parameter. It should be possible to specify the persistence level. - Gurvinder
Re: HiveContext is creating metastore warehouse locally instead of in hdfs
Could you enable the HistoryServer and provide the properties and CLASSPATH for the spark-shell? And run the 'env' command to list your environment variables? By the way, what do the Spark logs say? Enable debug mode to see what's going on in spark-shell when it tries to interact with and initialize the HiveContext.

On Jul 31, 2014, at 19:09, chenjie chenjie2...@gmail.com wrote: Hi, Yin and Andrew, thank you for your reply. When I create a table in the Hive CLI, it works correctly and the table can be found in HDFS. I forgot to start hiveserver2 before, and I started it today. Then I ran the command below:

spark-shell --master spark://192.168.40.164:7077 --driver-class-path conf/hive-site.xml

Furthermore, I added the following command:

hiveContext.hql("SET hive.metastore.warehouse.dir=hdfs://192.168.40.164:8020/user/hive/warehouse")

But that didn't work for me. I got the same exception as before and found the table file in a local directory instead of HDFS.

Yin Huai-2 wrote: Another way is to set hive.metastore.warehouse.dir explicitly to the HDFS dir storing Hive tables by using the SET command. For example:

hiveContext.hql("SET hive.metastore.warehouse.dir=hdfs://localhost:54310/user/hive/warehouse")

On Thu, Jul 31, 2014 at 8:05 AM, Andrew Lee alee526@... wrote: Hi All, It has been a while, but what I did to make it work is to make sure the following:
1. Hive is working when you run the Hive CLI and JDBC via hiveserver2.
2. Make sure you have the hive-site.xml from the above Hive configuration. The problem here is that you want the hive-site.xml from the Hive metastore. The ones for Hive and HCatalog may be different files. Make sure you check the xml properties in that file; pick the one that has the warehouse property configured and the JDO setup.
3. Make sure the hive-site.xml from step 2 is included in $SPARK_HOME/conf, and in your runtime CLASSPATH when you run spark-shell.
4. Use the history server to check the runtime CLASSPATH and its order, to ensure hive-site.xml is included.
HiveContext should pick up the hive-site.xml and talk to your running Hive service. Hope these tips help.

On Jul 30, 2014, at 22:47, chenjie chenjie2001@... wrote: Hi, Michael. I have the same problem. My warehouse directory is always created locally. I copied the default hive-site.xml into the $SPARK_HOME/conf directory on each node. After I executed the code below,

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
hiveContext.hql("LOAD DATA LOCAL INPATH '/extdisk2/tools/spark/examples/src/main/resources/kv1.txt' INTO TABLE src")
hiveContext.hql("FROM src SELECT key, value").collect()

I got the exception below:

java.io.FileNotFoundException: File file:/user/hive/warehouse/src/kv1.txt does not exist
at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:520)
at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:398)
at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:137)
at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:339)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:763)
at org.apache.hadoop.mapred.LineRecordReader.<init>(LineRecordReader.java:106)
at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:67)
at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:193)

At last, I found /user/hive/warehouse/src/kv1.txt was created on the node where I started spark-shell. The Spark I used is the pre-built spark-1.0.1 for hadoop2. Thanks in advance. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/HiveContext-is-creating-metastore-warehouse-locally-instead-of-in-hdfs-tp10838p1.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
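For steps 2 and 3 above, the relevant entry is the warehouse property in the hive-site.xml that lands in $SPARK_HOME/conf. A hedged fragment (the HDFS URI below reuses the one quoted earlier in this thread; adjust host and port to your own namenode):

```xml
<property>
  <name>hive.metastore.warehouse.dir</name>
  <value>hdfs://192.168.40.164:8020/user/hive/warehouse</value>
  <description>Warehouse location; must point at HDFS, not a local path,
  or tables end up on the driver's local filesystem.</description>
</property>
```

If this file is missing from the runtime CLASSPATH, Hive falls back to its default local warehouse path, which matches the file:/user/hive/warehouse/... error reported in this thread.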
Re: Re: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
Glad to help you. On Fri, Aug 1, 2014 at 11:28 AM, Bin wubin_phi...@126.com wrote: Hi Haiyang, Thanks, it really was the reason. Best, Bin

On 2014-07-31 08:05:34, Haiyang Fu haiyangfu...@gmail.com wrote: Have you tried to increase the driver memory?

On Thu, Jul 31, 2014 at 3:54 PM, Bin wubin_phi...@126.com wrote: Hi All, The data size of my task is about 30MB. It runs smoothly in local mode. However, when I submit it to the cluster, it throws the titled error (please see below for the complete output). Actually, my output is almost the same as http://stackoverflow.com/questions/24080891/spark-program-hangs-at-job-finished-toarray-workers-throw-java-util-concurren. I also toArray my data, which was the reason in his case. However, how come it runs OK locally but not in the cluster? The memory of each worker is over 60g, and my run command is:

$SPARK_HOME/bin/spark-class org.apache.spark.deploy.Client launch spark://10.196.135.101:7077 $jar_path $programname -Dspark.master=spark://10.196.135.101:7077 -Dspark.cores.max=300 -Dspark.executor.memory=20g -spark.jars=$jar_path -Dspark.default.parallelism=100 -Dspark.hadoop.hadoop.job.ugi=$username,$groupname -Dspark.app.name=$appname $in_path $scala_out_path

Looking for help, and thanks a lot! Below please find the complete output:

14/07/31 15:06:53 WARN Configuration: DEPRECATED: hadoop-site.xml found in the classpath. Usage of hadoop-site.xml is deprecated.
Instead use core-site.xml, mapred-site.xml and hdfs-site.xml to override properties of core-default.xml, mapred-default.xml and hdfs-default.xml respectively
14/07/31 15:06:53 INFO SecurityManager: Changing view acls to: spark
14/07/31 15:06:53 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(spark)
14/07/31 15:06:53 INFO Slf4jLogger: Slf4jLogger started
14/07/31 15:06:53 INFO Remoting: Starting remoting
14/07/31 15:06:54 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutor@tdw-10-215-140-22:39446]
14/07/31 15:06:54 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkExecutor@tdw-10-215-140-22:39446]
14/07/31 15:06:54 INFO CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://spark@tdw-10-196-135-106:38502/user/CoarseGrainedScheduler
14/07/31 15:06:54 INFO WorkerWatcher: Connecting to worker akka.tcp://sparkWorker@tdw-10-215-140-22:34755/user/Worker
14/07/31 15:06:54 INFO WorkerWatcher: Successfully connected to akka.tcp://sparkWorker@tdw-10-215-140-22:34755/user/Worker
14/07/31 15:06:56 INFO CoarseGrainedExecutorBackend: Successfully registered with driver
14/07/31 15:06:56 INFO SecurityManager: Changing view acls to: spark
14/07/31 15:06:56 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(spark)
14/07/31 15:06:56 INFO Slf4jLogger: Slf4jLogger started
14/07/31 15:06:56 INFO Remoting: Starting remoting
14/07/31 15:06:56 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@tdw-10-215-140-22:56708]
14/07/31 15:06:56 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@tdw-10-215-140-22:56708]
14/07/31 15:06:56 INFO SparkEnv: Connecting to MapOutputTracker: akka.tcp://spark@tdw-10-196-135-106:38502/user/MapOutputTracker
14/07/31 15:06:58 INFO SparkEnv: Connecting to BlockManagerMaster: akka.tcp://spark@tdw-10-196-135-106:38502/user/BlockManagerMaster
14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at /data1/sparkenv/local/spark-local-20140731150659-3f12
14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at /data2/sparkenv/local/spark-local-20140731150659-1602
14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at /data3/sparkenv/local/spark-local-20140731150659-d213
14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at /data4/sparkenv/local/spark-local-20140731150659-f42e
14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at /data5/sparkenv/local/spark-local-20140731150659-63d0
14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at /data6/sparkenv/local/spark-local-20140731150659-9003
14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at /data7/sparkenv/local/spark-local-20140731150659-f260
14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at /data8/sparkenv/local/spark-local-20140731150659-6334
14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at /data9/sparkenv/local/spark-local-20140731150659-3af4
14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at /data10/sparkenv/local/spark-local-20140731150659-133d
14/07/31 15:06:59 INFO DiskBlockManager: Created local directory at /data11/sparkenv/local/spark-local-20140731150659-ed08
14/07/31 15:06:59 INFO MemoryStore: MemoryStore started with capacity 11.5 GB.
14/07/31 15:06:59
Re: java.lang.OutOfMemoryError: Java heap space
Hi, here are two tips for you: 1. increase the parallelism level; 2. increase the driver memory.

On Fri, Aug 1, 2014 at 12:58 AM, Sameer Tilak ssti...@live.com wrote: Hi everyone, I have the following configuration. I am currently running my app in local mode.

val conf = new SparkConf().setMaster("local[2]").setAppName("ApproxStrMatch").set("spark.executor.memory", "3g").set("spark.storage.memoryFraction", "0.1")

I am getting the following error. I tried setting spark.executor.memory and the memory fraction setting, however my UI does not show the increase and I still get these errors. I am loading a TSV file from HDFS (around 5 GB). Does this mean I should update these settings and add more memory, or is it something else? The Spark master has 24 GB physical memory and the workers have 16 GB, but we are running other services (CDH 5.1) on these nodes as well.

14/07/31 09:48:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
14/07/31 09:48:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
14/07/31 09:48:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 6 ms
14/07/31 09:48:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 6 ms
14/07/31 09:48:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/07/31 09:48:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
14/07/31 09:48:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
14/07/31 09:48:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
14/07/31 09:48:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 1 ms
14/07/31 09:48:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 1 ms
14/07/31 09:48:17 ERROR
Executor: Exception in task ID 5
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:2271)
at java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:178)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
14/07/31 09:48:17 ERROR ExecutorUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-3,5,main]
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:2271)
at java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:178)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
14/07/31 09:48:17 WARN TaskSetManager: Lost TID 5 (task 1.0:0)
14/07/31 09:48:17 WARN TaskSetManager: Loss was due to java.lang.OutOfMemoryError
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:2271)
at java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:178)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
14/07/31 09:48:17 ERROR TaskSetManager: Task 1.0:0 failed 1 times; aborting job
14/07/31 09:48:17 INFO TaskSchedulerImpl: Cancelling stage 1
14/07/31 09:48:17 INFO DAGScheduler: Failed to run collect at ComputeScores.scala:76
14/07/31 09:48:17 INFO Executor: Executor is trying to kill task 6
14/07/31 09:48:17 INFO TaskSchedulerImpl: Stage 1 was cancelled
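Haiyang's two tips translate into launch-time settings rather than SparkConf calls. A hedged sketch for a Spark 1.x launch (the class and jar names are placeholders, and the values are illustrative, not tuned for this workload):

```shell
# Tip 2: raise the driver heap at launch. In local mode the executor runs
# inside the driver JVM, so setting spark.executor.memory in code alone
# will not grow the heap -- which would explain why the UI showed no change.
$SPARK_HOME/bin/spark-submit --driver-memory 8g --class ApproxStrMatch app.jar

# Tip 1: raise default parallelism via conf/spark-defaults.conf, e.g.:
#   spark.default.parallelism   200
```

More, smaller partitions reduce the size of any single task result being serialized, which is where the heap is exhausted in the trace above (JavaSerializerInstance.serialize during TaskRunner.run).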