Re: Problem with flatmap.

2014-01-30 Thread Archit Thakur
10:17 AM, Evan R. Sparks wrote: > >> Could it be that you have the same records that you get back from >> flatMap, just in a different order? >> >> >> On Thu, Jan 30, 2014 at 1:05 AM, Archit Thakur > > wrote: >> >>> Needless to say, it works fine

Re: Problem with flatmap.

2014-01-30 Thread Archit Thakur
Yes, that could be possible. Why does that matter? On Thu, Jan 30, 2014 at 11:47 PM, Evan R. Sparks wrote: > Could it be that you have the same records that you get back from flatMap, > just in a different order? > > > On Thu, Jan 30, 2014 at 1:05 AM, Archit Thakur > wrote: >

Re: Problem with flatmap.

2014-01-30 Thread Archit Thakur
Needless to say, it works fine with int/string (primitive) types. On Wed, Jan 29, 2014 at 2:04 PM, Archit Thakur wrote: > Hi, > > I am facing a general problem with the flatMap operation on an RDD. > > I am doing > > MyRdd.flatMap(func(_)) > MyRdd.saveAsTextFile(..) > >

Problem with flatmap.

2014-01-29 Thread Archit Thakur
it has created, it differs. Only the number of records is the same; the actual records in the file differ from the ones in the logs. Does Spark modify keys/values in between? What other operations does it perform on the key or value? Thanks and Regards, Archit Thakur.

Problems while moving from 0.8.0 to 0.8.1

2014-01-27 Thread Archit Thakur
to keep compiler happy } Other info: 1. My code works fine with 0.8.0. 2. I used the groupByKey transformation. 3. I replaced Aggregator.scala with the older version (0.8.0), compiled it, restarted the Master and Worker, and it ran successfully. Thanks and Regards, Archit Thakur.

GroupByKey implementation.

2014-01-26 Thread Archit Thakur
Hi, Below is the implementation for GroupByKey. (v, 0.8.0) def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = { def createCombiner(v: V) = ArrayBuffer(v) def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v val bufs = combineByKey[ArrayBuffer[V]]( createCombiner _, me
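
For reference, a minimal sketch of the same grouping pattern expressed against the public combineByKey API rather than Spark internals (data and names here are illustrative, assuming a 0.8/0.9-era Spark with the SparkContext._ implicits):

    import scala.collection.mutable.ArrayBuffer
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._   // PairRDDFunctions (combineByKey)

    object GroupByKeySketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "groupByKey-sketch")
        val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

        // Same shape as groupByKey: start a buffer per key, append values,
        // and concatenate buffers coming from different map-side partitions.
        val grouped = pairs.combineByKey[ArrayBuffer[Int]](
          (v: Int) => ArrayBuffer(v),                                   // createCombiner
          (buf: ArrayBuffer[Int], v: Int) => buf += v,                  // mergeValue
          (b1: ArrayBuffer[Int], b2: ArrayBuffer[Int]) => b1 ++= b2)    // mergeCombiners

        grouped.collect().foreach(println)
        sc.stop()
      }
    }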

Moving from java.util.HashMap to org.apache.spark.util.AppendOnlyMap.scala

2014-01-26 Thread Archit Thakur
HashMap / AppendOnlyMap. Thanks and Regards, Archit Thakur.

Re: how to set SPARK_WORKER_INSTANCES and SPARK_WORKER_CORES otpimally

2014-01-26 Thread Archit Thakur
s 1 worker while each worker has eight cores. Let me know if you have any doubts. Thanks and Regards, Archit Thakur. On Sun, Jan 26, 2014 at 5:58 AM, Chen Jin wrote: > Hi all, > > From spark document, we can set the number of workers by > SPARK_WORKER_INSTANCES and the max number of cores t

Re: ClassNotFoundException with simple Spark job on cluster

2014-01-26 Thread Archit Thakur
The very first thing that comes to my mind after reading your problem is that you need to add your jar to the list in the 4th argument of SparkContext, as @Nhan suggested above. Let me know if it doesn't resolve your problem. Thanks and Regards, Archit Thakur. On Sun, Jan 26, 2014 at 8:33 AM,
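
For illustration, a minimal sketch of passing the application jar in the 4th SparkContext argument so executors can load the job's classes (the master URL, paths and names are placeholders, not taken from the thread):

    import org.apache.spark.SparkContext

    object JarShippingSketch {
      def main(args: Array[String]) {
        // Fourth argument: jars shipped to executors so they can resolve your classes.
        val sc = new SparkContext(
          "spark://master-host:7077",           // cluster master URL (placeholder)
          "my-app",
          "/path/to/spark",                     // SPARK_HOME on the cluster (placeholder)
          Seq("/path/to/my-app-assembly.jar"))  // jar(s) containing the job's classes

        println(sc.parallelize(1 to 10).count())
        sc.stop()
      }
    }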

Re: Division of work between master, worker, executor and driver

2014-01-25 Thread Archit Thakur
On Fri, Jan 24, 2014 at 11:29 PM, Manoj Samel wrote: > On cluster with HDFS + Spark (in standalone deploy mode), there is a > master node + 4 worker nodes. When a spark-shell connects to master, it > creates 4 executor JVMs on each of the 4 worker nodes. > No, it creates 1 (4 in total) executor J

Re: log4j question

2014-01-21 Thread Archit Thakur
Did you change the log4j.rootCategory=INFO, console line in your file, or did you only move the file, removing the .template extension? Because if you didn't, with this log4j configuration it'll output the log to the console and not suppress it. Like Khanderao said, set log4j.rootCategory=OFF if you don't want any log

Re: Controlling hadoop block size

2014-01-17 Thread Archit Thakur
On Thu, Jan 16, 2014 at 11:40 PM, Aureliano Buendia wrote: > > > > On Thu, Jan 16, 2014 at 11:39 AM, Archit Thakur > wrote: > >> The command >> >> val rdd1 = sc.parallelize(Range(0, N, 1)) // N ~ 1e3 >> val rdd2 = rdd1.cartesian(rdd1) >> >

Re: How does shuffle work in spark ?

2014-01-16 Thread Archit Thakur
For any shuffle operation (groupByKey, etc.), it does write the map output to disk before performing the reduce task on the data. On Thu, Jan 16, 2014 at 4:03 PM, suman bharadwaj wrote: > Hi, > > I'm new to spark. And wanted to understand more on how shuffle works in > spark > > In Hadoop map reduce,

Re: Controlling hadoop block size

2014-01-16 Thread Archit Thakur
= N/64 and distribute the data equally (64MB), and perform rdd.operationToTransformItInto_CustmRDD. PS: There might be an operation/RDD that already does the same; I am not aware of it as of now. Please let me know if you are able to figure it out. Thanks and Regards, Archit Thakur. On Tue, Jan 1

Re: WARN ClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

2014-01-14 Thread Archit Thakur
How much memory are you setting for the executor JVM? This problem comes either when there is a communication problem between the Master and Worker, or when you do not have any memory left, e.g., you specified 75G for your executor but your machine has only 70G of memory. On Thu, Jan 9, 2014 at 11:27 PM, Aureliano Buen
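
A minimal sketch of one way to cap executor memory in that (pre-SparkConf) era of Spark, assuming the spark.executor.memory system property is set before the SparkContext is created; the value and master URL are placeholders:

    import org.apache.spark.SparkContext

    object ExecutorMemorySketch {
      def main(args: Array[String]) {
        // Must be set before the SparkContext is constructed. Ask for less memory
        // than the worker machine physically has, otherwise executors never register
        // and the job sits waiting for resources.
        System.setProperty("spark.executor.memory", "4g")

        val sc = new SparkContext("spark://master-host:7077", "memory-sketch")
        println(sc.parallelize(1 to 100).count())
        sc.stop()
      }
    }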

Re: Getting java.netUnknownHostException

2014-01-14 Thread Archit Thakur
Try running ./bin/start-slave.sh 1 spark://A-IP:PORT. Thx, Archit_Thakur. On Sat, Jan 11, 2014 at 7:18 AM, Khanderao kand wrote: > For "java.netUnknownHostException" Did you check something basic that you > are able to connect to A from B? and checked /etc/hosts? > > > On Fri, Jan 10, 2014 at 7

Re: Controlling hadoop block size

2014-01-14 Thread Archit Thakur
Hadoop block size decreased: do you mean HDFS block size? That is not possible; the HDFS block size is never affected by your Spark jobs. "For a big number of tasks, I get a very high number of 1 MB files generated by saveAsSequenceFile()." What do you mean by "big number of tasks"? The no. of files
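
A minimal sketch of getting fewer, larger output files by reducing the number of partitions before saving, since one file is written per partition (the path and counts are placeholders):

    import org.apache.spark.SparkContext

    object OutputFileCountSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "output-file-count-sketch")

        val data = sc.parallelize(0 until 1000000, 200)   // many small partitions

        // One output file is written per partition, so coalesce first
        // if you want a handful of large files instead of many tiny ones.
        data.coalesce(8).saveAsTextFile("/tmp/output-file-count-sketch")

        sc.stop()
      }
    }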

Re: Akka error kills workers in standalone mode

2014-01-14 Thread Archit Thakur
You are getting a NullPointerException, which is why it fails. The fact that it runs locally means you are ignoring that many of the classes won't be initialized on the worker executor node, even though you might have initialized them in your master/driver JVM. To check: does your code work when you

Re: is it forgotten to document how to set SPARK_WORKER_DIR?

2014-01-07 Thread Archit Thakur
What do you mean by worker dir? On Tue, Jan 7, 2014 at 11:43 AM, Nan Zhu wrote: > Hi, all > > I’m trying to change my worker dir to a mounted disk with larger space > > but I found that no document telling me how to do this, > > I have to check the source code and found the following lines > >

Re: Will JVM be reused?

2014-01-05 Thread Archit Thakur
successfully. I know I can do it by reading it on the master and then broadcasting, but there is a reason I want to do it this way. On Sun, Jan 5, 2014 at 1:43 AM, Archit Thakur wrote: > ya ya had got that. Thx. > > > On Sun, Jan 5, 2014 at 1:41 AM, Roshan Nair wrote: > >> The dri
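
For reference, a minimal sketch of the read-on-the-driver-then-broadcast approach mentioned here (the file path and the lookup logic are purely illustrative):

    import scala.io.Source
    import org.apache.spark.SparkContext

    object BroadcastFileSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "broadcast-file-sketch")

        // Read the side file once on the driver...
        val lookup: Set[String] = Source.fromFile("/tmp/keywords.txt").getLines().toSet

        // ...and broadcast it so every task gets a read-only copy.
        val lookupBc = sc.broadcast(lookup)

        val matches = sc.textFile("/tmp/input.txt")
          .filter(line => lookupBc.value.exists(w => line.contains(w)))

        println(matches.count())
        sc.stop()
      }
    }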

Re: debug standalone Spark jobs?

2014-01-05 Thread Archit Thakur
You can run your Spark application locally by setting SPARK_MASTER="local" and then debug the launched JVM in your IDE. On Sun, Jan 5, 2014 at 9:04 PM, Nan Zhu wrote: > Ah, yes, I think application logs really help > > Thank you > > -- > Nan Zhu > > On Sunday, January 5, 2014 at 10:13 AM, Srir
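
A minimal sketch of that local-mode debugging setup, assuming the master URL is chosen from an environment variable named like the one above (the fallback and names are illustrative):

    import org.apache.spark.SparkContext

    object LocalDebugSketch {
      def main(args: Array[String]) {
        // Fall back to "local" so the whole job runs inside this JVM and IDE
        // breakpoints are hit; point it at the cluster URL otherwise.
        val master = sys.env.getOrElse("SPARK_MASTER", "local")

        val sc = new SparkContext(master, "local-debug-sketch")
        println(sc.parallelize(1 to 10).map(_ * 2).collect().mkString(","))
        sc.stop()
      }
    }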

Re: Will JVM be reused?

2014-01-04 Thread Archit Thakur
ya ya had got that. Thx. On Sun, Jan 5, 2014 at 1:41 AM, Roshan Nair wrote: > The driver jvm is the jvm in which you create the sparkContext and launch > your job. Its different from the master and worker daemons. > > Roshan > On Jan 5, 2014 1:37 AM, "Archit Thakur

Re: Will JVM be reused?

2014-01-04 Thread Archit Thakur
Oh, you meant main driver. Yes, correct. On Sun, Jan 5, 2014 at 1:36 AM, Archit Thakur wrote: > Yeah, I believed that too. > > The last being the jvm in which your driver runs.??? Isn't it in the 3 > worker daemon, we have already considered. > > > On Sun, Jan 5, 20

Re: Will JVM be reused?

2014-01-04 Thread Archit Thakur
nd reload an RDD into memory between stages, >> which is why spark won't do that. >> >> Roshan >> On Jan 5, 2014 1:06 AM, "Archit Thakur" >> wrote: >> >>> A JVM reuse doubt. >>> Lets say I have a job which has 5 stages: >>> Each

Will JVM be reused?

2014-01-04 Thread Archit Thakur
A JVM reuse doubt. Let's say I have a job which has 5 stages: each stage has 10 tasks (10 partitions) and each task has 3 transformations. My cluster is of size 4 (1 Master, 3 Workers). How many JVMs will be launched? 1 Master daemon, 3 Worker daemons, JVM = 1+3+10*3*5 (where at a time 10 will be executed para

Re: Issue with sortByKey.

2014-01-03 Thread Archit Thakur
data are you looking at here? If the source of your RDDs are in HDFS, > then how many HDFS blocks are required to hold the 6 RDDs? > > Andrew > > > On Fri, Jan 3, 2014 at 5:12 AM, Archit Thakur > wrote: > >> I saw Code of sortByKey: >> >&

Re: Issue with sortByKey.

2014-01-03 Thread Archit Thakur
created by step (3rd). Isn't it wrong? On Fri, Jan 3, 2014 at 3:09 PM, Archit Thakur wrote: > Hi, > > I have 6 sequence files as input to spark code. > What I am doing is: > 1. Create 6 individual RDD's out of them. > 2. Union them. > 3. Then Some Mapping. > 4.

Issue with sortByKey.

2014-01-03 Thread Archit Thakur
ey at PreBaseCubeCreator.scala:98) with 6 output partitions (allowLocal=false)* However, it should have been n output partitions, where n = the number of unique keys in the RDD, shouldn't it? Thanks and Regards, Archit Thakur.
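
For reference, a minimal sketch showing that the number of output partitions of sortByKey comes from an argument (or a default), not from the number of distinct keys (the data and partition count are illustrative):

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._   // OrderedRDDFunctions (sortByKey)

    object SortByKeyPartitionsSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "sortByKey-partitions-sketch")

        // 3 distinct keys, but 6 output partitions are requested explicitly:
        val pairs = sc.parallelize(Seq(("b", 1), ("a", 2), ("c", 3), ("a", 4)))
        val sorted = pairs.sortByKey(true, 6)

        // Partition count follows the argument, not the key cardinality.
        println("partitions = " + sorted.partitions.size)
        sorted.collect().foreach(println)
        sc.stop()
      }
    }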

Re: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

2014-01-02 Thread Archit Thakur
he.org/docs/latest/configuration.html > > > 2014/1/2 Archit Thakur > >> Need not mention Workers could be seen on the UI. >> >> >> On Thu, Jan 2, 2014 at 5:01 PM, Archit Thakur >> wrote: >> >>> Hi, >>> >>> I have some 5G of data.

Re: Spark context jar confusions

2014-01-02 Thread Archit Thakur
n one of the slave nodes? E.g., object Z, which is present in the fat jar and is accessed in the map function (which is executed distributedly). Won't it be accessible (because it is there at compile time)? It usually is, isn't it? On Thu, Jan 2, 2014 at 6:02 PM, Archit Thakur wrote: > Aureliano,

Re: Spark context jar confusions

2014-01-02 Thread Archit Thakur
Aureliano, it doesn't matter actually. All that specifying "local" as your Spark master does is use a single JVM to run the whole application. Making a cluster and then specifying "spark://localhost:7077" runs it on a set of machines. Running Spark in local mode will be helpful for debugging purposes

Re: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

2014-01-02 Thread Archit Thakur
Needless to mention, the Workers can be seen on the UI. On Thu, Jan 2, 2014 at 5:01 PM, Archit Thakur wrote: > Hi, > > I have some 5G of data, distributed in some 597 sequence files. My > application does a flatMap on the union of all RDDs created from > individual files. T

Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory

2014-01-02 Thread Archit Thakur
Hi, I have some 5G of data, distributed in some 597 sequence files. My application does a flatMap on the union of all RDDs created from individual files. The flatMap statement throws java.lang.StackOverflowError with the default stack size. I increased the stack size to 1g (both system and JVM).

Re: Not able to understand Exception.

2014-01-02 Thread Archit Thakur
Yes, I am using my custom data structures (for the key and value) and have registered different serializers with Kryo via kryo.register(classOf[MyClass], MyCustomSerializerInstance). Thanks and Regards, Archit Thakur. On Thu, Jan 2, 2014 at 4:26 AM, Christopher Nguyen wrote: > Archit, t
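
For illustration, a minimal sketch of that registration pattern using the standard KryoRegistrator hook (the data structure and serializer are placeholders, not the ones from the thread); it assumes spark.serializer is set to org.apache.spark.serializer.KryoSerializer and spark.kryo.registrator points at the registrator class:

    import com.esotericsoftware.kryo.{Kryo, Serializer}
    import com.esotericsoftware.kryo.io.{Input, Output}
    import org.apache.spark.serializer.KryoRegistrator

    // Placeholder custom data structure used as a key or value.
    case class MyKey(id: Long, tag: String)

    // Hand-written serializer for it.
    class MyKeySerializer extends Serializer[MyKey] {
      override def write(kryo: Kryo, output: Output, key: MyKey) {
        output.writeLong(key.id)
        output.writeString(key.tag)
      }
      override def read(kryo: Kryo, input: Input, clazz: Class[MyKey]): MyKey =
        MyKey(input.readLong(), input.readString())
    }

    // Referenced via the spark.kryo.registrator property.
    class MyRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo) {
        kryo.register(classOf[MyKey], new MyKeySerializer)
      }
    }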

Not able to understand Exception.

2014-01-01 Thread Archit Thakur
I have recently moved to Kryo for serialization to get better performance and have written some of the serializers for my custom data structures. What could the exception below be about? (I don't see any of my code lines in the stack trace.) java.lang.ArrayIndexOutOfBoundsException: -2 at java.util.ArrayList.get

Re: NPE while reading broadcast variable.

2013-12-31 Thread Archit Thakur
I understood the problem. The object ClusterVariableEnumeration (or the class ClusterVariableEnumeration$) will be loaded again on the worker machine. It will again instantiate the class variables (var CURRENT_EXECUTING), which will now have their default values (null). Hence the NPE. I guess I need to
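
A minimal sketch of the failure mode described here, together with the usual fix of keeping the Broadcast handle in a local val and reading .value inside the closure instead of stashing it in a singleton (the object and field names are placeholders, not the original code):

    import org.apache.spark.SparkContext
    import org.apache.spark.broadcast.Broadcast

    object NpeBroadcastSketch {
      // Problem pattern: this object is re-initialized in each worker JVM,
      // so a var assigned only on the driver is still null inside tasks.
      object GlobalState {
        var currentExecuting: String = null
      }

      def main(args: Array[String]) {
        val sc = new SparkContext("local", "npe-broadcast-sketch")

        // Fix pattern: Spark ships the Broadcast handle with the task,
        // and .value resolves to the broadcast data on the worker.
        val currentExecuting: Broadcast[String] = sc.broadcast("value")

        val tagged = sc.parallelize(1 to 5).map(i => (currentExecuting.value, i))
        tagged.collect().foreach(println)
        sc.stop()
      }
    }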

Re: NPE while reading broadcast variable.

2013-12-30 Thread Archit Thakur
I am still getting it. I googled and found a similar open problem on stackoverflow: http://stackoverflow.com/questions/17794664/accumulator-fails-on-cluster-works-locally . Thx, Archit_Thakur. On Mon, Dec 23, 2013 at 11:32 AM, Archit Thakur wrote: > Accessed it: > > val CURRENT_

ADD_JARS doubt.!!!!!

2013-12-22 Thread Archit Thakur
local (something like the DistributedCache in Hadoop MapReduce). What path would I read it from? Thanks and Regards, Archit Thakur.
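
For reference, a minimal sketch of the closest analogue to Hadoop's DistributedCache that comes to mind: addFile on the driver plus SparkFiles.get inside tasks to resolve the node-local path (the file path is a placeholder, and whether this fits the ADD_JARS case above depends on what is being shipped):

    import scala.io.Source
    import org.apache.spark.{SparkContext, SparkFiles}

    object DistributedFileSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "distributed-file-sketch")

        // Ship a side file to every node, roughly like Hadoop's DistributedCache.
        sc.addFile("/tmp/lookup.txt")

        val lineCounts = sc.parallelize(1 to 4, 4).map { _ =>
          // On each worker, resolve the local copy by its file name.
          val localPath = SparkFiles.get("lookup.txt")
          Source.fromFile(localPath).getLines().size
        }

        lineCounts.collect().foreach(println)
        sc.stop()
      }
    }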

Re: NPE while reading broadcast variable.

2013-12-22 Thread Archit Thakur
Accessed it: val CURRENT_EXECUTING = ClusterVariableEnumeration.CURRENT_EXECUTION.value.asInstanceOf[String] On Mon, Dec 23, 2013 at 11:27 AM, Archit Thakur wrote: > Hi, > > I am getting NPE while I access broadcast variable. > > I created an object: > > object Clust

NPE while reading broadcast variable.

2013-12-22 Thread Archit Thakur
ClusterVariableEnumeration.CURRENT_EXECUTION = sc.broadcast("value") and then, in the map function, when I tried to access it, it gave me an NPE. Any idea? Thanks and Regards, Archit Thakur.

Reading from multiple Input path into single Resilient Distributed dataset?

2013-12-16 Thread Archit Thakur
then join all? But my real requirement is not to join all the RDDs but to MERGE them, like appending the 2nd to the 1st and so on. What is the best way to do this? Thanks and Regards, Archit Thakur.
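
A minimal sketch of merging (appending) several input paths into one RDD via union rather than a join (the paths and key/value types are placeholders):

    import org.apache.hadoop.io.Text
    import org.apache.spark.SparkContext

    object MultiPathUnionSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "multi-path-union-sketch")

        val paths = Seq("/data/part1", "/data/part2", "/data/part3")

        // One RDD per path, then append them all together with union.
        val rdds = paths.map(p => sc.sequenceFile(p, classOf[Text], classOf[Text]))
        val merged = sc.union(rdds)

        println(merged.count())
        sc.stop()
      }
    }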

Fwd: Key Class - NotSerializableException

2013-12-08 Thread Archit Thakur
sortByKey. So I made Collection Serializable, and now it is unable to find some method required for the static field of the Collection class. Thanks and Regards, Archit Thakur. On Mon, Dec 9, 2013 at 11:38 AM, MLnick wrote: > Hi Archit > > Spark provides a convenience class for sequenc

Re: Key Class - NotSerializableException

2013-12-08 Thread Archit Thakur
) On Mon, Dec 9, 2013 at 11:21 AM, Archit Thakur wrote: > And Since sortByKey serializes the classes, I guess it has something to do > with Serialization thing. > > > On Mon, Dec 9, 2013 at 11:19 AM, Archit Thakur > wrote: > >> I did make the classes Serialized. But n

Re: Key Class - NotSerializableException

2013-12-08 Thread Archit Thakur
And since sortByKey serializes the classes, I guess it has something to do with serialization. On Mon, Dec 9, 2013 at 11:19 AM, Archit Thakur wrote: > I did make the classes Serializable. But now running the same command > sc.sequenceFile(file, classOf[Text], classOf[Text]).flatMa

Re: Key Class - NotSerializableException

2013-12-08 Thread Archit Thakur
when I run sc.sequenceFile(file, classOf[Text], classOf[Text]).flatMap(map_func).count() it doesn't throw the error. Thanks and Regards, Archit Thakur. On Mon, Dec 9, 2013 at 10:48 AM, Patrick Wendell wrote: > It's because sorting serializes the data during the shuffle phase. >

Key Class - NotSerializableException

2013-12-08 Thread Archit Thakur
by flatMap. My question is: why does sortByKey require the key/value classes to be Serializable? Thanks and Regards, Archit Thakur.
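
For illustration, a minimal sketch of a key type that survives the shuffle sortByKey triggers: it is serializable and carries an ordering (the names and fields are placeholders; registering the class with Kryo is the usual alternative to Java serialization):

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._   // sortByKey via OrderedRDDFunctions

    // Keys cross JVM boundaries during the sort's shuffle, so the class must be
    // serializable, and sortByKey needs an ordering for it.
    case class EventKey(day: String, seq: Int) extends Ordered[EventKey] with Serializable {
      def compare(that: EventKey): Int = {
        val c = day.compareTo(that.day)
        if (c != 0) c else seq.compareTo(that.seq)
      }
    }

    object SerializableKeySketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "serializable-key-sketch")
        val pairs = sc.parallelize(Seq(
          (EventKey("2013-12-08", 2), "b"),
          (EventKey("2013-12-08", 1), "a")))
        pairs.sortByKey().collect().foreach(println)
        sc.stop()
      }
    }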

Fwd: GroupingComparator in Spark.

2013-12-04 Thread Archit Thakur
for grouping (i.e., that all records corresponding to the same key should go to a single reducer)? Is there any way that could be achieved, wherein we can specify our own SortComparator and GroupingComparator? Thanks and Regards, Archit Thakur.
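
A minimal sketch of one way to approximate Hadoop's SortComparator/GroupingComparator split in Spark: partition by the "grouping" part of a composite key with a custom Partitioner, then sort by the full key so all records of a group land on the same partition in the desired order (all names are illustrative, and this is only one possible encoding, not a built-in feature):

    import org.apache.spark.{Partitioner, SparkContext}
    import org.apache.spark.SparkContext._

    // Composite key: the group part decides the partition, the whole key the sort order.
    case class CompositeKey(group: String, ts: Long) extends Ordered[CompositeKey] {
      def compare(that: CompositeKey): Int = {
        val c = group.compareTo(that.group)
        if (c != 0) c else ts.compareTo(that.ts)
      }
    }

    // Plays the role of the GroupingComparator: same group => same partition.
    class GroupPartitioner(val numPartitions: Int) extends Partitioner {
      def getPartition(key: Any): Int = key match {
        case CompositeKey(group, _) =>
          val h = group.hashCode % numPartitions
          if (h < 0) h + numPartitions else h
        case _ => 0
      }
    }

    object SecondarySortSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "secondary-sort-sketch")
        val records = sc.parallelize(Seq(
          (CompositeKey("userA", 3L), "x"),
          (CompositeKey("userB", 1L), "y"),
          (CompositeKey("userA", 1L), "z")))

        // Co-locate each group, then sort each partition by the full key.
        val grouped = records.partitionBy(new GroupPartitioner(4))
        val sorted = grouped.mapPartitions(
          _.toSeq.sortBy(_._1).iterator, preservesPartitioning = true)

        sorted.collect().foreach(println)
        sc.stop()
      }
    }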