Github user baishuo commented on the pull request:

    https://github.com/apache/spark/pull/2226#issuecomment-54949823
  
    I'd like to explain my design idea (the code is mostly in
    InsertIntoHiveTable.scala):
    Let's assume there is a table called table1, which has 2 columns (col1, col2)
    and two partition columns (part1, part2).
    
    First:
    In the case of inserting data into a static partition only, I found that when
    "saveAsHiveFile" finishes, the data has been written to a temporary location,
    a directory like /tmp/hive-root/hive_****/-ext-10000; let's call it
    TMPLOCATION. Under TMPLOCATION there is a subdirectory /part1=.../part2=...,
    and all data is stored under TMPLOCATION/part1=.../part2=.... Spark then calls
    the Hive API "loadPartition" to move the files to
    {hivewarehouse}/{tablename}/part1=.../part2=... and update the metadata, and
    the whole process completes correctly.
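
    To make that layout concrete, here is a small sketch of how the temporary
    directory maps onto the warehouse directory for the static case. The path
    values and variable names below are placeholders for illustration, not the
    actual code in InsertIntoHiveTable.scala:

```scala
// Hypothetical values standing in for TMPLOCATION and {hivewarehouse}/{tablename}.
val tmpLocation       = "/tmp/hive-root/hive_xxxx/-ext-10000"
val warehouseTableDir = "/user/hive/warehouse/table1"
val staticSpec        = Seq("part1" -> "val1", "part2" -> "val2")

// saveAsHiveFile leaves the files here:
val tmpPartDir = staticSpec.map { case (k, v) => s"$k=$v" }
  .mkString(tmpLocation + "/", "/", "")
// -> /tmp/hive-root/hive_xxxx/-ext-10000/part1=val1/part2=val2

// loadPartition then moves them here and updates the metastore:
val destPartDir = staticSpec.map { case (k, v) => s"$k=$v" }
  .mkString(warehouseTableDir + "/", "/", "")
// -> /user/hive/warehouse/table1/part1=val1/part2=val2
```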
    
    If we want to implement dynamic partitioning, we need to use the Hive API
    "loadDynamicPartitions" to move the data and update the metadata. But the
    directory layout that "loadDynamicPartitions" requires is slightly different
    from that of "loadPartition":
    
    1: In the case of one static partition and one dynamic partition (HQL like
    "insert overwrite table table1 partition(part1=val1,part2) select a,b,c from
    ..."), loadDynamicPartitions needs the temporary data located at
    TMPLOCATION/part2=c1, TMPLOCATION/part2=c2, ... (there is NO "part1=val1"
    level; it will be added during loadDynamicPartitions). loadDynamicPartitions
    will then move them to {hivewarehouse}/{tablename}/part1=val1/part2=c1,
    {hivewarehouse}/{tablename}/part1=val1/part2=c2, ..., and update the metadata.
    Note that in this case loadDynamicPartitions does NOT need a subdirectory like
    part1=val1 under TMPLOCATION.
    
    2: In the case of zero static partitions and two dynamic partitions (HQL like
    "insert overwrite table table1 partition(part1,part2) select a,b,x,c from
    ..."), loadDynamicPartitions needs the temporary data located at
    TMPLOCATION/part1=../part2=c1, TMPLOCATION/part1=../part2=c2, ..., and
    loadDynamicPartitions will move them to
    {hivewarehouse}/{tablename}/part1=../part2=... and update the metadata.
    
    So whether there are static partitions in the HQL determines how we create
    the subdirectories under TMPLOCATION. That is why the function
    "getDynamicPartDir" exists.
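
    For illustration, here is a minimal sketch of what a helper like
    getDynamicPartDir could compute; the real signature in the PR may differ. The
    point is that only the dynamic partition columns contribute to the
    subdirectory under TMPLOCATION:

```scala
// Hypothetical sketch: build the subdirectory suffix from the dynamic
// partition columns only (static columns are handled by loadDynamicPartitions).
def dynamicPartDir(
    partitionCols: Seq[String],          // e.g. Seq("part1", "part2")
    staticSpec: Map[String, String],     // e.g. Map("part1" -> "val1"), or empty
    dynamicValues: Seq[String]): String = {
  val dynamicCols = partitionCols.filterNot(staticSpec.contains)
  dynamicCols.zip(dynamicValues)
    .map { case (col, value) => s"$col=$value" }
    .mkString("/", "/", "")
}

// Case 1: one static + one dynamic partition -> "/part2=c1"
val case1 = dynamicPartDir(Seq("part1", "part2"), Map("part1" -> "val1"), Seq("c1"))
// Case 2: two dynamic partitions             -> "/part1=a1/part2=c1"
val case2 = dynamicPartDir(Seq("part1", "part2"), Map.empty, Seq("a1", "c1"))
```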
    
    Second:
    Where should we call "getDynamicPartDir"? It must be a place where we can get
    the values of the dynamic partition columns, so we call this function inside
    "iter.map { row => ..." in the closure of "val rdd = childRdd.mapPartitions".
    Once we have the row, we can read the values of the dynamic partition columns.
    After we obtain the dynamicPartPath from getDynamicPartDir, we pass it on to
    the next RDD through this RDD's output: serializer.serialize(outputData,
    standardOI) -> dynamicPartPath. (For a static partition, dynamicPartPath is
    null.)
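
    Schematically, the pairing looks something like the sketch below. It is a
    simplified stand-in, not the PR's exact code: serializeRow and
    dynamicPartDirOf stand in for serializer.serialize(outputData, standardOI)
    and getDynamicPartDir, and the row type is reduced to a Seq[String]:

```scala
import org.apache.spark.rdd.RDD

// Emit (serialized record, dynamicPartPath) pairs so the next stage knows
// which partition directory each record belongs to.
def pairWithPartitionPath(
    childRdd: RDD[Seq[String]],                // simplified row type
    serializeRow: Seq[String] => Array[Byte],  // stand-in for serializer.serialize
    dynamicPartDirOf: Seq[String] => String,   // stand-in for getDynamicPartDir
    numDynamicPartitions: Int): RDD[(Array[Byte], String)] = {
  childRdd.mapPartitions { iter =>
    iter.map { row =>
      // null path means a purely static insert, as described above
      val dynamicPartPath =
        if (numDynamicPartitions == 0) null
        else dynamicPartDirOf(row)
      (serializeRow(row), dynamicPartPath)
    }
  }
}
```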
    
    When the next RDD (the closure in writeToFile) receives the data and the
    dynamicPartPath, we check whether dynamicPartPath is null. If it is not null,
    we check whether a corresponding writer already exists in writerMap, which
    stores a writer for each partition. If it does, we use that writer to write
    the record. This ensures that data belonging to the same partition is written
    to the same directory.
    
    loadDynamicPartitions requires that there are no other files under TMPLOCATION
    except the subdirectories for the dynamic partitions. That is why there are
    several "if (dynamicPartNum == 0)" checks in writeToFile.
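
    As a rough sketch of the writer-reuse idea (the real code uses Hive's record
    writers; plain file writers are used here just to show the caching in
    writerMap and the dynamicPartNum == 0 distinction):

```scala
import java.io.{BufferedWriter, File, FileWriter}
import scala.collection.mutable

// One writer per dynamic partition path, so all rows of a partition land in
// the same directory; when dynamicPartNum == 0 a single writer is used and no
// per-partition subdirectories are created under TMPLOCATION.
class PartitionedSink(tmpLocation: String, dynamicPartNum: Int) {
  private val writerMap = mutable.Map.empty[String, BufferedWriter]

  private def writerFor(dir: String): BufferedWriter =
    writerMap.getOrElseUpdate(dir, {
      val d = new File(tmpLocation + dir)
      d.mkdirs()
      new BufferedWriter(new FileWriter(new File(d, "part-00000")))
    })

  def write(record: String, dynamicPartPath: String): Unit = {
    val writer =
      if (dynamicPartNum == 0) writerFor("")   // static insert: files directly under TMPLOCATION
      else writerFor(dynamicPartPath)          // e.g. "/part2=c1"
    writer.write(record)
    writer.newLine()
  }

  def close(): Unit = writerMap.values.foreach(_.close())
}
```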

