It probably uses hashCode too, so make sure those two methods are in sync.
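(Concretely, something like the sketch below. CompositeKey and its fields are hypothetical, not the actual custom DS in question; the point is only that equals and hashCode must be computed from exactly the same fields, because groupByKey hash-partitions on hashCode before it ever compares keys with equals.)

  class CompositeKey(val id: Long, val source: String, val tag: String) extends Serializable {

    // equals and hashCode derive from exactly the same fields, so two keys that
    // compare equal always land in the same hash bucket / partition.
    override def equals(other: Any): Boolean = other match {
      case that: CompositeKey => id == that.id && source == that.source && tag == that.tag
      case _ => false
    }

    override def hashCode: Int = (id, source, tag).hashCode
  }

A case class gives you both methods, consistently, for free.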
Sent from my mobile phone

On Jan 3, 2014 3:26 AM, "Archit Thakur" <[email protected]> wrote:

> I realized my mistake as soon as I posted it. I actually meant groupByKey,
> not sortByKey. And yeah, you are right, it is consuming 6 HDFS blocks.
>
> The issue I am facing is: when I do a groupBy, it reduces the no. of unique
> keys in the RDD and modifies them as well.
>
> For example, I have a custom data structure (DS) as the key type.
>
> Below is the set of unique keys in the baseRdd:
>
> (40^0^0[2^1380^0]6[2[17^70.197.1.165:4554][2^WP]]
> (40^0^0[2^1380^0]6[2[18^71.68.211.98:62510][2^WP]]
> (40^0^0[2^1380^1383836478]6[2[18^96.27.139.59:49412][2^WP]]
> (40^0^0[2^1380^1383837196]6[2[17^70.197.1.165:4547][2^WP]]
> (40^0^0[2^1380^1383837276]6[2[19^70.193.193.108:5877][2^WP]]
> (40^0^0[2^1380^1383838476]6[2[18^71.68.211.98:62498][2^WP]]
> (40^0^0[2^1380^1383838564]6[2[18^71.68.211.98:62508][2^WP]]
> (40^0^0[2^1380^1383839099]6[2[19^71.75.156.224:52842][2^WP]]
> (40^0^0[2^1380^1383839119]6[2[19^128.211.178.8:33448][2^WP]]
> (40^0^0[2^1380^1383839294]6[2[19^71.75.156.224:36652][2^WP]]
> (40^0^0[2^1380^1383839651]6[2[18^69.133.71.57:58320][2^WP]]
> (43^1383836400^0[2^1380^1]6[2[5^Event][2^WP]]
>
> and when I do a groupBy on the RDD, it gives me:
>
> (40^0^0[2^1380^0]6[2[17^70.197.1.165:4554][2^WP]]
> (40^0^0[2^1380^0]6[2[18^96.27.139.59:49412][2^WP]]
> (40^0^0[2^1380^1383836478]6[2[18^96.27.139.59:49412][2^WP]]
> (40^0^0[2^1380^1383837196]6[2[17^70.197.1.165:4547][2^WP]]
> (40^0^0[2^1380^1383837276]6[2[19^70.193.193.108:5877][2^WP]]
> (40^0^0[2^1380^1383838564]6[2[18^71.68.211.98:62508][2^WP]]
> (40^0^0[2^1380^1383839099]6[2[19^71.75.156.224:52842][2^WP]]
> (43^1383836400^0[2^1380^1]6[2[5^Event][2^WP]]
>
> Not only has it reduced the no. of keys, it has also modified some of them.
>
> The groupBy operation only uses the equals method of the key class (to
> check the equality of keys), right?
>
> On Fri, Jan 3, 2014 at 4:02 PM, Andrew Ash <[email protected]> wrote:
>
>> Hi Archit,
>>
>> A partition is a chunk of data about the size of an HDFS block, not that
>> of a single key. Because every partition is tracked individually and each
>> is processed in a task on one CPU core, having massive numbers of them
>> causes slowdowns in the scheduler and elsewhere in the system. About how
>> much data are you looking at here? If the source of your RDDs is in HDFS,
>> how many HDFS blocks are required to hold the 6 RDDs?
>>
>> Andrew
>>
>> On Fri, Jan 3, 2014 at 5:12 AM, Archit Thakur <[email protected]> wrote:
>>
>>> I looked at the code of sortByKey:
>>>
>>> def sortByKey(ascending: Boolean = true, numPartitions: Int =
>>> self.partitions.size): RDD[P] = {
>>>
>>> If you don't specify numPartitions explicitly, it defaults to
>>> self.partitions.size, which comes from the getPartitions method of the RDD.
>>>
>>> In this case that will be the RDD created by step (3). Isn't that wrong?
>>>
>>> On Fri, Jan 3, 2014 at 3:09 PM, Archit Thakur <[email protected]> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have 6 sequence files as input to my Spark code.
>>>> What I am doing is:
>>>> 1. Create 6 individual RDDs out of them.
>>>> 2. Union them.
>>>> 3. Then some mapping.
>>>> 4. Count the no. of elements in the RDD.
>>>> 5. Then sortByKey.
>>>>
>>>> Now, if I look at the logging:
>>>>
>>>> 14/01/03 09:04:04 INFO scheduler.DAGScheduler: Got job 0 (count at PreBaseCubeCreator.scala:96) with 6 output partitions (allowLocal=false)
>>>>
>>>> This is the count step (4).
>>>>
>>>> Doubt 1: Why 6 output partitions?
>>>>
>>>> It then prints progress for each of them:
>>>>
>>>> 14/01/03 09:04:05 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager guavus-000392:52345 with 47.4 GB RAM
>>>> 14/01/03 09:04:08 INFO cluster.ClusterTaskSetManager: Finished TID 5 in 3938 ms on guavus-000392 (progress: 1/6)
>>>> 14/01/03 09:04:08 INFO scheduler.DAGScheduler: Completed ResultTask(0, 5)
>>>> 14/01/03 09:04:09 INFO cluster.ClusterTaskSetManager: Finished TID 4 in 4211 ms on guavus-000392 (progress: 2/6)
>>>> 14/01/03 09:04:09 INFO scheduler.DAGScheduler: Completed ResultTask(0, 4)
>>>> 14/01/03 09:04:09 INFO cluster.ClusterTaskSetManager: Finished TID 1 in 4221 ms on guavus-000392 (progress: 3/6)
>>>> 14/01/03 09:04:09 INFO scheduler.DAGScheduler: Completed ResultTask(0, 1)
>>>> 14/01/03 09:04:10 INFO cluster.ClusterTaskSetManager: Finished TID 0 in 5581 ms on guavus-000392 (progress: 4/6)
>>>> 14/01/03 09:04:10 INFO scheduler.DAGScheduler: Completed ResultTask(0, 0)
>>>> 14/01/03 09:04:12 INFO cluster.ClusterTaskSetManager: Finished TID 3 in 7830 ms on guavus-000392 (progress: 5/6)
>>>> 14/01/03 09:04:12 INFO scheduler.DAGScheduler: Completed ResultTask(0, 3)
>>>> 14/01/03 09:04:20 INFO cluster.ClusterTaskSetManager: Finished TID 2 in 15786 ms on guavus-000392 (progress: 6/6)
>>>> 14/01/03 09:04:20 INFO scheduler.DAGScheduler: Completed ResultTask(0, 2)
>>>> 14/01/03 09:04:20 INFO scheduler.DAGScheduler: Stage 0 (count at PreBaseCubeCreator.scala:96) finished in 16.320 s
>>>> 14/01/03 09:04:20 INFO cluster.ClusterScheduler: Remove TaskSet 0.0 from pool
>>>> 14/01/03 09:04:20 INFO spark.SparkContext: Job finished: count
>>>>
>>>> After that, when it goes to sortByKey:
>>>>
>>>> 14/01/03 09:04:33 INFO scheduler.DAGScheduler: Got job 2 (sortByKey at PreBaseCubeCreator.scala:98) with 6 output partitions (allowLocal=false)
>>>>
>>>> However, shouldn't it have been n output partitions, where n = the no. of
>>>> unique keys in the RDD?
>>>>
>>>> Thanks and Regards,
>>>> Archit Thakur.
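(On Doubt 1 and the sortByKey question: the "output partitions" count in both jobs is the partition count of the RDD, not the number of distinct keys. A rough sketch of the pipeline described above, with placeholder paths, master URL and mapping step, to show where the 6 comes from: six one-block files unioned give 6 partitions, map preserves that, and sortByKey range-partitions into numPartitions partitions, defaulting to self.partitions.size as quoted above.)

  import org.apache.spark.SparkContext
  import org.apache.spark.SparkContext._   // implicits for pair-RDD ops such as sortByKey

  val sc = new SparkContext("spark://master:7077", "PreBaseCubeCreator")   // placeholder master URL

  // 1. Six input files, each assumed to fit in a single HDFS block,
  //    so each of these RDDs has exactly one partition.
  val inputs = (1 to 6).map(i => sc.sequenceFile[String, String]("/data/input-" + i))

  // 2. Union: partition counts add up, 6 x 1 = 6 partitions.
  val unioned = sc.union(inputs)

  // 3. Some mapping: map() keeps the partition count unchanged.
  val mapped = unioned.map { case (k, v) => (k, v.length) }

  // 4. Count: one task per partition, hence "Got job ... with 6 output partitions".
  println(mapped.count())

  // 5. sortByKey: numPartitions defaults to self.partitions.size (6 here);
  //    pass it explicitly if a different number of range partitions is wanted.
  val sorted = mapped.sortByKey(ascending = true, numPartitions = 6)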
