Github user baishuo commented on the pull request:
https://github.com/apache/spark/pull/2226#issuecomment-54949823
I'll try to explain my design idea (the code is mostly in
InsertIntoHiveTable.scala):
Let's assume there is a table called table1, which has 2 columns (col1, col2)
and two partition columns (part1, part2).
first:
In the case of inserting into a static partition only, I found that when
"saveAsHiveFile" finishes, the data has been written to a temporary directory
like /tmp/hive-root/hive_****/-ext-10000; let's call it TMPLOCATION.
Under TMPLOCATION there is a subdirectory /part1=.../part2=..., and all data
is stored under TMPLOCATION/part1=.../part2=... . Spark then calls the Hive
API "loadPartition" to move the files to
{hivewarehouse}/{tablename}/part1=.../part2=... and update the metadata, and
the whole process is done.
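To make those paths concrete (val1/val2 and the temp directory id are made-up
values, just for illustration):

```scala
// Illustration of the static-partition flow only; no Hive API is called here.
val tmpLocation = "/tmp/hive-root/hive_xxxx/-ext-10000"          // TMPLOCATION (id is made up)
val staged      = s"$tmpLocation/part1=val1/part2=val2"          // where saveAsHiveFile left the data
val destination = "{hivewarehouse}/table1/part1=val1/part2=val2" // final partition directory
// Hive's "loadPartition" moves the files from `staged` to `destination`
// and updates the partition metadata.
```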
If we want to implement the dynamic partition feature, we need to use the
Hive API "loadDynamicPartitions" to move the data and update the metadata.
But the directory layout that "loadDynamicPartitions" requires is slightly
different from what "loadPartition" expects:
1: In the case of one static partition and one dynamic partition (HQL like
"insert overwrite table table1 partition(part1=val1,part2) select a,b,c from
..."), loadDynamicPartitions needs the tmp data located at
TMPLOCATION/part2=c1, TMPLOCATION/part2=c2, ... (there is NO "part1=val1"
level; it will be added during loadDynamicPartitions). loadDynamicPartitions
will move them to {hivewarehouse}/{tablename}/part1=val1/part2=c1,
{hivewarehouse}/{tablename}/part1=val1/part2=c2, ..., and update the metadata.
Note that in this case loadDynamicPartitions does not need a subdirectory like
part1=val1 under TMPLOCATION.
2: In the case of zero static partitions and two dynamic partitions (HQL like
"insert overwrite table table1 partition(part1,part2) select a,b,x,c from
..."), loadDynamicPartitions needs the tmp data located at
TMPLOCATION/part1=.../part2=c1, TMPLOCATION/part1=.../part2=c2, ..., and
loadDynamicPartitions will move them to
{hivewarehouse}/{tablename}/part1=.../part2=....
So whether there is a static partition in the HQL determines how we create the
subdirectories under TMPLOCATION. That is why the function "getDynamicPartDir"
exists; a sketch of the idea follows.
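Here is a minimal sketch of that idea (this is not the code in the PR;
partitionSpec, dynamicPartDir and the argument shapes are made up for
illustration): static partition columns are skipped, and each dynamic
partition column contributes one "col=value" path segment.

```scala
// Hypothetical sketch, not the actual getDynamicPartDir from this PR.
// partitionSpec: partition columns in order, Some(value) for static ones, None for dynamic.
// dynamicValues: the values of the dynamic partition columns for the current row, in order.
def dynamicPartDir(
    partitionSpec: Seq[(String, Option[String])],
    dynamicValues: Seq[Any]): String = {
  val values = dynamicValues.iterator
  partitionSpec.collect {
    // static partitions are NOT materialized under TMPLOCATION (case 1 above)
    case (col, None) => s"$col=${values.next()}"
  }.mkString("/", "/", "")
}

// partition(part1=val1, part2), row value "c1"    => "/part2=c1"
// partition(part1, part2), row values ("a1", "c1") => "/part1=a1/part2=c1"
```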
second:
Where should we call "getDynamicPartDir"? It must be a place where we can
read the values of the dynamic partition columns, so we call it inside
"iter.map { row => ... }" in the closure of "val rdd = childRdd.mapPartitions".
Once we have the row, we have the values of the dynamic partition columns.
After getDynamicPartDir gives us the dynamicPartPath, we pass it to the next
RDD as part of this RDD's output: serializer.serialize(outputData, standardOI)
-> dynamicPartPath. (For a static partition, dynamicPartPath is null.) A
sketch of this pairing is below.
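Stripped of Spark and the Hive serializer, the pairing can be sketched like
this (rows, serialize and partitionSpec are made-up stand-ins; in the PR this
happens inside iter.map { row => ... } of childRdd.mapPartitions, with
serializer.serialize(outputData, standardOI) as the first element of the pair):

```scala
// Standalone illustration of emitting (record, dynamicPartPath) pairs.
val partitionSpec: Seq[(String, Option[String])] =
  Seq("part1" -> Some("val1"), "part2" -> None)   // part1 static, part2 dynamic

def serialize(row: Map[String, Any]): Array[Byte] =
  row.toString.getBytes("UTF-8")                  // stand-in for the real Hive serializer

val rows = Seq(
  Map("col1" -> 1, "col2" -> "a", "part2" -> "c1"),
  Map("col1" -> 2, "col2" -> "b", "part2" -> "c2"))

val output: Seq[(Array[Byte], String)] = rows.map { row =>
  val dynamicValues = partitionSpec.collect { case (col, None) => row(col) }
  // reuses the dynamicPartDir sketch above; this would be null for a purely static insert
  serialize(row) -> dynamicPartDir(partitionSpec, dynamicValues)
}
// output carries "/part2=c1" and "/part2=c2" alongside the serialized records
```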
When the next RDD (the closure in writeToFile) gets the data and the
dynamicPartPath, we check whether dynamicPartPath is null. If it is not null,
we check whether a corresponding writer already exists in writerMap, which
stores a writer for each partition. If it does, we use that writer to write
the record. This ensures that data belonging to the same partition is written
to the same directory (see the sketch at the end).
loadDynamicPartitions requires that there are no other files under TMPLOCATION
except the subdirectories for the dynamic partitions; that is why there are
several "if (dynamicPartNum == 0)" branches in writeToFile.
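A simplified sketch of that per-partition writer bookkeeping (writerMap and
PrintWriter stand in for the real Hive record writers used in writeToFile;
the handling of the null path is collapsed here to a single writer just for
the sketch):

```scala
import java.io.{File, PrintWriter}
import scala.collection.mutable

// Simplified stand-in for the writer bookkeeping in writeToFile; a "writer"
// here is just a PrintWriter, not Hive's actual record writer.
class PartitionedWriters(tmpLocation: String) {
  private val writerMap = mutable.HashMap.empty[String, PrintWriter]

  private def newWriter(subDir: String): PrintWriter = {
    val dir = new File(tmpLocation + subDir)      // e.g. TMPLOCATION/part2=c1
    dir.mkdirs()
    new PrintWriter(new File(dir, "part-00000"))
  }

  // Route a record to the writer of its partition. All records that share a
  // dynamicPartPath go through the same cached writer, so they end up in the
  // same directory. A null path (purely static insert, dynamicPartNum == 0)
  // keeps the single-writer behavior, sketched here as one writer under tmpLocation.
  def write(record: String, dynamicPartPath: String): Unit = {
    val key = if (dynamicPartPath == null) "" else dynamicPartPath
    writerMap.getOrElseUpdate(key, newWriter(key)).println(record)
  }

  def close(): Unit = writerMap.values.foreach(_.close())
}
```

In this sketch each task would create one PartitionedWriters(TMPLOCATION),
call write(...) per record, and close() when its partition of data is done.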