Marc Arndt created SPARK-27853:
----------------------------------

             Summary: Allow for custom Partitioning implementations
                 Key: SPARK-27853
                 URL: https://issues.apache.org/jira/browse/SPARK-27853
             Project: Spark
          Issue Type: Improvement
          Components: Optimizer, SQL
    Affects Versions: 2.4.3
            Reporter: Marc Arndt


When partitioning a Dataset, Spark uses the physical plan element
ShuffleExchangeExec together with a Partitioning instance.

I find myself in a situation where I need to provide my own partitioning
criterion, one that decides which partition each InternalRow belongs to.
Based on the Spark API, I would expect to be able to supply it as a custom
implementation of the Partitioning interface.

Sadly, a custom Partitioning implementation fails with an
"Exchange not implemented for $newPartitioning" error, because of the
following code inside the ShuffleExchangeExec#prepareShuffleDependency method:

{code:scala}
val part: Partitioner = newPartitioning match {
    case RoundRobinPartitioning(numPartitions) => new HashPartitioner(numPartitions)
    case HashPartitioning(_, n) =>
        new Partitioner {
            override def numPartitions: Int = n
            // For HashPartitioning, the partitioning key is already a valid
            // partition ID, as we use `HashPartitioning.partitionIdExpression`
            // to produce partitioning key.
            override def getPartition(key: Any): Int = key.asInstanceOf[Int]
        }
    case RangePartitioning(sortingExpressions, numPartitions) =>
        // Internally, RangePartitioner runs a job on the RDD that samples keys
        // to compute partition bounds. To get accurate samples, we need to copy
        // the mutable keys.
        val rddForSampling = rdd.mapPartitionsInternal { iter =>
            val mutablePair = new MutablePair[InternalRow, Null]()
            iter.map(row => mutablePair.update(row.copy(), null))
        }
        implicit val ordering = new LazilyGeneratedOrdering(sortingExpressions, outputAttributes)
        new RangePartitioner(
            numPartitions,
            rddForSampling,
            ascending = true,
            samplePointsPerPartitionHint = SQLConf.get.rangeExchangeSampleSizePerPartition)
    case SinglePartition =>
        new Partitioner {
            override def numPartitions: Int = 1
            override def getPartition(key: Any): Int = 0
        }
    case _ => sys.error(s"Exchange not implemented for $newPartitioning")
    // TODO: Handle BroadcastPartitioning.
}
def getPartitionKeyExtractor(): InternalRow => Any = newPartitioning match {
    case RoundRobinPartitioning(numPartitions) =>
        // Distributes elements evenly across output partitions, starting from a
        // random partition.
        var position = new Random(TaskContext.get().partitionId()).nextInt(numPartitions)
        (row: InternalRow) => {
            // The HashPartitioner will handle the `mod` by the number of partitions
            position += 1
            position
        }
    case h: HashPartitioning =>
        val projection = UnsafeProjection.create(h.partitionIdExpression :: Nil, outputAttributes)
        row => projection(row).getInt(0)
    case RangePartitioning(_, _) | SinglePartition => identity
    case _ => sys.error(s"Exchange not implemented for $newPartitioning")
}
{code}

The snippet above matches the given Partitioning instance
"newPartitioning" against a fixed set of hardcoded Partitioning types. A new
Partitioning implementation matches none of these patterns, so the pattern
match will fall through to the fallback case:

{code:scala}
    case _ => sys.error(s"Exchange not implemented for $newPartitioning")
{code}

and throw an exception.

To support custom partitioning behaviour, I suggest changing the
implementation in ShuffleExchangeExec so that it works with an arbitrary
Partitioning implementation. The Partitioner creation could be moved into
the Partitioning classes themselves, for example via a
Partitioning#createPartitioner method.
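
To illustrate the idea, here is a rough, self-contained sketch. It is not
Spark's actual code: Partitioner, Row and Partitioning below are simplified
stand-ins so that the example runs without Spark on the classpath, and
createKeyExtractor, ModuloPartitioning and partitionIdFor are hypothetical
names used only for illustration. Only createPartitioner corresponds to the
method proposed above.

{code:scala}
object CustomPartitioningSketch {
    // Stand-in for org.apache.spark.Partitioner (assumption: same two members).
    abstract class Partitioner {
        def numPartitions: Int
        def getPartition(key: Any): Int
    }

    // Stand-in for InternalRow, kept trivial for the sketch.
    type Row = Seq[Any]

    // Hypothetical extended Partitioning interface: every implementation
    // supplies its own key extractor and its own Partitioner.
    trait Partitioning {
        def numPartitions: Int
        def createKeyExtractor(): Row => Any   // hypothetical helper
        def createPartitioner(): Partitioner   // method proposed in this issue
    }

    // A built-in partitioning expressed through the new methods.
    case object SinglePartition extends Partitioning {
        val numPartitions: Int = 1
        def createKeyExtractor(): Row => Any = row => row
        def createPartitioner(): Partitioner = new Partitioner {
            def numPartitions: Int = 1
            def getPartition(key: Any): Int = 0
        }
    }

    // A user-defined partitioning (hypothetical): routes each row by the
    // integer stored in `column`, modulo the number of partitions.
    final case class ModuloPartitioning(column: Int, numPartitions: Int)
            extends Partitioning {
        def createKeyExtractor(): Row => Any = row => row(column)
        def createPartitioner(): Partitioner = {
            val n = numPartitions
            new Partitioner {
                def numPartitions: Int = n
                // floorMod keeps the result non-negative for negative keys.
                def getPartition(key: Any): Int =
                    java.lang.Math.floorMod(key.asInstanceOf[Int], n)
            }
        }
    }

    // What the exchange could do instead of pattern matching: delegate to
    // the Partitioning instance, so no fallback error case is needed.
    def partitionIdFor(partitioning: Partitioning, row: Row): Int = {
        val extractKey = partitioning.createKeyExtractor()
        val partitioner = partitioning.createPartitioner()
        partitioner.getPartition(extractKey(row))
    }
}
{code}

With this shape, a call such as
CustomPartitioningSketch.partitionIdFor(ModuloPartitioning(0, 4), Seq(10, "a"))
would be resolved entirely by the custom class, without ShuffleExchangeExec
having to know about it.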


