Mike,

We tried that. This map task is actually part of a larger set of operations. I pointed out this map task because it involves partitionBy(), and we always apply partitionBy() after partition-unaware shuffle operations such as distinct(). In fact, we see no change in the distribution even after several unrelated stages have executed and a significant amount of time has passed (nearly 10-15 minutes).
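To make that pattern concrete, here is a simplified, self-contained sketch of what we do (illustrative only; the object name and data are made up, not our actual code):

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object RepartitionAfterDistinct {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("repartition-after-distinct"))
    val pairs = sc.parallelize(Seq((1, 10), (2, 20), (1, 10), (3, 30)))
    // distinct() shuffles but does not set a partitioner on its output,
    // so we follow it with an explicit partitionBy()
    val deduped = pairs.distinct()
      .partitionBy(new HashPartitioner(8))
      .persist()
    deduped.count() // materialize so the partitioning takes effect
    sc.stop()
  }
}

We call count() (or another action) right after partitionBy() so that the partitioning actually takes effect before the later stages run.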
I agree. We are not looking for partitions to go to specific nodes, nor do we expect a uniform distribution of keys across the cluster; there will be some skew. But it cannot be that all the data is on one node and none on the other. And no, the keys are not all the same; they vary from 1 to around 55000 (integers). What makes this strange is that it seems to work fine in the Spark shell (REPL).

Regards,
Raghava.

On Mon, Apr 18, 2016 at 1:14 AM, Mike Hynes <91m...@gmail.com> wrote:

> A HashPartitioner will indeed partition based on the key, but you
> cannot know on *which* node that key will appear. Again, the RDD
> partitions will not necessarily be distributed evenly across your
> nodes because of the greedy scheduling of the first wave of tasks,
> particularly if those tasks have durations less than the initial
> executor delay. I recommend you look at your logs to verify whether
> this is happening to you.
>
> Mike
>
> On 4/18/16, Anuj Kumar <anujs...@gmail.com> wrote:
> > Good point Mike +1
> >
> > On Mon, Apr 18, 2016 at 9:47 AM, Mike Hynes <91m...@gmail.com> wrote:
> >
> >> When submitting a job with spark-submit, I've observed delays (up to
> >> 1-2 seconds) for the executors to respond to the driver in order to
> >> receive tasks in the first stage. The delay does not persist once the
> >> executors have been synchronized.
> >>
> >> When the tasks are very short, as may be your case (relatively small
> >> data and a simple map task like you have described), the 8 tasks in
> >> your stage may be allocated to only 1 executor in 2 waves of 4, since
> >> the second executor won't have responded to the master before the
> >> first 4 tasks on the first executor have completed.
> >>
> >> To see if this is the cause in your particular case, you could try
> >> the following to confirm:
> >> 1. Examine the starting times of the tasks alongside their executor.
> >> 2. Make a "dummy" stage execute before your real stages to synchronize
> >>    the executors by creating and materializing any random RDD.
> >> 3. Make the tasks longer, i.e. with some silly computational work.
> >>
> >> Mike
> >>
> >> On 4/17/16, Raghava Mutharaju <m.vijayaragh...@gmail.com> wrote:
> >> > Yes, it's the same data.
> >> >
> >> > 1) The number of partitions is the same (8, which is an argument to
> >> > the HashPartitioner). In the first case, these partitions are spread
> >> > across both worker nodes. In the second case, all the partitions are
> >> > on the same node.
> >> > 2) What resources would be of interest here? The Scala shell takes
> >> > the default parameters, since we use "bin/spark-shell --master
> >> > <master-URL>" to run it. For the Scala program, we do set some
> >> > configuration options, such as driver memory (12GB); parallelism is
> >> > set to 8 and we use the Kryo serializer.
> >> >
> >> > We are running this on Azure D3-v2 machines, which have 4 cores and
> >> > 14GB RAM. One executor runs on each worker node. The following
> >> > configuration options are set for the Scala program -- perhaps we
> >> > should move them to the Spark config file:
> >> >
> >> > - driver memory and executor memory are set to 12GB
> >> > - parallelism is set to 8
> >> > - the Kryo serializer is used
> >> > - the number of retainedJobs and retainedStages has been increased,
> >> >   to check them in the UI
> >> >
> >> > What information regarding the Spark Context would be of interest
> >> > here?
> >> >
> >> > Regards,
> >> > Raghava.
> >> >
> >> > On Sun, Apr 17, 2016 at 10:54 PM, Anuj Kumar <anujs...@gmail.com>
> >> > wrote:
> >> >
> >> >> If the data file is the same, then it should have a similar
> >> >> distribution of keys. A few queries:
> >> >>
> >> >> 1. Did you compare the number of partitions in both cases?
> >> >> 2. Did you compare the resource allocation for the Spark shell vs.
> >> >>    the Scala program being submitted?
> >> >>
> >> >> Also, can you please share the details of the Spark Context,
> >> >> Environment, and Executors when you run via the Scala program?
> >> >>
> >> >> On Mon, Apr 18, 2016 at 4:41 AM, Raghava Mutharaju <
> >> >> m.vijayaragh...@gmail.com> wrote:
> >> >>
> >> >>> Hello All,
> >> >>>
> >> >>> We are using HashPartitioner in the following way on a 3 node
> >> >>> cluster (1 master and 2 worker nodes):
> >> >>>
> >> >>> val u = sc.textFile("hdfs://x.x.x.x:8020/user/azureuser/s.txt")
> >> >>>   .map[(Int, Int)](line => line.split("\\|") match {
> >> >>>     case Array(x, y) => (y.toInt, x.toInt)
> >> >>>   })
> >> >>>   .partitionBy(new HashPartitioner(8))
> >> >>>   .setName("u")
> >> >>>   .persist()
> >> >>>
> >> >>> u.count()
> >> >>>
> >> >>> If we run this from the Spark shell, the data (52 MB) is split
> >> >>> across the two worker nodes. But if we put this in a Scala program
> >> >>> and run it, then all the data goes to only one node. We have run
> >> >>> it multiple times, but this behavior does not change. This seems
> >> >>> strange.
> >> >>>
> >> >>> Is there some problem with the way we use HashPartitioner?
> >> >>>
> >> >>> Thanks in advance.
> >> >>>
> >> >>> Regards,
> >> >>> Raghava.
> >> >>
> >> >
> >> > --
> >> > Regards,
> >> > Raghava
> >> > http://raghavam.github.io
> >>
> >> --
> >> Thanks,
> >> Mike
>
> --
> Thanks,
> Mike

--
Regards,
Raghava
http://raghavam.github.io
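A minimal sketch of the "dummy" synchronization stage Mike suggests in point 2 above (illustrative only; this is not code from the thread, and the object name and data are made up): create and materialize a throwaway RDD before the real stages, so that all executors have registered with the driver before the first real wave of tasks is scheduled.

import org.apache.spark.{SparkConf, SparkContext}

object ExecutorWarmup {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("executor-warmup"))

    // "Dummy" stage: materialize a throwaway RDD whose only purpose is
    // to give slow-registering executors time to connect to the driver
    // before the first real stage is scheduled.
    sc.parallelize(1 to 100000, 64).map(_ * 2).count()

    // ... real stages go here; by this point both executors should have
    // registered, so the first real wave of tasks can be spread across them
    sc.stop()
  }
}

If the warm-up changes the placement, that points to the first-wave scheduling effect Mike describes rather than to anything in HashPartitioner itself.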