Hi Chawla,
There is nothing wrong with your code, nor with Spark.

The situation in which two different keys are mapped to the same partition
is perfectly valid,
since they are mapped to the same 'bucket'.

The promise is that all records with the same key 'k' will be mapped to the
same partition.

On Fri, Jun 22, 2018 at 3:07 AM, Jungtaek Lim <kabh...@gmail.com> wrote:

> It is not possible because the cardinality of the partitioning key is
> non-deterministic, while partition count should be fixed. There's a chance
> that cardinality > partition count and then the system can't ensure the
> requirement.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> 2018년 6월 22일 (금) 오전 8:55, Chawla,Sumit <sumitkcha...@gmail.com>님이 작성:
>
>> Based on code read it looks like Spark does modulo of key for
>> partition.  Keys of c and b end up pointing to same value.  Whats the best
>> partitioning scheme to deal with this?
>>
>> Regards
>>
>> Sumit Chawla
>>
>>
>> On Thu, Jun 21, 2018 at 4:51 PM, Chawla,Sumit <sumitkcha...@gmail.com>
>> wrote:
>>
>>> Hi
>>>
>>>  I have been trying to this simple operation.  I want to land all values
>>> with one key in same partition, and not have any different key in the same
>>> partition.  Is this possible?   I am getting b and c always getting mixed
>>> up in the same partition.
>>>
>>>
>>> rdd = sc.parallelize([('a', 5), ('d', 8), ('b', 6), ('a', 8), ('d', 9),
>>> ('b', 3),('c', 8)])
>>> from pyspark.rdd import portable_hash
>>>
>>> n = 4
>>>
>>> def partitioner(n):
>>>     """Partition by the first item in the key tuple"""
>>>     def partitioner_(x):
>>>         val = x[0]
>>>         key = portable_hash(x[0])
>>>         print ("Val %s Assigned Key %s" % (val, key))
>>>         return key
>>>     return partitioner_
>>>
>>> def validate(part):
>>>     last_key = None
>>>     for p in part:
>>>         k = p[0]
>>>         if not last_key:
>>>             last_key = k
>>>         if k != last_key:
>>>             print("Mixed keys in partition %s %s" % (k,last_key) )
>>>
>>> partioned = (rdd
>>>   .keyBy(lambda kv: (kv[0], kv[1]))
>>>   .repartitionAndSortWithinPartitions(
>>>       numPartitions=n, partitionFunc=partitioner(n),
>>> ascending=False)).map(lambda x: x[1])
>>>
>>> print(partioned.getNumPartitions())
>>> partioned.foreachPartition(validate)
>>>
>>>
>>> Val a Assigned Key -7583489610679606711
>>> Val a Assigned Key -7583489610679606711
>>> Val d Assigned Key 2755936516345535118
>>> Val b Assigned Key -1175849324817995036
>>> Val c Assigned Key 1421958803217889556
>>> Val d Assigned Key 2755936516345535118
>>> Val b Assigned Key -1175849324817995036
>>> Mixed keys in partition b c
>>> Mixed keys in partition b c
>>>
>>>
>>> Regards
>>> Sumit Chawla
>>>
>>>
>>

Reply via email to