Re: Need to order iterator values in spark dataframe

2020-04-01 Thread Ranjan, Abhinav

Enrico,

The solution below works, but there is a glitch: it works fine in spark-shell 
but fails for skewed keys when run via spark-submit.


Looking at the execution plan, the partitioning is the same for both 
repartition and groupByKey and is driven by "spark.sql.shuffle.partitions", 
like:

Exchange hashpartitioning(value#143, 200)

Any ideas why skewed keys give wrong output here while the same code gives 
correct results in spark-shell?
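
For reference, this is roughly how I build and inspect the plan (a sketch 
only; it assumes a SparkSession named spark, spark.implicits._ imported, and 
the dataframe df and key_class case class from my original message below):

import spark.implicits._

val grouped = df
  .as[key_class]
  .repartition($"key")
  .sortWithinPartitions($"code")
  .groupByKey(_.key)
  .mapGroups { (key, rows) => (key, rows.length) }  // placeholder logic

// Look for the Exchange hashpartitioning nodes; the partition count
// (200 by default) comes from spark.sql.shuffle.partitions.
grouped.explain()

Raising the count with spark.conf.set("spark.sql.shuffle.partitions", "400") 
before building the plan is one knob I can try against the skew.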



--Abhinav


Re: Need to order iterator values in spark dataframe

2020-03-26 Thread Zahid Rahman
I believe I logged an issue first and I should get a response first.
I was ignored.

Regards

Did you know there are 8 million people in Kashmir locked up in their homes
by the Hindutva (Indians)
for 8 months?
Now the whole planet is locked up in their homes.
You didn't take notice of them either.
You ignored them.


Re: Need to order iterator values in spark dataframe

2020-03-26 Thread Enrico Minack

Abhinav,

you can repartition by your key, then sortWithinPartitions, and then 
groupByKey. Since the data are already hash-partitioned by key, Spark should 
not shuffle them and hence should not change the sort within each partition:


ds.repartition($"key").sortWithinPartitions($"code").groupByKey(_.key)
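
Spelled out end to end, a sketch (assuming the key_class case class, the 
dataframe df, spark.implicits._, and the custom per-row logic from the 
original message below) could look like:

val result = df
  .as[key_class]
  .repartition($"key")            // all rows of a key land in one partition
  .sortWithinPartitions($"code")  // order each partition by code
  .groupByKey(_.key)              // should reuse the existing hash partitioning
  .mapGroups { (key, rows) =>
    // rows should now arrive in the order established by sortWithinPartitions
    val status = rows.map { row =>
      // do some custom logic on row
      "SUCCESS"
    }.toList
    (key, status)
  }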

Enrico



Need to order iterator values in spark dataframe

2020-03-26 Thread Ranjan, Abhinav

Hi,

I have a dataframe which has data like:

key | code | code_value
1   | c1   | 11
1   | c2   | 12
1   | c2   | 9
1   | c3   | 12
1   | c2   | 13
1   | c2   | 14
1   | c4   | 12
1   | c2   | 15
1   | c1   | 12


I need to group the data by key and then apply some custom logic to every 
value I get within each group. So I did this:


Let's suppose it is in a dataframe df.

case class key_class(key: String, code: String, code_value: String)


df
  .as[key_class]
  .groupByKey(_.key)
  .mapGroups { (key, groupedValues) =>
    val status = groupedValues.map { row =>
      // do some custom logic on row
      "SUCCESS"
    }.toList
    status
  }
  .toDF("status")


The issue with the above approach is that the values I get after applying 
groupByKey are not sorted/ordered. I want the values to be sorted by the 
column 'code': for key 1, the group should arrive as the two c1 rows first, 
then the five c2 rows, then c3, then c4.


These are the ways I have considered to do this:

1. Collect them into a list and then sort ==> this will result in OOM if 
the iterator is too big.


2. Somehow apply a secondary sort, but the problem with that approach is 
that I have to keep track of the key change.


3. sortWithinPartitions cannot be applied because groupBy will mess up 
the order.


4. Another approach is:

df
  .as[key_class]
  .sort("key", "code")
  .map { row =>
    // do stuff here
    "SUCCESS"
  }

but here too I have to keep track of the key change within the map 
function, and sometimes this also overflows if the keys are skewed (the key 
tracking is sketched below).
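
For illustration, the key-change tracking I mean would look roughly like 
this (a sketch only; it swaps the global sort for repartition + 
sortWithinPartitions so that all rows of a key sit in one partition, and 
assumes spark.implicits._ is imported):

val statuses = df
  .as[key_class]
  .repartition($"key")
  .sortWithinPartitions($"key", $"code")
  .mapPartitions { rows =>
    // walk the sorted iterator lazily; a key boundary is detected without
    // materialising the whole group in memory
    var currentKey: String = null
    rows.map { row =>
      val keyChanged = row.key != currentKey
      currentKey = row.key
      // do stuff here; keyChanged marks the first row of a new key
      "SUCCESS"
    }
  }
  .toDF("status")

Even then, a single partition still holds every row of a hot key, so skew 
remains a problem.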



So is there any way in which I can get the values sorted after grouping 
them by a key?

Thanks,

Abhinav