Re: RepartitionByKey Behavior

2018-06-26 Thread Chawla,Sumit
Thanks everyone.  As Nathan suggested,  I ended up collecting the distinct
keys first and then assigning Ids to each key explicitly.
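
For reference, a minimal PySpark sketch of that kind of approach, reusing the toy
rdd from the original post further down the thread (the key_ids / exact_partitioner
names are illustrative only, not the actual code that was used):

# Assumes a live SparkContext `sc`, as in the original post.
rdd = sc.parallelize([('a', 5), ('d', 8), ('b', 6), ('a', 8), ('d', 9),
                      ('b', 3), ('c', 8)])

# Collect the distinct keys to the driver and give each one an explicit id.
distinct_keys = rdd.keys().distinct().collect()
key_ids = {k: i for i, k in enumerate(distinct_keys)}   # key -> partition id
num_keys = len(key_ids)

def exact_partitioner(key):
    # Every key has its own partition id, so no two keys can share a partition.
    return key_ids[key]

partitioned = rdd.partitionBy(num_keys, exact_partitioner)
# Each non-empty partition should now hold exactly one distinct key.
print(partitioned.glom().map(lambda part: sorted({k for k, _ in part})).collect())

This relies on the distinct keys fitting comfortably in driver memory, which is
also the assumption behind collecting them in the first place.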

Regards
Sumit Chawla


On Fri, Jun 22, 2018 at 7:29 AM, Nathan Kronenfeld <nkronenfeld@uncharted.software> wrote:

> On Thu, Jun 21, 2018 at 4:51 PM, Chawla,Sumit wrote:

> Hi
>
> I have been trying to do this simple operation.  I want to land all
> values with one key in the same partition, and not have any different
> key in the same partition.  Is this possible?  I am always getting b
> and c mixed up in the same partition.
>
>
>
> I think you could do something approximately like:
>
>  val keys = rdd.map(_.getKey).distinct.zipWithIndex   // (key, index)
>  val numKeys = keys.count.toInt
>  rdd.map(r => (r.getKey, r))
>    .join(keys)                                        // (key, (record, index))
>    .map { case (_, (record, index)) => (index, record) }
>    .partitionBy(new Partitioner() {
>      def numPartitions = numKeys
>      def getPartition(key: Any) = key.asInstanceOf[Long].toInt
>    })
>
> i.e., key by a unique number, count that, and repartition by key to the
> exact count.  This presumes, of course, that the number of keys is small
> enough to use as a partition count.  Also, I haven't tested this code, so
> don't take it as anything more than an approximate idea, please :-)
>
>  -Nathan Kronenfeld
>


Re: RepartitionByKey Behavior

2018-06-22 Thread Nathan Kronenfeld
> On Thu, Jun 21, 2018 at 4:51 PM, Chawla,Sumit wrote:
>
> Hi
>
> I have been trying to do this simple operation.  I want to land all
> values with one key in the same partition, and not have any different
> key in the same partition.  Is this possible?  I am always getting b
> and c mixed up in the same partition.



I think you could do something approximately like:

val keys = rdd.map(_.getKey).distinct.zipWithIndex   // (key, index)
val numKeys = keys.count.toInt
rdd.map(r => (r.getKey, r))
  .join(keys)                                        // (key, (record, index))
  .map { case (_, (record, index)) => (index, record) }
  .partitionBy(new Partitioner() {
    def numPartitions = numKeys
    def getPartition(key: Any) = key.asInstanceOf[Long].toInt
  })

i.e., key by a unique number, count that, and repartition by key to the
exact count.  This presumes, of course, that the number of keys is small
enough to use as a partition count.  Also, I haven't tested this code, so
don't take it as anything more than an approximate idea, please :-)

 -Nathan Kronenfeld
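
Since the question is posed in PySpark, here is a rough, untested PySpark sketch
of the same idea, reusing the toy rdd from the original post (the variable names
are illustrative only):

# Assumes a live SparkContext `sc`, as in the original post.
rdd = sc.parallelize([('a', 5), ('d', 8), ('b', 6), ('a', 8), ('d', 9),
                      ('b', 3), ('c', 8)])

keys = rdd.keys().distinct().zipWithIndex()        # (key, index), computed on the cluster
num_keys = keys.count()

partitioned = (rdd                                 # rdd is already (key, value)
               .join(keys)                         # (key, (value, index))
               .map(lambda kv: (kv[1][1], (kv[0], kv[1][0])))  # re-key by the index
               .partitionBy(num_keys, lambda index: index))    # index is already 0..num_keys-1

The join is the price paid for not collecting the keys to the driver; whether
that is worth it depends on how large the key set is.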

Re: RepartitionByKey Behavior

2018-06-22 Thread Elior Malul
Hi Chawla,

There is nothing wrong with your code, nor with Spark. The situation in which
two different keys are mapped to the same partition is perfectly valid, since
they are mapped to the same 'bucket'.

The promise is that all records with the same key 'k' will be mapped to the
same partition.
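
That is exactly what happens in the run quoted further down this thread: the
partition function's result is reduced modulo numPartitions (the "modulo of key"
behaviour noted below), and with numPartitions=4 the hashes printed for 'b' and
'c' fall into the same bucket. A quick sanity check using the hash values from
that output (portable_hash of a string can differ between runs unless
PYTHONHASHSEED is pinned):

n = 4
hash_b = -1175849324817995036   # value printed for 'b' in the output below
hash_c = 1421958803217889556    # value printed for 'c' in the output below
print(hash_b % n, hash_c % n)   # 0 0 -> 'b' and 'c' end up in the same bucket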

On Fri, Jun 22, 2018 at 3:07 AM, Jungtaek Lim  wrote:

> It is not possible because the cardinality of the partitioning key is
> non-deterministic, while partition count should be fixed. There's a chance
> that cardinality > partition count and then the system can't ensure the
> requirement.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> On Fri, Jun 22, 2018 at 8:55 AM, Chawla,Sumit wrote:
>
>> Based on a read of the code, it looks like Spark takes a modulo of the key
>> to pick the partition.  Keys c and b end up pointing to the same value.
>> What's the best partitioning scheme to deal with this?
>>
>> Regards
>>
>> Sumit Chawla
>>
>>
>> On Thu, Jun 21, 2018 at 4:51 PM, Chawla,Sumit wrote:
>>
>>> Hi
>>>
>>> I have been trying to do this simple operation.  I want to land all
>>> values with one key in the same partition, and not have any different
>>> key in the same partition.  Is this possible?  I am always getting b
>>> and c mixed up in the same partition.


Re: RepartitionByKey Behavior

2018-06-21 Thread Jungtaek Lim
It is not possible because the cardinality of the partitioning key is
non-deterministic, while partition count should be fixed. There's a chance
that cardinality > partition count and then the system can't ensure the
requirement.

Thanks,
Jungtaek Lim (HeartSaVioR)

On Fri, Jun 22, 2018 at 8:55 AM, Chawla,Sumit wrote:

> Based on a read of the code, it looks like Spark takes a modulo of the key
> to pick the partition.  Keys c and b end up pointing to the same value.
> What's the best partitioning scheme to deal with this?
>
> Regards
>
> Sumit Chawla
>
>
> On Thu, Jun 21, 2018 at 4:51 PM, Chawla,Sumit wrote:
>
>> Hi
>>
>> I have been trying to do this simple operation.  I want to land all
>> values with one key in the same partition, and not have any different
>> key in the same partition.  Is this possible?  I am always getting b
>> and c mixed up in the same partition.


Re: RepartitionByKey Behavior

2018-06-21 Thread Chawla,Sumit
Based on a read of the code, it looks like Spark takes a modulo of the key to
pick the partition.  Keys c and b end up pointing to the same value.  What's
the best partitioning scheme to deal with this?
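
If the distinct keys are known and few, one way to spot such a collision before
shuffling is to apply the same hash-and-modulo by hand for the intended
numPartitions (a sketch only; portable_hash must see the same PYTHONHASHSEED as
the executors for the numbers to match):

from pyspark.rdd import portable_hash

n = 4
keys = ['a', 'b', 'c', 'd']
buckets = {k: portable_hash(k) % n for k in keys}
print(buckets)   # any two keys sharing a bucket will share a partition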

Regards
Sumit Chawla


On Thu, Jun 21, 2018 at 4:51 PM, Chawla,Sumit wrote:

> Hi
>
> I have been trying to do this simple operation.  I want to land all values
> with one key in the same partition, and not have any different key in the
> same partition.  Is this possible?  I am always getting b and c mixed up
> in the same partition.


RepartitionByKey Behavior

2018-06-21 Thread Chawla,Sumit
Hi

I have been trying to do this simple operation.  I want to land all values
with one key in the same partition, and not have any different key in the
same partition.  Is this possible?  I am always getting b and c mixed up
in the same partition.


rdd = sc.parallelize([('a', 5), ('d', 8), ('b', 6), ('a', 8), ('d', 9),
                      ('b', 3), ('c', 8)])
from pyspark.rdd import portable_hash

n = 4

def partitioner(n):
    """Partition by the first item in the key tuple"""
    def partitioner_(x):
        val = x[0]
        key = portable_hash(x[0])
        print("Val %s Assigned Key %s" % (val, key))
        return key
    return partitioner_

def validate(part):
    # Report whenever a partition contains more than one distinct key.
    last_key = None
    for p in part:
        k = p[0]
        if not last_key:
            last_key = k
        if k != last_key:
            print("Mixed keys in partition %s %s" % (k, last_key))

partitioned = (rdd
               .keyBy(lambda kv: (kv[0], kv[1]))
               .repartitionAndSortWithinPartitions(
                   numPartitions=n, partitionFunc=partitioner(n),
                   ascending=False)).map(lambda x: x[1])

print(partitioned.getNumPartitions())
partitioned.foreachPartition(validate)


Val a Assigned Key -7583489610679606711
Val a Assigned Key -7583489610679606711
Val d Assigned Key 2755936516345535118
Val b Assigned Key -1175849324817995036
Val c Assigned Key 1421958803217889556
Val d Assigned Key 2755936516345535118
Val b Assigned Key -1175849324817995036
Mixed keys in partition b c
Mixed keys in partition b c


Regards
Sumit Chawla