HeartSaVioR opened a new pull request #31355:
URL: https://github.com/apache/spark/pull/31355


   ### What changes were proposed in this pull request?
   
   This PR proposes to extend the requirement for distribution and ordering on V2 write so that it can also specify the number of partitions to use when repartitioning. This lets the data source control the parallelism of the write and know in advance how data will be distributed across partitions.
   
   Requiring a static number of partitions is optional and disabled by default through a default method, so only implementations that need a fixed number of partitions have to override the method and provide the number.
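
   A minimal sketch of the opt-in pattern, written as a Scala trait. The trait and class names below are hypothetical and only mirror the idea described above (in a Java DSv2 interface this would be a `default` method); they are not the actual interface touched by this PR. An implementation that doesn't override `requiredNumPartitions` compiles unchanged and keeps its old behavior.

   ```scala
   // Hypothetical names: illustrates "optional override via a default method" only.
   trait DistributionAndOrderingRequirement {
     // Columns the writer needs its input clustered (hash-partitioned) by.
     def clusteringColumns: Seq[String]

     // Optional static number of partitions. The default of 0 means
     // "no requirement": Spark picks the number, as before.
     def requiredNumPartitions: Int = 0
   }

   // An existing implementation: untouched, still relies on the default.
   class ExistingWrite extends DistributionAndOrderingRequirement {
     override def clusteringColumns: Seq[String] = Seq("id")
   }

   // A state-rewriting writer that must produce exactly `numPartitions` partitions.
   class StateRewriteWrite(keyColumns: Seq[String], numPartitions: Int)
       extends DistributionAndOrderingRequirement {
     require(numPartitions > 0, "numPartitions must be positive")
     override def clusteringColumns: Seq[String] = keyColumns
     override def requiredNumPartitions: Int = numPartitions
   }

   val existing = new ExistingWrite
   val state = new StateRewriteWrite(Seq("key.groupId"), 10)
   println(s"existing requires ${existing.requiredNumPartitions} partitions")
   println(s"state requires ${state.requiredNumPartitions} partitions")
   ```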
   
   ### Why are the changes needed?
   
   The use case comes from achieving feature parity with DSv1.
   
   I have a state data source that allows the state in Structured Streaming (SS) to be rewritten via a batch query, which enables repartitioning, schema evolution, etc. The writer requires hash partitioning on the grouping key with the "desired number of partitions", which is the same as what Spark does when it reads and writes state.
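
   To illustrate why the number of partitions matters, here is a simplified sketch of hash partitioning on a grouping key. It uses the JVM `hashCode` with `Math.floorMod` as a stand-in; Spark's own hash partitioning uses a Murmur3-based hash, so real partition ids differ. The point is only that a key's partition id depends on both the key and the number of partitions, so state written with one number of partitions would not be found where a reader expecting a different number looks for it. `partitionFor` and `assign` are illustrative helpers, not Spark APIs.

   ```scala
   // Simplified stand-in for hash partitioning (Spark uses Murmur3, not hashCode).
   def partitionFor(key: Any, numPartitions: Int): Int = {
     require(numPartitions > 0, "numPartitions must be positive")
     // floorMod keeps the result in [0, numPartitions) even for negative hashes.
     Math.floorMod(key.hashCode, numPartitions)
   }

   // Group the grouping keys by the partition each one lands in.
   def assign(keys: Seq[Any], numPartitions: Int): Map[Int, Seq[Any]] =
     keys.groupBy(k => partitionFor(k, numPartitions))

   val keys = Seq(1, 2, 3, 4, 5, 6, 7, 8)
   println(assign(keys, 4))
   println(assign(keys, 3))
   ```

   Comparing the two printed maps shows keys moving to different partitions once the number changes, which is why the writer needs to pin it.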
   
   This is currently implemented with DSv1, where the requirement is met simply by calling `repartition` with the "desired number":
   
   ```
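   import org.apache.spark.sql.Column

   // Hash-partition on every key column into exactly `newPartitions` partitions,
   // then write each partition of the rewritten state directly.
   val fullPathsForKeyColumns = keySchema.map(key => new Column(s"key.${key.name}"))
   data
     .repartition(newPartitions, fullPathsForKeyColumns: _*)
     .queryExecution
     .toRdd
     .foreachPartition(
       writeFn(resolvedCpLocation, version, operatorId, storeName, keySchema, valueSchema,
         storeConf, hadoopConfBroadcast, queryId))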
   ```
   
   Thanks to SPARK-34026, it's now possible to require hash partitioning, but it is still not possible to require the number of partitions. This PR enables a data source to require the number of partitions as well.
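
   A hypothetical sketch of how a writer's requirement could be turned into a partition count during planning. `WriteRequirement` and `numShufflePartitions` are illustrative names, not Spark APIs. Without a static number (the situation after SPARK-34026), the count comes from the session, much like `Dataset.repartition(cols: _*)` falls back to `spark.sql.shuffle.partitions`; with one, the source's number wins, matching `repartition(numPartitions, cols: _*)` in the DSv1 code above.

   ```scala
   // Hypothetical requirement: clustering columns plus an optional static count (0 = none).
   final case class WriteRequirement(clusteringColumns: Seq[String], requiredNumPartitions: Int = 0) {
     require(requiredNumPartitions >= 0, "requiredNumPartitions must be non-negative")
   }

   // Decide how many partitions the shuffle inserted before the write should produce.
   def numShufflePartitions(req: WriteRequirement, sessionShufflePartitions: Int): Int =
     if (req.requiredNumPartitions > 0) req.requiredNumPartitions // static number from the source
     else sessionShufflePartitions // previous behavior: the session setting decides

   val sessionDefault = 200
   println(numShufflePartitions(WriteRequirement(Seq("key.groupId")), sessionDefault))
   println(numShufflePartitions(WriteRequirement(Seq("key.groupId"), 10), sessionDefault))
   ```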
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes, but only for data source implementors. Even for them, this is not a breaking change, since the new method comes with a default implementation.
   
   ### How was this patch tested?
   
   Added UTs.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
