Re: In Java how can I create an RDD with a large number of elements
Steve, something like this will do, I think:

sc.parallelize(1 to 1000, 1000).flatMap(x => 1 to 100000)

The above will launch 1000 tasks (maps), with each task creating 10^5 numbers (a total of 100 million elements).

On Mon, Dec 8, 2014 at 6:17 PM, Steve Lewis lordjoe2...@gmail.com wrote: Assume I don't care about the values, which may be created in a later map. In Scala I can say val rdd = sc.parallelize(1 to 10, numSlices = 1000), but in Java JavaSparkContext can only parallelize a List, which is limited to Integer.MAX_VALUE elements and required to exist in memory - the best I can do on memory is to build my own List based on a BitSet. Is there a JIRA asking for JavaSparkContext.parallelize to take an Iterable or an Iterator? I am trying to make an RDD with at least 100 million elements, and if possible several billion, to test performance issues on a large application.
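The trick above - parallelize a small seed range, then flatMap each seed into many elements - can be mimicked from Java by calling JavaSparkContext.parallelize on a small seed list and then flatMap. The sketch below models just the seed-and-expand counting with plain java.util.stream (no SparkContext involved); the class name and sizes are illustrative:

```java
import java.util.stream.IntStream;

public class SeedAndExpand {
    // Models sc.parallelize(1 to numSeeds, numSeeds).flatMap(x => 1 to fanOut):
    // only the small seed list exists up front; the bulk of the elements are
    // generated inside the per-seed expansion, never as one in-memory List.
    static long expandedCount(int numSeeds, int fanOut) {
        return IntStream.rangeClosed(1, numSeeds)                          // seed "partitions"
                .boxed()
                .flatMap(seed -> IntStream.rangeClosed(1, fanOut).boxed()) // per-seed expansion
                .count();
    }

    public static void main(String[] args) {
        // 1000 seeds x 100000 elements each would give the 100 million elements
        // discussed above; a tiny fan-out keeps this demo fast.
        System.out.println(expandedCount(1000, 100));
    }
}
```

With real Spark, the same shape keeps the driver's memory footprint bounded by the seed list, not by the final element count.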
executorAdded event to DAGScheduler
Can someone explain the motivation behind passing the executorAdded event to the DAGScheduler? DAGScheduler does submitWaitingStages when its executorAdded method is called by TaskSchedulerImpl. I see an issue in the code below, from TaskSchedulerImpl.scala:

if (!executorsByHost.contains(o.host)) {
  executorsByHost(o.host) = new HashSet[String]()
  executorAdded(o.executorId, o.host)
  newExecAvail = true
}

Note that executorAdded is called only when there is a new host, not for every new executor. For instance, there can be two executors on the same host, but the DAGScheduler's executorAdded is notified only for the new host - so only once in this case. If this is indeed an issue, I would like to submit a patch for it quickly. [cc Andrew Or] - Praveen
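The per-host gating described above can be modeled in plain Java (the class and method names are illustrative, not Spark's actual ones): the callback fires only when a host is seen for the first time, so a second executor on the same host never triggers it.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class ExecutorTracker {
    private final Map<String, Set<String>> executorsByHost = new HashMap<>();
    private int executorAddedNotifications = 0; // times the DAGScheduler-style callback fired

    // Mirrors the snippet above: the callback is gated on "new host", not "new executor".
    void registerExecutor(String executorId, String host) {
        if (!executorsByHost.containsKey(host)) {
            executorsByHost.put(host, new HashSet<>());
            executorAddedNotifications++; // fires only for a previously unseen host
        }
        executorsByHost.get(host).add(executorId);
    }

    int notifications() { return executorAddedNotifications; }

    int executorCount() {
        return executorsByHost.values().stream().mapToInt(Set::size).sum();
    }
}
```

Registering two executors on the same host yields two tracked executors but only one notification, which is exactly the behavior questioned in this thread.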
Re: executorAdded event to DAGScheduler
Some corrections. On Fri, Sep 26, 2014 at 5:32 PM, praveen seluka praveen.sel...@gmail.com wrote: Can someone explain the motivation behind passing the executorAdded event to the DAGScheduler? DAGScheduler does submitWaitingStages when its executorAdded method is called by TaskSchedulerImpl. I see an issue in the code below, from TaskSchedulerImpl.scala:

if (!executorsByHost.contains(o.host)) {
  executorsByHost(o.host) = new HashSet[String]()
  executorAdded(o.executorId, o.host)
  newExecAvail = true
}

Note that executorAdded is called only when there is a new host, not for every new executor. For instance, there can be two executors on the same host, and in this case the DAGScheduler is notified only once. If this is indeed an issue, I would like to submit a patch for it quickly. [cc Andrew Or] - Praveen
Re: executorAdded event to DAGScheduler
In YARN, we can easily have multiple containers allocated on the same node. On Fri, Sep 26, 2014 at 6:05 PM, Nan Zhu zhunanmcg...@gmail.com wrote: Just a quick reply: we cannot start two executors on the same host for a single application in the standard deployment (one worker per machine). I'm not sure whether it creates an issue when you have multiple workers on the same host, as submitWaitingStages is called everywhere and I have never tried such a deployment mode. Best, -- Nan Zhu On Friday, September 26, 2014 at 8:02 AM, praveen seluka wrote: Can someone explain the motivation behind passing the executorAdded event to the DAGScheduler? DAGScheduler does submitWaitingStages when its executorAdded method is called by TaskSchedulerImpl. I see an issue in the code below, from TaskSchedulerImpl.scala:

if (!executorsByHost.contains(o.host)) {
  executorsByHost(o.host) = new HashSet[String]()
  executorAdded(o.executorId, o.host)
  newExecAvail = true
}

Note that executorAdded is called only when there is a new host, not for every new executor. For instance, there can be two executors on the same host, but the DAGScheduler's executorAdded is notified only for the new host - so only once in this case. If this is indeed an issue, I would like to submit a patch for it quickly. [cc Andrew Or] - Praveen
Yarn Over-allocating Containers
Hi all, I am seeing a strange issue in Spark on YARN (stable). Let me know if this is known, or if I am missing something, as it looks very fundamental.

Launch a Spark job with 2 containers. addContainerRequest is called twice, and then allocate is called on AMRMClient. This gets 2 containers allocated - fine so far. The reporter thread starts. Now, if 1 of the containers dies, this is what happens: the reporter thread adds another addContainerRequest, and the next allocate *actually* gets back 3 containers (the total number of container requests from the beginning). The reporter thread has a check to discard (release) excess containers, and it ends up releasing 2.

In summary: the job starts with 2 containers, 1 dies (let's say), the reporter thread adds 1 more container request, subsequently gets back 3 allocated containers (from YARN), and discards 2 as it needed just 1. Thanks, Praveen
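A plain-Java model of the bookkeeping described above (illustrative only - this is not the AMRMClient API, just the arithmetic of the report): if each allocate round grants containers for the cumulative request count rather than only the outstanding delta, the caller ends up releasing the surplus.

```java
public class OverAllocationModel {
    private int requestsEverMade = 0; // every addContainerRequest ever issued

    void addContainerRequest() { requestsEverMade++; }

    // Models the reported behavior: allocate hands back containers for the
    // cumulative request count, not just the outstanding ones.
    int allocate() { return requestsEverMade; }

    // The reporter-thread check: keep what is needed, release the rest.
    static int toRelease(int granted, int needed) {
        return Math.max(0, granted - needed);
    }

    public static void main(String[] args) {
        OverAllocationModel m = new OverAllocationModel();
        m.addContainerRequest();
        m.addContainerRequest();
        int granted = m.allocate();                // job starts with 2 containers
        m.addContainerRequest();                   // 1 container died; request a replacement
        granted = m.allocate();                    // 3 come back (historical total)
        System.out.println(toRelease(granted, 1)); // 2 excess containers to release
    }
}
```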
Re: API to add/remove containers inside an application
Mailed our list - will send it to Spark Dev. On Fri, Sep 5, 2014 at 11:28 AM, Rajat Gupta rgu...@qubole.com wrote: +1 on this. First step to more automated autoscaling of the Spark application master... On Fri, Sep 5, 2014 at 12:56 AM, Praveen Seluka psel...@qubole.com wrote: +user On Thu, Sep 4, 2014 at 10:53 PM, Praveen Seluka psel...@qubole.com wrote: Spark on YARN has static allocation of resources. https://issues.apache.org/jira/browse/SPARK-3174 - this JIRA by Sandy is about adding and removing executors dynamically based on load. Even before doing this, can we expose an interface to add/remove executors? This would be very useful in the Spark shell: a user can launch the shell with just a few executors, and when more executors are needed, call an API like sc.addExecutor(count). We could similarly have a removeExecutor API. I have been thinking of working on this. Later, the SPARK-3174 JIRA can add intelligence on when to add/remove depending on load; for now, the user gets control over adding and removing executors at runtime. Thoughts? Praveen
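A sketch of what the proposed surface might look like (hypothetical - sc.addExecutor/removeExecutor do not exist in Spark at this point; the interface, names, and toy implementation below are illustrative only):

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical API surface for manual executor scaling, as proposed above.
interface ExecutorScaling {
    void addExecutors(int count);        // e.g. the proposal's sc.addExecutor(count)
    void removeExecutor(String executorId);
    int currentExecutorCount();
}

// A toy in-memory implementation, just to make the contract concrete.
class FakeScaler implements ExecutorScaling {
    private final Set<String> executors = new HashSet<>();
    private int nextId = 0;

    public void addExecutors(int count) {
        for (int i = 0; i < count; i++) {
            executors.add("executor-" + (nextId++)); // stand-in for a container request
        }
    }

    public void removeExecutor(String executorId) {
        executors.remove(executorId); // stand-in for releasing a container
    }

    public int currentExecutorCount() {
        return executors.size();
    }
}
```

The point of the sketch is the contract, not the implementation: SPARK-3174 could later drive the same two methods from a load-based policy instead of a user call.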
Re: API to add/remove containers inside an application
+user On Thu, Sep 4, 2014 at 10:53 PM, Praveen Seluka psel...@qubole.com wrote: Spark on YARN has static allocation of resources. https://issues.apache.org/jira/browse/SPARK-3174 - this JIRA by Sandy is about adding and removing executors dynamically based on load. Even before doing this, can we expose an interface to add/remove executors? This would be very useful in the Spark shell: a user can launch the shell with just a few executors, and when more executors are needed, call an API like sc.addExecutor(count). We could similarly have a removeExecutor API. I have been thinking of working on this. Later, the SPARK-3174 JIRA can add intelligence on when to add/remove depending on load; for now, the user gets control over adding and removing executors at runtime. Thoughts? Praveen
Re: import org.apache.spark.streaming.twitter._ in Shell
If you want to make the Twitter* classes available in your shell, I believe you could do the following: 1. Change the parent pom module ordering - move external/twitter before assembly. 2. In assembly/pom.xml, add an external/twitter dependency - this will package the twitter* classes into the assembly jar. Now when spark-shell is launched, the assembly jar is in the classpath - hence the twitter* classes are too. I think this will work (I remember trying it some time back). On Tue, Jul 15, 2014 at 11:59 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Hmm, I'd like to clarify something from your comments, Tathagata. Going forward, is Twitter streaming functionality not supported from the shell? What should users do if they'd like to process live tweets from the shell? Nick On Mon, Jul 14, 2014 at 11:50 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: At some point, you were able to access TwitterUtils from the spark shell using Spark 1.0.0+? Yep. If yes, then what change in Spark caused it to not work any more? It still works for me. I was just commenting on your remark that it doesn't work through the shell, which I now understand to apply to versions of Spark before 1.0.0. Nick
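Step 2 above would look roughly like this in assembly/pom.xml (a sketch: the artifact name assumes the Scala 2.10 builds of this era, and the version should track the project's own):

```xml
<!-- assembly/pom.xml: package the Twitter connector into the assembly jar -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-twitter_2.10</artifactId>
  <version>${project.version}</version>
</dependency>
```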
Re: Number of executors change during job running
If I understand correctly, you cannot change the number of executors at runtime (correct me if I am wrong) - it is defined when we start the application and stays fixed. Do you mean the number of tasks? On Fri, Jul 11, 2014 at 6:29 AM, Tathagata Das tathagata.das1...@gmail.com wrote: Can you try setting the number of partitions in all the shuffle-based DStream operations explicitly? It may be that the default parallelism (that is, spark.default.parallelism) is not being respected. Regarding the unusual delay, I would look at the task details of that stage in the Spark web UI. It will show a breakdown of the time for each task, including GC times, etc. That might give some indication. TD On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi Tathagata, I set the default parallelism as 300 in my configuration file. Sometimes there are more executors in a job. However, it is still slow. And I further observed that most executors take less than 20 seconds, but two of them take much longer, such as 2 minutes. The data size is very small (less than 480k lines with only 4 fields). I am not sure why the group-by operation takes more than 3 minutes. Thanks! Bill On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das tathagata.das1...@gmail.com wrote: Are you specifying the number of reducers in all the DStream ...ByKey operations? If the number of reducers is not set, then the number used in the stages can keep changing across batches. TD On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay bill.jaypeter...@gmail.com wrote: Hi all, I have a Spark streaming job running on YARN. It consumes data from Kafka and groups the data by a certain field. The data size is 480k lines per minute, where the batch size is 1 minute. For some batches, the program sometimes takes more than 3 minutes to finish the groupBy operation, which seems slow to me. I allocated 300 workers and specified 300 as the partition number for the groupBy.
When I checked the slow stage combineByKey at ShuffledDStream.scala:42, there were sometimes only 2 executors allocated for it. However, during other batches, there can be several hundred executors for the same stage, which means the number of executors for the same operation changes. Does anyone know how Spark allocates the number of executors for different stages, and how to increase the efficiency of the tasks? Thanks! Bill
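TD's suggestion - set the number of partitions explicitly in shuffle operations, e.g. groupByKey(numPartitions) in Spark's API - pins the shuffle fan-out instead of letting it vary batch to batch. The sketch below models the underlying idea in plain Java (illustrative, not Spark code): hash partitioning spreads keys across a fixed number of partitions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HashPartitionModel {
    // Mirrors hash partitioning: partition = nonNegativeMod(key.hashCode, numPartitions).
    static int partitionFor(Object key, int numPartitions) {
        int mod = key.hashCode() % numPartitions;
        return mod < 0 ? mod + numPartitions : mod;
    }

    // Group keys into a fixed number of partitions, like groupByKey(numPartitions):
    // the partition count (and hence shuffle task count) is pinned by the caller.
    static Map<Integer, List<String>> partition(List<String> keys, int numPartitions) {
        Map<Integer, List<String>> parts = new HashMap<>();
        for (String k : keys) {
            parts.computeIfAbsent(partitionFor(k, numPartitions), p -> new ArrayList<>()).add(k);
        }
        return parts;
    }
}
```

With the partition count fixed, every key lands in a deterministic partition in [0, numPartitions), so the same operation produces the same task layout in every batch.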
Re: Getting started : Spark on YARN issue
Hi Andrew, thanks for your suggestion. I updated hdfs-site.xml on the server side, and also on the client side, to use hostnames instead of IPs, as described here: http://rainerpeter.wordpress.com/2014/02/12/connect-to-hdfs-running-in-ec2-using-public-ip-addresses/ . Now I can see that the client is able to talk to the datanode. Also, I will consider submitting the application from within EC2 itself so that the private IP is resolvable. Thanks, Praveen On Fri, Jun 20, 2014 at 2:35 AM, Andrew Or and...@databricks.com wrote: (Also, an easier workaround is to simply submit the application from within your cluster, thus saving you all the manual labor of reconfiguring everything to use public hostnames. This may or may not be applicable to your use case.) 2014-06-19 14:04 GMT-07:00 Andrew Or and...@databricks.com: Hi Praveen, yes, the fact that it is trying to use a private IP from outside of the cluster is suspicious. My guess is that your HDFS is configured to use internal IPs rather than external IPs. This means that even though the hadoop confs on your local machine only use external IPs, the org.apache.spark.deploy.yarn.Client running on your local machine is trying to use whatever address your HDFS name node tells it to use, which is private in this case. A potential fix is to update your hdfs-site.xml (and other related configs) within your cluster to use public hostnames. Let me know if that does the job. Andrew 2014-06-19 6:04 GMT-07:00 Praveen Seluka psel...@qubole.com: I am trying to run Spark on YARN. I have a hadoop 2.2 cluster (YARN + HDFS) in EC2. I compiled Spark using Maven with the 2.2 hadoop profiles. Now I am trying to run the example Spark job (in yarn-cluster mode) from my local machine. I have set up the HADOOP_CONF_DIR environment variable correctly.
➜ spark git:(master) ✗ /bin/bash -c ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster --num-executors 2 --driver-memory 2g --executor-memory 2g --executor-cores 1 examples/target/scala-2.10/spark-examples_*.jar 10
14/06/19 14:59:39 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/06/19 14:59:39 INFO client.RMProxy: Connecting to ResourceManager at ec2-54-242-244-250.compute-1.amazonaws.com/54.242.244.250:8050
14/06/19 14:59:41 INFO yarn.Client: Got Cluster metric info from ApplicationsManager (ASM), number of NodeManagers: 1
14/06/19 14:59:41 INFO yarn.Client: Queue info ... queueName: default, queueCurrentCapacity: 0.0, queueMaxCapacity: 1.0, queueApplicationCount = 0, queueChildQueueCount = 0
14/06/19 14:59:41 INFO yarn.Client: Max mem capabililty of a single resource in this cluster 12288
14/06/19 14:59:41 INFO yarn.Client: Preparing Local resources
14/06/19 14:59:42 WARN hdfs.BlockReaderLocal: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
14/06/19 14:59:43 INFO yarn.Client: Uploading file:/home/rgupta/awesome/spark/examples/target/scala-2.10/spark-examples_2.10-1.0.0-SNAPSHOT.jar to hdfs://ec2-54-242-244-250.compute-1.amazonaws.com:8020/user/rgupta/.sparkStaging/application_1403176373037_0009/spark-examples_2.10-1.0.0-SNAPSHOT.jar
14/06/19 15:00:45 INFO hdfs.DFSClient: Exception in createBlockOutputStream org.apache.hadoop.net.ConnectTimeoutException: 6 millis timeout while waiting for channel to be ready for connect.
ch : java.nio.channels.SocketChannel[connection-pending remote=/10.180.150.66:50010]
  at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:532)
  at org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1305)
  at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1128)
  at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1088)
  at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:514)
14/06/19 15:00:45 INFO hdfs.DFSClient: Abandoning BP-1714253233-10.180.215.105-1403176367942:blk_1073741833_1009
14/06/19 15:00:46 INFO hdfs.DFSClient: Excluding datanode 10.180.150.66:50010
14/06/19 15:00:46 WARN hdfs.DFSClient: DataStreamer Exception

It is able to talk to the ResourceManager. Then it tries to upload the example jar to HDFS, and that fails while writing to the datanode. I verified that port 50010 is accessible from my local machine. Any idea what the issue is here? One thing that is suspicious is /10.180.150.66:50010 - it looks like the client is trying to connect using the private IP. If so, how can I resolve this to use the public IP? Thanks, Praveen
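The hostname-based fix discussed in this thread typically comes down to one HDFS client setting (a sketch - set it in hdfs-site.xml on the machine running the client, so the DFS client connects to datanodes by their resolvable hostnames instead of the private IPs the namenode reports):

```xml
<!-- hdfs-site.xml (client side): connect to datanodes by hostname, not reported IP -->
<property>
  <name>dfs.client.use.datanode.hostname</name>
  <value>true</value>
</property>
```

This only helps, of course, if the datanode hostnames resolve to public addresses from where the client runs; otherwise submitting from within the cluster, as suggested above, is the simpler route.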
Getting started : Spark on YARN issue
I am trying to run Spark on YARN. I have a hadoop 2.2 cluster (YARN + HDFS) in EC2. I compiled Spark using Maven with the 2.2 hadoop profiles. Now I am trying to run the example Spark job (in yarn-cluster mode) from my local machine. I have set up the HADOOP_CONF_DIR environment variable correctly.

➜ spark git:(master) ✗ /bin/bash -c ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster --num-executors 2 --driver-memory 2g --executor-memory 2g --executor-cores 1 examples/target/scala-2.10/spark-examples_*.jar 10
14/06/19 14:59:39 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/06/19 14:59:39 INFO client.RMProxy: Connecting to ResourceManager at ec2-54-242-244-250.compute-1.amazonaws.com/54.242.244.250:8050
14/06/19 14:59:41 INFO yarn.Client: Got Cluster metric info from ApplicationsManager (ASM), number of NodeManagers: 1
14/06/19 14:59:41 INFO yarn.Client: Queue info ... queueName: default, queueCurrentCapacity: 0.0, queueMaxCapacity: 1.0, queueApplicationCount = 0, queueChildQueueCount = 0
14/06/19 14:59:41 INFO yarn.Client: Max mem capabililty of a single resource in this cluster 12288
14/06/19 14:59:41 INFO yarn.Client: Preparing Local resources
14/06/19 14:59:42 WARN hdfs.BlockReaderLocal: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
14/06/19 14:59:43 INFO yarn.Client: Uploading file:/home/rgupta/awesome/spark/examples/target/scala-2.10/spark-examples_2.10-1.0.0-SNAPSHOT.jar to hdfs://ec2-54-242-244-250.compute-1.amazonaws.com:8020/user/rgupta/.sparkStaging/application_1403176373037_0009/spark-examples_2.10-1.0.0-SNAPSHOT.jar
14/06/19 15:00:45 INFO hdfs.DFSClient: Exception in createBlockOutputStream org.apache.hadoop.net.ConnectTimeoutException: 6 millis timeout while waiting for channel to be ready for connect.
ch : java.nio.channels.SocketChannel[connection-pending remote=/10.180.150.66:50010]
  at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:532)
  at org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1305)
  at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1128)
  at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1088)
  at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:514)
14/06/19 15:00:45 INFO hdfs.DFSClient: Abandoning BP-1714253233-10.180.215.105-1403176367942:blk_1073741833_1009
14/06/19 15:00:46 INFO hdfs.DFSClient: Excluding datanode 10.180.150.66:50010
14/06/19 15:00:46 WARN hdfs.DFSClient: DataStreamer Exception

It is able to talk to the ResourceManager. Then it tries to upload the example jar to HDFS, and that fails while writing to the datanode. I verified that port 50010 is accessible from my local machine. Any idea what the issue is here? One thing that is suspicious is /10.180.150.66:50010 - it looks like the client is trying to connect using the private IP. If so, how can I resolve this to use the public IP? Thanks, Praveen