Re: In Java how can I create an RDD with a large number of elements

2014-12-08 Thread praveen seluka
Steve, something like this will do, I think:
sc.parallelize(1 to 1000, 1000).flatMap(x => 1 to 100000)

The above will launch 1000 tasks (maps), with each task creating 10^5
numbers, for a total of 100 million elements.
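
If the goal is to push this toward a billion elements, the same pattern
scales by widening the per-task range. A sketch along those lines (assumes
sc is the shell's SparkContext; the iterator keeps any single task from
materializing its values):

// 1000 tasks, each lazily generating 1,000,000 distinct Longs => 10^9 elements,
// none of which ever live in the driver's memory.
val billion = sc.parallelize(0 until 1000, 1000).flatMap { i =>
  (0L until 1000000L).iterator.map(offset => i.toLong * 1000000L + offset)
}
billion.count()  // 1000000000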


On Mon, Dec 8, 2014 at 6:17 PM, Steve Lewis lordjoe2...@gmail.com wrote:

  Assume I don't care about the values, which may be created in a later map.
 In Scala I can say
 val rdd = sc.parallelize(1 to 10, numSlices = 1000)
 but in Java, JavaSparkContext can only parallelize a List - limited to
 Integer.MAX_VALUE elements and required to exist in memory - and the best I
 can do on memory is to build my own List based on a BitSet.
 Is there a JIRA asking for JavaSparkContext.parallelize to take an
 Iterable or an Iterator?
 I am trying to make an RDD with at least 100 million elements and, if
 possible, several billion, to test performance issues on a large application.



executorAdded event to DAGScheduler

2014-09-26 Thread praveen seluka
Can someone explain the motivation behind passing the executorAdded event to
DAGScheduler? *DAGScheduler* does *submitWaitingStages* when the *executorAdded*
method is called by *TaskSchedulerImpl*. I see an issue in the code below:

*TaskSchedulerImpl.scala code*
if (!executorsByHost.contains(o.host)) {
  executorsByHost(o.host) = new HashSet[String]()
  executorAdded(o.executorId, o.host)
  newExecAvail = true
}

Note that executorAdded is called only when there is a new host and not for
every new executor. For instance, there can be two executors on the same
host, and in this case DAGScheduler's executorAdded is notified only for the
new host - so only once. If this is indeed an issue, I would
like to submit a patch for this quickly. [cc Andrew Or]
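
A rough sketch of the change being discussed (against the snippet above;
illustrative only, not an actual patch):

// Track executors per host as before, but fire the DAGScheduler notification
// for every previously unseen executor rather than only for an unseen host.
if (!executorsByHost.contains(o.host)) {
  executorsByHost(o.host) = new HashSet[String]()
}
if (!executorsByHost(o.host).contains(o.executorId)) {
  executorsByHost(o.host) += o.executorId
  executorAdded(o.executorId, o.host)
  newExecAvail = true
}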

- Praveen


Re: executorAdded event to DAGScheduler

2014-09-26 Thread praveen seluka
Some corrections.

On Fri, Sep 26, 2014 at 5:32 PM, praveen seluka praveen.sel...@gmail.com
wrote:

 Can someone explain the motivation behind passing the executorAdded event to
 DAGScheduler? *DAGScheduler* does *submitWaitingStages* when the *executorAdded*
 method is called by *TaskSchedulerImpl*. I see an issue in the code below:

 *TaskSchedulerImpl.scala code*
 if (!executorsByHost.contains(o.host)) {
   executorsByHost(o.host) = new HashSet[String]()
   executorAdded(o.executorId, o.host)
   newExecAvail = true
 }

 Note that executorAdded is called only when there is a new host and not
 for every new executor. For instance, there can be two executors on the
 same host, and in this case the DAGScheduler is notified only once. If this
 is indeed an issue, I would like to submit a patch for this quickly. [cc
 Andrew Or]

 - Praveen





Re: executorAdded event to DAGScheduler

2014-09-26 Thread praveen seluka
In YARN, we can easily have multiple containers allocated on the same node.

On Fri, Sep 26, 2014 at 6:05 PM, Nan Zhu zhunanmcg...@gmail.com wrote:

  Just a quick reply: we cannot start two executors on the same host for a
 single application in the standard deployment (one worker per machine).

 I’m not sure if it will create an issue when you have multiple workers on
 the same host, as submitWaitingStages is called from many places and I have
 never tried such a deployment mode.

 Best,

 --
 Nan Zhu

 On Friday, September 26, 2014 at 8:02 AM, praveen seluka wrote:

 Can someone explain the motivation behind passing the executorAdded event to
 DAGScheduler? *DAGScheduler* does *submitWaitingStages* when the *executorAdded*
 method is called by *TaskSchedulerImpl*. I see an issue in the code below:

 *TaskSchedulerImpl.scala code*
 if (!executorsByHost.contains(o.host)) {
   executorsByHost(o.host) = new HashSet[String]()
   executorAdded(o.executorId, o.host)
   newExecAvail = true
 }

 Note that executorAdded is called only when there is a new host and not
 for every new executor. For instance, there can be two executors on the
 same host, and in this case DAGScheduler's executorAdded is notified
 only for the new host - so only once. If this is indeed an issue,
 I would like to submit a patch for this quickly. [cc Andrew Or]

 - Praveen






Yarn Over-allocating Containers

2014-09-12 Thread praveen seluka
Hi all

I am seeing a strange issue in Spark on YARN (stable). Let me know if this is
known, or if I am missing something, as it looks very fundamental.

Launch a Spark job with 2 containers: addContainerRequest is called twice, and
then allocate is called on AMRMClient. This gets 2 containers allocated.
Fine so far.

The reporter thread starts. Now, if one of the containers dies, this is what
happens: the reporter thread adds another addContainerRequest, and the next
allocate *actually* gets back 3 containers (the total number of container
requests from the beginning). The reporter thread has a check to discard
(release) excess containers and ends up releasing 2.

In summary: the job starts with 2 containers, 1 dies (let's say), the reporter
thread adds 1 more container request, subsequently gets back 3 allocated
containers (from YARN), and discards 2 as it needed just 1.
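
A likely explanation is that AMRMClient keeps a container request outstanding
until removeContainerRequest is called for it, so a later allocate() still
answers the earlier asks. A minimal sketch of that bookkeeping, assuming the
standard Hadoop 2.x AMRMClient API (illustrative; not Spark's actual allocator
code):

import org.apache.hadoop.yarn.api.records.Container
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest

// For each container YARN hands back, remove one matching outstanding
// request so the ask is decremented and later allocate() calls do not
// return containers for requests that were already satisfied.
def onContainersAllocated(amClient: AMRMClient[ContainerRequest],
                          allocated: Seq[Container]): Unit = {
  allocated.foreach { container =>
    val matching = amClient.getMatchingRequests(
      container.getPriority, container.getNodeId.getHost, container.getResource)
    if (!matching.isEmpty && !matching.get(0).isEmpty) {
      amClient.removeContainerRequest(matching.get(0).iterator().next())
    }
  }
}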

Thanks
Praveen


Re: API to add/remove containers inside an application

2014-09-05 Thread Praveen Seluka
Mailed our list - will send it to Spark Dev


On Fri, Sep 5, 2014 at 11:28 AM, Rajat Gupta rgu...@qubole.com wrote:

 +1 on this. First step towards more automated autoscaling of the Spark
 application master...


 On Fri, Sep 5, 2014 at 12:56 AM, Praveen Seluka psel...@qubole.com
 wrote:

 +user



 On Thu, Sep 4, 2014 at 10:53 PM, Praveen Seluka psel...@qubole.com
 wrote:

 Spark on YARN has static allocation of resources.
 https://issues.apache.org/jira/browse/SPARK-3174 - This JIRA by Sandy
 is about adding and removing executors dynamically based on load. Even
 before doing this, can we expose an interface to add/remove executors?
 This will be very useful in the Spark shell.

 A user can launch the Spark shell with just a few executors. Later, when the
 user wants more executors to be added, they could call some API like
 sc.addExecutor(count). We could similarly have a removeExecutor API too. I
 have been thinking of working on this. Later, the SPARK-3174 JIRA can add
 intelligence on when to add/remove depending on load. For now, the user gets
 control over adding and removing executors at runtime.
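
 A rough sketch of what the user-facing shape could look like (hypothetical
 method names matching the proposal above; this is not an existing
 SparkContext API):

 // Hypothetical trait, for discussion only; sc.addExecutor / sc.removeExecutor
 // do not exist in Spark at the time of writing.
 trait ExecutorAllocationApi {
   /** Ask the cluster manager (e.g. the YARN AM) for `count` more executors. */
   def addExecutor(count: Int): Unit

   /** Release `count` executors back to the cluster manager. */
   def removeExecutor(count: Int): Unit
 }

 // Intended shell usage, if SparkContext exposed it:
 //   sc.addExecutor(4)     // scale out before a heavy job
 //   sc.removeExecutor(2)  // hand resources back when finished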

 Thoughts?

 Praveen






Re: API to add/remove containers inside an application

2014-09-04 Thread Praveen Seluka
+user


On Thu, Sep 4, 2014 at 10:53 PM, Praveen Seluka psel...@qubole.com wrote:

 Spark on YARN has static allocation of resources.
 https://issues.apache.org/jira/browse/SPARK-3174 - This JIRA by Sandy
 is about adding and removing executors dynamically based on load. Even
 before doing this, can we expose an interface to add/remove executors?
 This will be very useful in the Spark shell.

 A user can launch the Spark shell with just a few executors. Later, when the
 user wants more executors to be added, they could call some API like
 sc.addExecutor(count). We could similarly have a removeExecutor API too. I
 have been thinking of working on this. Later, the SPARK-3174 JIRA can add
 intelligence on when to add/remove depending on load. For now, the user gets
 control over adding and removing executors at runtime.

 Thoughts?

 Praveen



Re: import org.apache.spark.streaming.twitter._ in Shell

2014-07-15 Thread Praveen Seluka
If you want to make the Twitter* classes available in your shell, I believe you
could do the following:
1. Change the parent pom module ordering - move external/twitter before
assembly
2. In assembly/pom.xml, add the external/twitter dependency - this will package
the twitter* classes into the assembly jar

Now when spark-shell is launched, the assembly jar is on the classpath - and
hence the twitter* classes too. I think this will work (I remember trying this
some time back).


On Tue, Jul 15, 2014 at 11:59 AM, Nicholas Chammas 
nicholas.cham...@gmail.com wrote:

 Hmm, I'd like to clarify something from your comments, Tathagata.

 Going forward, is Twitter Streaming functionality not supported from the
 shell? What should users do if they'd like to process live Tweets from the
 shell?

 Nick


 On Mon, Jul 14, 2014 at 11:50 PM, Nicholas Chammas 
 nicholas.cham...@gmail.com wrote:

 At some point, you were able to access TwitterUtils from the Spark shell
 using Spark 1.0.0+?


 Yep.


 If yes, then what change in Spark caused it to not work any more?


 It still works for me. I was just commenting on your remark that it
 doesn't work through the shell, which I now understand to apply to versions
 of Spark before 1.0.0.

  Nick





Re: Number of executors change during job running

2014-07-11 Thread Praveen Seluka
If I understand correctly, you cannot change the number of executors at
runtime, right (correct me if I am wrong)? It is defined when we start the
application and then fixed. Do you mean the number of tasks?


On Fri, Jul 11, 2014 at 6:29 AM, Tathagata Das tathagata.das1...@gmail.com
wrote:

 Can you try setting the number of partitions explicitly in all the
 shuffle-based DStream operations? It may be that the default parallelism
 (that is, spark.default.parallelism) is not being respected.
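
 For example, something along these lines (a minimal sketch; the key/value
 types are illustrative and 300 is just an example partition count):

 import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.dstream.DStream

 // Pass the partition count to each shuffle-based operation directly,
 // rather than relying on spark.default.parallelism being picked up.
 def withExplicitPartitions(pairs: DStream[(String, Long)]): DStream[(String, Long)] =
   pairs.reduceByKey(_ + _, 300)
 // or, if grouping is really needed:
 //   pairs.groupByKey(300)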

 Regarding the unusual delay, I would look at the task details of that
 stage in the Spark web UI. It will show the breakdown of time for each task,
 including GC time, etc. That might give some indication.

 TD


 On Thu, Jul 10, 2014 at 5:13 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi Tathagata,

 I set the default parallelism to 300 in my configuration file. Sometimes
 there are more executors in a job; however, it is still slow. I further
 observed that most executors take less than 20 seconds, but two of them take
 much longer, such as 2 minutes. The data size is very small (less than 480k
 lines with only 4 fields). I am not sure why the group-by operation takes
 more than 3 minutes. Thanks!

 Bill


 On Thu, Jul 10, 2014 at 4:28 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Are you specifying the number of reducers in all the DStream ByKey
 operations? If the number of reducers is not set, then the number of reducers
 used in the stages can keep changing across batches.

 TD


 On Wed, Jul 9, 2014 at 4:05 PM, Bill Jay bill.jaypeter...@gmail.com
 wrote:

 Hi all,

 I have a Spark Streaming job running on YARN. It consumes data from
 Kafka and groups the data by a certain field. The data size is 480k lines
 per minute, and the batch size is 1 minute.

 For some batches, the program takes more than 3 minutes to
 finish the groupBy operation, which seems slow to me. I allocated 300
 workers and specified 300 as the partition number for groupBy. When I checked
 the slow stage, *combineByKey at ShuffledDStream.scala:42,* there are
 sometimes only 2 executors allocated for this stage. However, during other
 batches, there can be several hundred executors for the same stage, which
 means the number of executors for the same operation changes.

 Does anyone know how Spark allocates the number of executors for
 different stages, and how to increase the efficiency of the tasks? Thanks!

 Bill







Re: Getting started : Spark on YARN issue

2014-06-20 Thread Praveen Seluka
Hi Andrew

Thanks, Andrew, for your suggestion. I updated hdfs-site on the server side
and also on the client side to use hostnames instead of IPs, as mentioned here:
http://rainerpeter.wordpress.com/2014/02/12/connect-to-hdfs-running-in-ec2-using-public-ip-addresses/
Now I can see that the client is able to talk to the datanode.

Also, I will consider submitting the application from within EC2 itself so that
the private IP is resolvable.

Thanks
Praveen


On Fri, Jun 20, 2014 at 2:35 AM, Andrew Or and...@databricks.com wrote:

 (Also, an easier workaround is to simply submit the application from
 within your
 cluster, thus saving you all the manual labor of reconfiguring everything
 to use
 public hostnames. This may or may not be applicable to your use case.)


 2014-06-19 14:04 GMT-07:00 Andrew Or and...@databricks.com:

 Hi Praveen,

 Yes, the fact that it is trying to use a private IP from outside of the
 cluster is suspicious.
 My guess is that your HDFS is configured to use internal IPs rather than
 external IPs.
 This means even though the hadoop confs on your local machine only use
 external IPs,
 the org.apache.spark.deploy.yarn.Client that is running on your local
 machine is trying
 to use whatever address your HDFS name node tells it to use, which is
 private in this
 case.

 A potential fix is to update your hdfs-site.xml (and other related
 configs) within your
 cluster to use public hostnames. Let me know if that does the job.

 Andrew


 2014-06-19 6:04 GMT-07:00 Praveen Seluka psel...@qubole.com:

 I am trying to run Spark on YARN. I have a Hadoop 2.2 cluster (YARN +
 HDFS) in EC2. I compiled Spark using Maven with the Hadoop 2.2 profiles.
 Now I am trying to run the example Spark job (in yarn-cluster mode).

 From my *local machine*, I have set up the HADOOP_CONF_DIR environment
 variable correctly.

 ➜  spark git:(master) ✗ /bin/bash -c ./bin/spark-submit --class
 org.apache.spark.examples.SparkPi --master yarn-cluster --num-executors 2
 --driver-memory 2g --executor-memory 2g --executor-cores 1
 examples/target/scala-2.10/spark-examples_*.jar 10
 14/06/19 14:59:39 WARN util.NativeCodeLoader: Unable to load
 native-hadoop library for your platform... using builtin-java classes where
 applicable
 14/06/19 14:59:39 INFO client.RMProxy: Connecting to ResourceManager at
 ec2-54-242-244-250.compute-1.amazonaws.com/54.242.244.250:8050
 14/06/19 14:59:41 INFO yarn.Client: Got Cluster metric info from
 ApplicationsManager (ASM), number of NodeManagers: 1
 14/06/19 14:59:41 INFO yarn.Client: Queue info ... queueName: default,
 queueCurrentCapacity: 0.0, queueMaxCapacity: 1.0,
   queueApplicationCount = 0, queueChildQueueCount = 0
 14/06/19 14:59:41 INFO yarn.Client: Max mem capabililty of a single
 resource in this cluster 12288
 14/06/19 14:59:41 INFO yarn.Client: Preparing Local resources
 14/06/19 14:59:42 WARN hdfs.BlockReaderLocal: The short-circuit local
 reads feature cannot be used because libhadoop cannot be loaded.
 14/06/19 14:59:43 INFO yarn.Client: Uploading
 file:/home/rgupta/awesome/spark/examples/target/scala-2.10/spark-examples_2.10-1.0.0-SNAPSHOT.jar
 to hdfs://
 ec2-54-242-244-250.compute-1.amazonaws.com:8020/user/rgupta/.sparkStaging/application_1403176373037_0009/spark-examples_2.10-1.0.0-SNAPSHOT.jar
 14/06/19 15:00:45 INFO hdfs.DFSClient: Exception in
 createBlockOutputStream
 org.apache.hadoop.net.ConnectTimeoutException: 6 millis timeout
 while waiting for channel to be ready for connect. ch :
 java.nio.channels.SocketChannel[connection-pending remote=/
 10.180.150.66:50010]
 at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:532)
 at
 org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1305)
 at
 org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1128)
 at
 org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1088)
 at
 org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:514)
 14/06/19 15:00:45 INFO hdfs.DFSClient: Abandoning
 BP-1714253233-10.180.215.105-1403176367942:blk_1073741833_1009
 14/06/19 15:00:46 INFO hdfs.DFSClient: Excluding datanode
 10.180.150.66:50010
 14/06/19 15:00:46 WARN hdfs.DFSClient: DataStreamer Exception

 It's able to talk to the ResourceManager.
 Then it puts the examples jar into HDFS and it fails while trying to
 write to the datanode. I verified that port 50010 is accessible from my local
 machine. Any idea what the issue is here?
 One thing that's suspicious is */10.180.150.66:50010 - it looks like it's
 trying to connect using the private IP. If so, how can I resolve this to use
 the public IP?*

 Thanks
 Praveen






Getting started : Spark on YARN issue

2014-06-19 Thread Praveen Seluka
I am trying to run Spark on YARN. I have a Hadoop 2.2 cluster (YARN +
HDFS) in EC2. I compiled Spark using Maven with the Hadoop 2.2 profiles.
Now I am trying to run the example Spark job (in yarn-cluster mode).

From my *local machine*, I have set up the HADOOP_CONF_DIR environment
variable correctly.

➜  spark git:(master) ✗ /bin/bash -c ./bin/spark-submit --class
org.apache.spark.examples.SparkPi --master yarn-cluster --num-executors 2
--driver-memory 2g --executor-memory 2g --executor-cores 1
examples/target/scala-2.10/spark-examples_*.jar 10
14/06/19 14:59:39 WARN util.NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
14/06/19 14:59:39 INFO client.RMProxy: Connecting to ResourceManager at
ec2-54-242-244-250.compute-1.amazonaws.com/54.242.244.250:8050
14/06/19 14:59:41 INFO yarn.Client: Got Cluster metric info from
ApplicationsManager (ASM), number of NodeManagers: 1
14/06/19 14:59:41 INFO yarn.Client: Queue info ... queueName: default,
queueCurrentCapacity: 0.0, queueMaxCapacity: 1.0,
  queueApplicationCount = 0, queueChildQueueCount = 0
14/06/19 14:59:41 INFO yarn.Client: Max mem capabililty of a single
resource in this cluster 12288
14/06/19 14:59:41 INFO yarn.Client: Preparing Local resources
14/06/19 14:59:42 WARN hdfs.BlockReaderLocal: The short-circuit local reads
feature cannot be used because libhadoop cannot be loaded.
14/06/19 14:59:43 INFO yarn.Client: Uploading
file:/home/rgupta/awesome/spark/examples/target/scala-2.10/spark-examples_2.10-1.0.0-SNAPSHOT.jar
to hdfs://
ec2-54-242-244-250.compute-1.amazonaws.com:8020/user/rgupta/.sparkStaging/application_1403176373037_0009/spark-examples_2.10-1.0.0-SNAPSHOT.jar
14/06/19 15:00:45 INFO hdfs.DFSClient: Exception in createBlockOutputStream
org.apache.hadoop.net.ConnectTimeoutException: 6 millis timeout while
waiting for channel to be ready for connect. ch :
java.nio.channels.SocketChannel[connection-pending remote=/
10.180.150.66:50010]
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:532)
at
org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1305)
at
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1128)
at
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1088)
at
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:514)
14/06/19 15:00:45 INFO hdfs.DFSClient: Abandoning
BP-1714253233-10.180.215.105-1403176367942:blk_1073741833_1009
14/06/19 15:00:46 INFO hdfs.DFSClient: Excluding datanode
10.180.150.66:50010
14/06/19 15:00:46 WARN hdfs.DFSClient: DataStreamer Exception

It's able to talk to the ResourceManager.
Then it puts the examples jar into HDFS and it fails while trying to write
to the datanode. I verified that port 50010 is accessible from my local
machine. Any idea what the issue is here?
One thing that's suspicious is */10.180.150.66:50010 - it looks like it's
trying to connect using the private IP. If so, how can I resolve this to use
the public IP?*

Thanks
Praveen