On Tue, Oct 4, 2011 at 1:46 PM, Heikki Linnakangas <
[email protected]> wrote:
> Ok. Could you phrase that as a code comment?
>
> Here's a version of the patch I've been working on. There's no functional
> changes, just a lot of moving things around, comment changes, etc. to
> hopefully make it more readable.
Thanks for your work on this patch. Patch with comment is attached.
------
With best regards,
Alexander Korotkov.
*** a/src/backend/access/gist/gistproc.c
--- b/src/backend/access/gist/gistproc.c
***************
*** 26,31 **** static bool gist_box_leaf_consistent(BOX *key, BOX *query,
--- 26,35 ----
static double size_box(Datum dbox);
static bool rtree_internal_consistent(BOX *key, BOX *query,
StrategyNumber strategy);
+ static BOX *empty_box(void);
+
+ /* Minimum acceptable split ratio: share of all entries in the smaller group */
+ #define LIMIT_RATIO 0.3
/**************************************************
***************
*** 49,78 **** rt_box_union(PG_FUNCTION_ARGS)
PG_RETURN_BOX_P(n);
}
- static Datum
- rt_box_inter(PG_FUNCTION_ARGS)
- {
- BOX *a = PG_GETARG_BOX_P(0);
- BOX *b = PG_GETARG_BOX_P(1);
- BOX *n;
-
- n = (BOX *) palloc(sizeof(BOX));
-
- n->high.x = Min(a->high.x, b->high.x);
- n->high.y = Min(a->high.y, b->high.y);
- n->low.x = Max(a->low.x, b->low.x);
- n->low.y = Max(a->low.y, b->low.y);
-
- if (n->high.x < n->low.x || n->high.y < n->low.y)
- {
- pfree(n);
- /* Indicate "no intersection" by returning NULL pointer */
- n = NULL;
- }
-
- PG_RETURN_BOX_P(n);
- }
-
/*
* The GiST Consistent method for boxes
*
--- 53,58 ----
***************
*** 194,279 **** gist_box_penalty(PG_FUNCTION_ARGS)
PG_RETURN_POINTER(result);
}
- static void
- chooseLR(GIST_SPLITVEC *v,
- OffsetNumber *list1, int nlist1, BOX *union1,
- OffsetNumber *list2, int nlist2, BOX *union2)
- {
- bool firstToLeft = true;
-
- if (v->spl_ldatum_exists || v->spl_rdatum_exists)
- {
- if (v->spl_ldatum_exists && v->spl_rdatum_exists)
- {
- BOX LRl = *union1,
- LRr = *union2;
- BOX RLl = *union2,
- RLr = *union1;
- double sizeLR,
- sizeRL;
-
- adjustBox(&LRl, DatumGetBoxP(v->spl_ldatum));
- adjustBox(&LRr, DatumGetBoxP(v->spl_rdatum));
- adjustBox(&RLl, DatumGetBoxP(v->spl_ldatum));
- adjustBox(&RLr, DatumGetBoxP(v->spl_rdatum));
-
- sizeLR = size_box(DirectFunctionCall2(rt_box_inter, BoxPGetDatum(&LRl), BoxPGetDatum(&LRr)));
- sizeRL = size_box(DirectFunctionCall2(rt_box_inter, BoxPGetDatum(&RLl), BoxPGetDatum(&RLr)));
-
- if (sizeLR > sizeRL)
- firstToLeft = false;
-
- }
- else
- {
- float p1,
- p2;
- GISTENTRY oldUnion,
- addon;
-
- gistentryinit(oldUnion, (v->spl_ldatum_exists) ? v->spl_ldatum : v->spl_rdatum,
- NULL, NULL, InvalidOffsetNumber, FALSE);
-
- gistentryinit(addon, BoxPGetDatum(union1), NULL, NULL, InvalidOffsetNumber, FALSE);
- DirectFunctionCall3(gist_box_penalty, PointerGetDatum(&oldUnion), PointerGetDatum(&addon), PointerGetDatum(&p1));
- gistentryinit(addon, BoxPGetDatum(union2), NULL, NULL, InvalidOffsetNumber, FALSE);
- DirectFunctionCall3(gist_box_penalty, PointerGetDatum(&oldUnion), PointerGetDatum(&addon), PointerGetDatum(&p2));
-
- if ((v->spl_ldatum_exists && p1 > p2) || (v->spl_rdatum_exists && p1 < p2))
- firstToLeft = false;
- }
- }
-
- if (firstToLeft)
- {
- v->spl_left = list1;
- v->spl_right = list2;
- v->spl_nleft = nlist1;
- v->spl_nright = nlist2;
- if (v->spl_ldatum_exists)
- adjustBox(union1, DatumGetBoxP(v->spl_ldatum));
- v->spl_ldatum = BoxPGetDatum(union1);
- if (v->spl_rdatum_exists)
- adjustBox(union2, DatumGetBoxP(v->spl_rdatum));
- v->spl_rdatum = BoxPGetDatum(union2);
- }
- else
- {
- v->spl_left = list2;
- v->spl_right = list1;
- v->spl_nleft = nlist2;
- v->spl_nright = nlist1;
- if (v->spl_ldatum_exists)
- adjustBox(union2, DatumGetBoxP(v->spl_ldatum));
- v->spl_ldatum = BoxPGetDatum(union2);
- if (v->spl_rdatum_exists)
- adjustBox(union1, DatumGetBoxP(v->spl_rdatum));
- v->spl_rdatum = BoxPGetDatum(union1);
- }
-
- v->spl_ldatum_exists = v->spl_rdatum_exists = false;
- }
-
/*
* Trivial split: half of entries will be placed on one page
* and another half - to another
--- 174,179 ----
***************
*** 338,536 **** fallbackSplit(GistEntryVector *entryvec, GIST_SPLITVEC *v)
}
/*
! * The GiST PickSplit method
*
! * New linear algorithm, see 'New Linear Node Splitting Algorithm for R-tree',
! * C.H.Ang and T.C.Tan
*
! * This is used for both boxes and points.
*/
Datum
gist_box_picksplit(PG_FUNCTION_ARGS)
{
GistEntryVector *entryvec = (GistEntryVector *) PG_GETARG_POINTER(0);
GIST_SPLITVEC *v = (GIST_SPLITVEC *) PG_GETARG_POINTER(1);
! OffsetNumber i;
! OffsetNumber *listL,
! *listR,
! *listB,
! *listT;
! BOX *unionL,
! *unionR,
! *unionB,
! *unionT;
! int posL,
! posR,
! posB,
! posT;
! BOX pageunion;
! BOX *cur;
! char direction = ' ';
! bool allisequal = true;
! OffsetNumber maxoff;
! int nbytes;
- posL = posR = posB = posT = 0;
maxoff = entryvec->n - 1;
! cur = DatumGetBoxP(entryvec->vector[FirstOffsetNumber].key);
! memcpy((void *) &pageunion, (void *) cur, sizeof(BOX));
! /* find MBR */
! for (i = OffsetNumberNext(FirstOffsetNumber); i <= maxoff; i = OffsetNumberNext(i))
{
! cur = DatumGetBoxP(entryvec->vector[i].key);
! if (allisequal && (
! pageunion.high.x != cur->high.x ||
! pageunion.high.y != cur->high.y ||
! pageunion.low.x != cur->low.x ||
! pageunion.low.y != cur->low.y
! ))
! allisequal = false;
!
! adjustBox(&pageunion, cur);
}
! if (allisequal)
{
/*
! * All entries are the same
*/
fallbackSplit(entryvec, v);
PG_RETURN_POINTER(v);
}
nbytes = (maxoff + 2) * sizeof(OffsetNumber);
! listL = (OffsetNumber *) palloc(nbytes);
! listR = (OffsetNumber *) palloc(nbytes);
! listB = (OffsetNumber *) palloc(nbytes);
! listT = (OffsetNumber *) palloc(nbytes);
! unionL = (BOX *) palloc(sizeof(BOX));
! unionR = (BOX *) palloc(sizeof(BOX));
! unionB = (BOX *) palloc(sizeof(BOX));
! unionT = (BOX *) palloc(sizeof(BOX));
!
! #define ADDLIST( list, unionD, pos, num ) do { \
! if ( pos ) { \
! if ( (unionD)->high.x < cur->high.x ) (unionD)->high.x = cur->high.x; \
! if ( (unionD)->low.x > cur->low.x ) (unionD)->low.x = cur->low.x; \
! if ( (unionD)->high.y < cur->high.y ) (unionD)->high.y = cur->high.y; \
! if ( (unionD)->low.y > cur->low.y ) (unionD)->low.y = cur->low.y; \
! } else { \
! memcpy( (void*)(unionD), (void*) cur, sizeof( BOX ) ); \
! } \
! (list)[pos] = num; \
! (pos)++; \
! } while(0)
! for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
! {
! cur = DatumGetBoxP(entryvec->vector[i].key);
! if (cur->low.x - pageunion.low.x < pageunion.high.x - cur->high.x)
! ADDLIST(listL, unionL, posL, i);
! else
! ADDLIST(listR, unionR, posR, i);
! if (cur->low.y - pageunion.low.y < pageunion.high.y - cur->high.y)
! ADDLIST(listB, unionB, posB, i);
! else
! ADDLIST(listT, unionT, posT, i);
! }
! #define LIMIT_RATIO 0.1
! #define _IS_BADRATIO(x,y) ( (y) == 0 || (float)(x)/(float)(y) < LIMIT_RATIO )
! #define IS_BADRATIO(x,y) ( _IS_BADRATIO((x),(y)) || _IS_BADRATIO((y),(x)) )
! /* bad disposition, try to split by centers of boxes */
! if (IS_BADRATIO(posR, posL) && IS_BADRATIO(posT, posB))
{
! double avgCenterX = 0.0,
! avgCenterY = 0.0;
! double CenterX,
! CenterY;
! for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
{
! cur = DatumGetBoxP(entryvec->vector[i].key);
! avgCenterX += ((double) cur->high.x + (double) cur->low.x) / 2.0;
! avgCenterY += ((double) cur->high.y + (double) cur->low.y) / 2.0;
}
!
! avgCenterX /= maxoff;
! avgCenterY /= maxoff;
!
! posL = posR = posB = posT = 0;
! for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
{
! cur = DatumGetBoxP(entryvec->vector[i].key);
!
! CenterX = ((double) cur->high.x + (double) cur->low.x) / 2.0;
! CenterY = ((double) cur->high.y + (double) cur->low.y) / 2.0;
! if (CenterX < avgCenterX)
! ADDLIST(listL, unionL, posL, i);
! else if (CenterX == avgCenterX)
{
! if (posL > posR)
! ADDLIST(listR, unionR, posR, i);
! else
! ADDLIST(listL, unionL, posL, i);
}
else
- ADDLIST(listR, unionR, posR, i);
-
- if (CenterY < avgCenterY)
- ADDLIST(listB, unionB, posB, i);
- else if (CenterY == avgCenterY)
{
! if (posB > posT)
! ADDLIST(listT, unionT, posT, i);
! else
! ADDLIST(listB, unionB, posB, i);
}
- else
- ADDLIST(listT, unionT, posT, i);
}
!
! if (IS_BADRATIO(posR, posL) && IS_BADRATIO(posT, posB))
{
! fallbackSplit(entryvec, v);
! PG_RETURN_POINTER(v);
}
}
! /* which split more optimal? */
! if (Max(posL, posR) < Max(posB, posT))
! direction = 'x';
! else if (Max(posL, posR) > Max(posB, posT))
! direction = 'y';
! else
{
! Datum interLR = DirectFunctionCall2(rt_box_inter,
! BoxPGetDatum(unionL),
! BoxPGetDatum(unionR));
! Datum interBT = DirectFunctionCall2(rt_box_inter,
! BoxPGetDatum(unionB),
! BoxPGetDatum(unionT));
! double sizeLR,
! sizeBT;
!
! sizeLR = size_box(interLR);
! sizeBT = size_box(interBT);
!
! if (sizeLR < sizeBT)
! direction = 'x';
! else
! direction = 'y';
! }
! if (direction == 'x')
! chooseLR(v,
! listL, posL, unionL,
! listR, posR, unionR);
! else
! chooseLR(v,
! listB, posB, unionB,
! listT, posT, unionT);
PG_RETURN_POINTER(v);
}
--- 238,825 ----
}
/*
! * Describes a "common entry": an entry that can be placed in either group
! * without affecting the overlap along the selected axis.
! */
! typedef struct
! {
! /* Offset of the entry in the entry vector being split */
! int index;
! /* Absolute difference between the penalties of adding it to each group */
! double delta;
! } CommonEntry;
!
! /*
! * Context for g_box_consider_split. Holds general information about the
! * entries being split plus the parameters of the best split selected so far.
! */
! typedef struct
! {
! /* number of entries being split */
! int entriesCount;
! /* minimum bounding box across all entries */
! BOX boundingBox;
!
! /* Information about currently selected split follows */
!
! bool first; /* true if no split was selected yet */
!
! double leftUpper; /* upper bound of left interval */
! double rightLower; /* lower bound of right interval */
!
! float4 ratio; /* size of the smaller group divided by entriesCount */
! float4 overlap; /* (leftUpper - rightLower) divided by range */
! int dim; /* axis of this split: 0 is x, 1 is y */
! double range; /* width of general MBR projection to the selected axis */
! } ConsiderSplitContext;
!
! /*
! * A one-dimensional interval: the projection of a box onto an axis.
! */
! typedef struct
! {
! double lower,
! upper;
! } SplitInterval;
!
! /*
! * qsort comparator: orders SplitIntervals by ascending lower bound.
! * Returns -1, 1 or 0; the upper bound is not looked at.
! */
! static int
! interval_cmp_lower(const void *i1, const void *i2)
! {
! double lower1 = ((SplitInterval *) i1)->lower,
! lower2 = ((SplitInterval *) i2)->lower;
!
! if (lower1 < lower2)
! return -1;
! else if (lower1 > lower2)
! return 1;
! else
! return 0;
! }
!
! /*
! * qsort comparator: orders SplitIntervals by ascending upper bound.
! * Returns -1, 1 or 0; the lower bound is not looked at.
! */
! static int
! interval_cmp_upper(const void *i1, const void *i2)
! {
! double upper1 = ((SplitInterval *) i1)->upper,
! upper2 = ((SplitInterval *) i2)->upper;
!
! if (upper1 < upper2)
! return -1;
! else if (upper1 > upper2)
! return 1;
! else
! return 0;
! }
!
! /*
! * Clamp at zero: return val unchanged when val >= 0, otherwise return 0.
! * Any value failing the ">= 0" test (a NaN included) therefore yields 0.
! */
! static inline float
! non_negative(float val)
! {
! if (val >= 0.0f)
! return val;
! else
! return 0.0f;
! }
!
! /*
! * Consider replacement of currently selected split with the better one.
! *
! * context - entry count, overall bounding box and the best split so far;
! * updated in place when the candidate split is selected
! * dimNum - axis of the candidate split: 0 for x, otherwise y
! * rightLower - lower bound of the right group along that axis
! * minLeftCount - lower limit on the number of entries in the left group
! * leftUpper - upper bound of the left group along that axis
! * maxLeftCount - upper limit on the number of entries in the left group
! *
! * Candidates whose ratio does not exceed LIMIT_RATIO are ignored.
! */
! static inline void
! g_box_consider_split(ConsiderSplitContext *context, int dimNum,
! double rightLower, int minLeftCount,
! double leftUpper, int maxLeftCount)
! {
! int leftCount,
! rightCount;
! float4 ratio,
! overlap;
! double range;
!
! /*
! * Calculate entries distribution ratio assuming most uniform
! * distribution of common entries: leftCount is the value closest to
! * half of the entries within [minLeftCount, maxLeftCount].
! */
! if (minLeftCount >= (context->entriesCount + 1) / 2)
! {
! leftCount = minLeftCount;
! }
! else
! {
! if (maxLeftCount <= context->entriesCount / 2)
! leftCount = maxLeftCount;
! else
! leftCount = context->entriesCount / 2;
! }
! rightCount = context->entriesCount - leftCount;
!
! /*
! * Ratio of split - quotient between size of lesser group and total entries
! * count.
! */
! ratio = ((float4) Min(leftCount, rightCount)) /
! ((float4) context->entriesCount);
!
! if (ratio > LIMIT_RATIO)
! {
! bool selectthis = false;
!
! /*
! * The ratio is acceptable, so compare current split with previously
! * selected one. Between splits of one dimension we search for
! * minimal overlap (allowing negative values) and, between splits with
! * the same overlap, maximal ratio. We switch dimension if we find less
! * (non-negative) overlap, or bigger range with no more overlap.
! */
! if (dimNum == 0)
! range = context->boundingBox.high.x - context->boundingBox.low.x;
! else
! range = context->boundingBox.high.y - context->boundingBox.low.y;
!
! overlap = (leftUpper - rightLower) / range;
!
! /* If there is no previous selection, select this */
! if (context->first)
! selectthis = true;
! else if (context->dim == dimNum)
! {
! /*
! * Within the same dimension, choose the new split if it has a
! * smaller overlap, or same overlap but better ratio.
! */
! if (overlap < context->overlap ||
! (overlap == context->overlap && ratio > context->ratio))
! selectthis = true;
! }
! else
! {
! /*
! * Across dimensions, choose the new split if it has a
! * smaller *non-negative* overlap, or same *non-negative* overlap
! * but bigger range. This condition differs from described in the
! * article. On the datasets where leaf MBRs don't overlap
! * themselves, non-overlapping splits (i.e. splits which have zero
! * *non-negative* overlap) are frequently possible. In this case
! * splits tends to be along one dimension, because most distant
! * non-overlapping splits (i.e. having lowest negative overlap)
! * appears to be in the same dimension as in the previous split.
! * Therefore MBRs appear to be very prolonged along another
! * dimension, that leads to bad search performance. Using range
! * as the second split criteria makes MBRs more quadratic. Using
! * *non-negative* overlap instead of overlap as the first split
! * criteria gives to range criteria chance to matter, because
! * non-overlapping splits are equivalent in this criteria.
! */
! if (non_negative(overlap) < non_negative(context->overlap) ||
! (range > context->range &&
! non_negative(overlap) <= non_negative(context->overlap)))
! selectthis = true;
! }
!
! if (selectthis)
! {
! /* save information about selected split */
! context->first = false;
! context->ratio = ratio;
! context->range = range;
! context->overlap = overlap;
! context->rightLower = rightLower;
! context->leftUpper = leftUpper;
! context->dim = dimNum;
! }
! }
! }
!
! /*
! * Allocate a new BOX with palloc and set all four coordinates to zero.
! */
! static BOX *
! empty_box(void)
! {
! BOX *result;
!
! result = (BOX *) palloc(sizeof(BOX));
! result->low.x = 0.0f;
! result->low.y = 0.0f;
! result->high.x = 0.0f;
! result->high.y = 0.0f;
! return result;
! }
!
! /*
! * Return how much the area of 'original' grows when it is enlarged to also
! * cover 'new': area of the union bounding box minus area of 'original'.
! */
! static double
! box_penalty(BOX *original, BOX *new)
! {
! double union_width,
! union_height;
!
! union_width = Max(original->high.x, new->high.x) -
! Min(original->low.x, new->low.x);
! union_height = Max(original->high.y, new->high.y) -
! Min(original->low.y, new->low.y);
! return union_width * union_height - (original->high.x - original->low.x) *
! (original->high.y - original->low.y);
! }
!
! /*
! * qsort comparator: orders CommonEntry items by ascending delta.
! * Returns -1, 1 or 0.
! */
! static int
! common_entry_cmp(const void *i1, const void *i2)
! {
! double delta1 = ((CommonEntry *) i1)->delta,
! delta2 = ((CommonEntry *) i2)->delta;
!
! if (delta1 < delta2)
! return -1;
! else if (delta1 > delta2)
! return 1;
! else
! return 0;
! }
!
! /*
! * --------------------------------------------------------------------------
! * Double sorting split algorithm. Finds split of boxes by considering splits
! * along each axis. Searching splits along axes are based on the notions of
! * split pair and corner split pair.
*
! * Split pair is a pair <a, b> where 'a' is upper bound of left group and 'b'
! * is lower bound of right group. Lower bound of left group and upper bound of
! * right group are assumed to be the general bounds along axis.
*
! * Corner split pair is a split pair <a, b> where 'a' is upper bound of some
! * interval, 'b' is lower bound of some interval, and 'a' can't be lower or
! * 'b' can't be higher with split pair property preservation.
! *
! * Split along axis divides entries to the following groups:
! * 1) Entries which should be placed to the left group
! * 2) Entries which should be placed to the right group
! * 3) "Common entries" which can be placed in either group without affecting
! * the overlap along the selected axis.
! *
! * Split along axis is selected by overlap along that axis and some other
! * criteria (see g_box_consider_split). When split along axis is selected,
! * "common entries" are distributed by penalty with group boxes.
! *
! * For details see:
! * "A new double sorting-based node splitting algorithm for R-tree", A. Korotkov
! * http://syrcose.ispras.ru/2011/files/SYRCoSE2011_Proceedings.pdf#page=36
! * --------------------------------------------------------------------------
*/
Datum
gist_box_picksplit(PG_FUNCTION_ARGS)
{
GistEntryVector *entryvec = (GistEntryVector *) PG_GETARG_POINTER(0);
GIST_SPLITVEC *v = (GIST_SPLITVEC *) PG_GETARG_POINTER(1);
! OffsetNumber i,
! maxoff;
! ConsiderSplitContext context;
! BOX *box,
! *leftBox,
! *rightBox;
! int dim,
! commonEntriesCount,
! nbytes;
! SplitInterval *intervalsLower,
! *intervalsUpper;
! CommonEntry *commonEntries;
! int nentries;
!
! #define PLACE_LEFT(box, off) \
! do { \
! if (v->spl_nleft > 0) \
! adjustBox(leftBox, box); \
! else \
! *leftBox = *(box); \
! v->spl_left[v->spl_nleft++] = off; \
! } while(0)
!
! #define PLACE_RIGHT(box, off) \
! do { \
! if (v->spl_nright > 0) \
! adjustBox(rightBox, box); \
! else \
! *rightBox = *(box); \
! v->spl_right[v->spl_nright++] = off; \
! } while(0)
!
! memset(&context, 0, sizeof(ConsiderSplitContext));
maxoff = entryvec->n - 1;
+ nentries = context.entriesCount = maxoff - FirstOffsetNumber + 1;
! /* Allocate arrays for intervals along axes (one slot per entry) */
! intervalsLower = (SplitInterval *) palloc(nentries * sizeof(SplitInterval));
! intervalsUpper = (SplitInterval *) palloc(nentries * sizeof(SplitInterval));
! /*
! * Calculate the overall minimum bounding box over all the entries.
! */
! for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
{
! box = DatumGetBoxP(entryvec->vector[i].key);
! if (i == FirstOffsetNumber)
! context.boundingBox = *box;
! else
! adjustBox(&context.boundingBox, box);
}
! /*
! * Iterate over axes for optimal split searching (dim 0 is x, dim 1 is y).
! */
! context.first = true; /* nothing selected yet */
! for (dim = 0; dim < 2; dim++)
{
+ double leftUpper,
+ rightLower;
+ int i1,
+ i2;
+
+ /* Prepare array of intervals along axis */
+ for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
+ {
+ box = DatumGetBoxP(entryvec->vector[i].key);
+ if (dim == 0)
+ {
+ intervalsLower[i - FirstOffsetNumber].lower = box->low.x;
+ intervalsLower[i - FirstOffsetNumber].upper = box->high.x;
+ }
+ else
+ {
+ intervalsLower[i - FirstOffsetNumber].lower = box->low.y;
+ intervalsLower[i - FirstOffsetNumber].upper = box->high.y;
+ }
+ }
+
/*
! * Make two arrays of intervals: sorted by lower bound and sorted by
! * upper bound.
*/
+ memcpy(intervalsUpper, intervalsLower,
+ sizeof(SplitInterval) * nentries);
+ qsort(intervalsLower, nentries, sizeof(SplitInterval),
+ interval_cmp_lower);
+ qsort(intervalsUpper, nentries, sizeof(SplitInterval),
+ interval_cmp_upper);
+
+ /*
+ * Iterate over lower bound of right group finding corresponding
+ * lowest upper bound of left group.
+ *
+ * NOTE(review): leftUpper is seeded from the .lower field of the
+ * interval with the smallest upper bound; presumably this is just a
+ * starting value that the Max() below raises - confirm.
+ */
+ i1 = 0;
+ i2 = 0;
+ rightLower = intervalsLower[i1].lower;
+ leftUpper = intervalsUpper[i2].lower;
+
+ while (true)
+ {
+ /*
+ * Find next lower bound of right group. Every interval skipped
+ * here starts below it, so leftUpper must cover its upper bound.
+ */
+ while (i1 < nentries && rightLower == intervalsLower[i1].lower)
+ {
+ leftUpper = Max(leftUpper, intervalsLower[i1].upper);
+ i1++;
+ }
+ if (i1 >= nentries)
+ break;
+ rightLower = intervalsLower[i1].lower;
+
+ /*
+ * Advance i2 past every interval whose upper bound is <=
+ * leftUpper; i2 is then the count of intervals fitting to the left.
+ */
+ while (i2 < nentries && intervalsUpper[i2].upper <= leftUpper)
+ i2++;
+
+ /*
+ * Consider found split (i1 = min left count, i2 = max left count).
+ */
+ g_box_consider_split(&context, dim, rightLower, i1, leftUpper, i2);
+ }
+
+ /*
+ * Iterate over upper bound of left group finding corresponding
+ * highest lower bound of right group.
+ */
+ i1 = nentries - 1;
+ i2 = nentries - 1;
+ rightLower = intervalsLower[i1].upper;
+ leftUpper = intervalsUpper[i2].upper;
+
+ while (true)
+ {
+ /*
+ * Find next upper bound of left group. Every interval skipped
+ * here ends above it, so rightLower must cover its lower bound.
+ */
+ while (i2 >= 0 && leftUpper == intervalsUpper[i2].upper)
+ {
+ rightLower = Min(rightLower, intervalsUpper[i2].lower);
+ i2--;
+ }
+ if (i2 < 0)
+ break;
+ leftUpper = intervalsUpper[i2].upper;
+
+ /*
+ * Move i1 down past every interval whose lower bound is >=
+ * rightLower; i1 + 1 is then the count of the remaining intervals.
+ */
+ while (i1 >= 0 && intervalsLower[i1].lower >= rightLower)
+ i1--;
+
+ /*
+ * Consider found split (i1 + 1 = min left count, i2 + 1 = max).
+ */
+ g_box_consider_split(&context, dim,
+ rightLower, i1 + 1, leftUpper, i2 + 1);
+ }
+ }
+
+ /*
+ * If we failed to find any acceptable splits, use trivial split.
+ */
+ if (context.first)
+ {
fallbackSplit(entryvec, v);
PG_RETURN_POINTER(v);
}
+ /* Allocate vectors for results */
nbytes = (maxoff + 2) * sizeof(OffsetNumber);
! v->spl_left = (OffsetNumber *) palloc(nbytes);
! v->spl_right = (OffsetNumber *) palloc(nbytes);
! v->spl_nleft = 0;
! v->spl_nright = 0;
! /* Allocate boxes of left and right groups */
! leftBox = empty_box();
! rightBox = empty_box();
! /*
! * Allocate an array for "Common entries" - entries which can be placed
! * to either group without affecting overlap along selected axis.
! */
! commonEntriesCount = 0;
! commonEntries = (CommonEntry *) palloc(nentries * sizeof(CommonEntry));
!
! /*
! * Distribute entries which can be distributed unambiguously, and collect
! * "common entries".
! */
! for (i = FirstOffsetNumber; i <= maxoff; i = OffsetNumberNext(i))
{
! double lower,
! upper;
! /*
! * Get upper and lower bounds along selected axis.
! */
! box = DatumGetBoxP(entryvec->vector[i].key);
! if (context.dim == 0)
{
! lower = box->low.x;
! upper = box->high.x;
}
! else
{
! lower = box->low.y;
! upper = box->high.y;
! }
! if (upper <= context.leftUpper)
! {
! /* Fits to the left group */
! if (lower >= context.rightLower)
{
! /* Fits also to the right group, so "common entry" */
! commonEntries[commonEntriesCount++].index = i;
}
else
{
! /* Doesn't fit to the right group, so join to the left group */
! PLACE_LEFT(box, i);
}
}
! else
{
! /*
! * Each entry should fit on either left or right group, so since
! * this didn't fit on the left group, it better fit in the right
! * group.
! */
! Assert(lower >= context.rightLower);
!
! /* Doesn't fit to the left group, so join to the right group */
! PLACE_RIGHT(box, i);
}
}
! /*
! * Distribute "common entries", if any
! */
! if (commonEntriesCount > 0)
{
! /*
! * Calculate minimum number of entries that must be placed in each
! * group, to reach LIMIT_RATIO.
! */
! int m = ceil(LIMIT_RATIO * (double) nentries);
! /*
! * Calculate delta between penalties of join "common entries" to
! * different groups.
! */
! for (i = 0; i < commonEntriesCount; i++)
! {
! box = DatumGetBoxP(entryvec->vector[commonEntries[i].index].key);
! commonEntries[i].delta = Abs(box_penalty(leftBox, box) -
! box_penalty(rightBox, box));
! }
!
! /*
! * Sort "common entries" by calculated deltas (ascending) in order to
! * distribute the most ambiguous entries first.
! */
! qsort(commonEntries, commonEntriesCount, sizeof(CommonEntry), common_entry_cmp);
!
! /*
! * Distribute "common entries" between groups.
! */
! for (i = 0; i < commonEntriesCount; i++)
! {
! box = DatumGetBoxP(entryvec->vector[commonEntries[i].index].key);
!
! /*
! * If a group would hold at most m entries even after taking all
! * remaining common entries, this entry has to go to that group.
! */
! if (v->spl_nleft + (commonEntriesCount - i) <= m)
! PLACE_LEFT(box, commonEntries[i].index);
! else if (v->spl_nright + (commonEntriesCount - i) <= m)
! PLACE_RIGHT(box, commonEntries[i].index);
! else
! {
! /* Otherwise select the group by minimal penalty */
! if (box_penalty(leftBox, box) < box_penalty(rightBox, box))
! PLACE_LEFT(box, commonEntries[i].index);
! else
! PLACE_RIGHT(box, commonEntries[i].index);
! }
! }
! }
+ v->spl_ldatum = PointerGetDatum(leftBox);
+ v->spl_rdatum = PointerGetDatum(rightBox);
PG_RETURN_POINTER(v);
}
--
Sent via pgsql-hackers mailing list ([email protected])
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers