On 01/26/2018 07:55 AM, Peter Moser wrote:
> We have now a new approach to plan and execute NORMALIZE as a special
> join node of type NORMALIZE, an append-plan on the inner join path,
> and a merge-join executor. For the latter, we would need to
> extend nodeMergejoin.c with a point-in-range-containment join.

We now have a new prototype of the temporal NORMALIZE operation. Unlike the previous patch, this prototype does not rewrite queries; a single executor node, based on a merge join, performs the normalization.

Our new patch is based on top of 75f7855369ec56d4a8e7d6eae98aff1182b85cac from September 6, 2018.

The syntax is
SELECT * FROM (r NORMALIZE s USING() WITH(period_r, period_s)) c;

It is currently implemented only for empty USING clauses and only for int4range as the range attribute type.

Example:

A=# table r;
 a | b | period_r
---+---+----------
 a | B | [1,7)
 b | B | [3,9)
 c | G | [8,10)
(3 rows)


A=# table s;
 c | d | period_s
---+---+----------
 1 | B | [2,5)
 2 | B | [3,4)
 3 | B | [7,9)
(3 rows)


A=# SELECT * FROM (r NORMALIZE s USING() WITH(period_r, period_s)) c;
 period_r | a | b
----------+---+---
 [1,2)    | a | B
 [2,3)    | a | B
 [3,4)    | a | B
 [4,5)    | a | B
 [5,7)    | a | B
 [3,4)    | b | B
 [4,5)    | b | B
 [5,7)    | b | B
 [7,9)    | b | B
(9 rows)
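
To make the example result concrete, here is a minimal, self-contained C sketch of the sweep for a single outer tuple. It is not the patch's executor code: IntRange and normalize_one are hypothetical names, it assumes half-open integer ranges and split points that are already sorted ascending, and it ignores NULLs, empty ranges, and infinite bounds.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for an int4range value: half-open [lower, upper). */
typedef struct { int lower; int upper; } IntRange;

/*
 * Split r at every point in points[] (sorted ascending) that lies strictly
 * inside r. The pieces are written to out[], which must have room for
 * npoints + 1 entries. Returns the number of pieces written.
 */
size_t
normalize_one(IntRange r, const int *points, size_t npoints, IntRange *out)
{
	size_t	n = 0;
	size_t	i;
	int		sweepline = r.lower;	/* start of the piece not yet emitted */

	assert(r.lower < r.upper);

	for (i = 0; i < npoints; i++)
	{
		int		p = points[i];

		if (p <= sweepline)		/* left of the range, or a duplicate point */
			continue;
		if (p >= r.upper)		/* sorted input: no later point can match */
			break;

		out[n].lower = sweepline;
		out[n].upper = p;
		n++;
		sweepline = p;
	}

	/* final piece, from the last split point to the end of the range */
	out[n].lower = sweepline;
	out[n].upper = r.upper;
	return n + 1;
}
```

Called with r = [1,7) and the bounds of s sorted as 2, 3, 4, 5, 7, 9, this yields [1,2), [2,3), [3,4), [4,5), [5,7), which are the rows shown for tuple a in the example; for [3,9) it yields the four rows shown for tuple b.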


A=# EXPLAIN SELECT * FROM (r NORMALIZE s USING() WITH(period_r, period_s)) c;
                                QUERY PLAN
--------------------------------------------------------------------------
 Result  (cost=2.15..2.22 rows=3 width=18)
   ->  Merge ??? Join  (cost=2.15..2.23 rows=3 width=22)
         Merge Cond: (r.period_r @> (range_split(s.period_s)))
         ->  Sort  (cost=1.05..1.06 rows=3 width=18)
               Sort Key: r.period_r
               ->  Seq Scan on r  (cost=0.00..1.03 rows=3 width=18)
         ->  Sort  (cost=1.09..1.10 rows=3 width=4)
               Sort Key: (range_split(s.period_s)) USING <
               ->  ProjectSet  (cost=0.00..1.07 rows=3 width=4)
                     ->  Seq Scan on s  (cost=0.00..1.03 rows=3 width=14)
(10 rows)


> That is, we create a new join path within sort_inner_and_outer
> (joinpath.c). First two projection nodes to extract the start- and
> end-timepoints of the range type used as interval, and above an
> append-plan to merge both subplans. In detail, outer_path is just sort,
> whereas inner_path is append of (B, ts) projection with (B, te)
> projection.

We changed this implementation and now use a set-returning function called "range_split", which extracts the lower and upper bounds of a range and returns them as two tuples. For instance, the tuple '[4,10),a' becomes the two tuples '4,a' and '10,a'.
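
As an illustration of what range_split does to one input tuple, here is a small standalone C sketch. It is only a model of the behaviour described above, not the function from the patch: RangeTuple, PointTuple, and range_split_sketch are hypothetical names, the range is assumed to be a half-open integer range, and the single payload character stands in for the non-temporal attributes.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins: a half-open range [lower, upper) plus one payload. */
typedef struct { int lower; int upper; char payload; } RangeTuple;
typedef struct { int point; char payload; } PointTuple;

/*
 * Write the two bound tuples of `in` to out[0] and out[1] and return the
 * number of tuples produced. Like the prototype, this ignores empty and
 * infinite ranges and drops the inclusive/exclusive flags of the bounds.
 */
size_t
range_split_sketch(RangeTuple in, PointTuple out[2])
{
	assert(in.lower <= in.upper);

	out[0].point = in.lower;	/* lower bound, payload carried along */
	out[0].payload = in.payload;
	out[1].point = in.upper;	/* upper bound, payload carried along */
	out[1].payload = in.payload;
	return 2;
}
```

For the input '[4,10),a' this fills out[0] with (4, a) and out[1] with (10, a).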

> Hereby, B is a set of non-temporal attributes used in join equality
> predicates, and [ts,te) forms the valid-time interval. Non-equality
> predicates must be handled separately as a filter step.

The current prototype supports only an integer range-type without any additional non-temporal attributes (empty USING clause).

> Do you think, it is a good idea to extend the sort_inner_and_outer()
> with a new branch, where jointype == NORMALIZE and add the projection
> and append sub-paths there?

We have now actually extended sort_inner_and_outer. It is an early solution, meant to support the discussion. Please see the two sections starting with "if (jointype == JOIN_TEMPORAL_NORMALIZE)" inside sort_inner_and_outer.

The purpose of these sections is to replace the inner path's range-typed attribute with its individual bounds.

We accomplish this with a new function called range_split. We take the inner clause and extract the second operand of a RANGE_EQ expression from it. We assume *for this prototype* that there is only one such expression and that it is used solely for NORMALIZE. Then we replace that operand with a call to range_split. Since range_split returns a set of tuples, we add a new "set projection path" above the inner path, and another sort path above that.
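
The clause rewrite described here can be modelled with a toy expression tree. The sketch below deliberately does not use PostgreSQL's Node, OpExpr, or FuncExpr structs: Expr, OpClause, and wrap_inner_in_range_split are hypothetical names, and the operator name "=" merely stands in for the RANGE_EQ check.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Toy expression node; NOT PostgreSQL's Node/OpExpr/FuncExpr structs. */
typedef enum { EXPR_VAR, EXPR_FUNC } ExprKind;

typedef struct Expr
{
	ExprKind	 kind;
	const char	*name;		/* column name or function name */
	struct Expr *arg;		/* function argument, NULL for a Var */
} Expr;

typedef struct
{
	const char *opname;		/* "=" stands in for range equality */
	Expr	   *left;		/* outer side */
	Expr	   *right;		/* inner side */
} OpClause;

/*
 * If `clause` is a range-equality clause whose right operand is a plain
 * column, wrap that column in range_split() using caller-provided storage
 * and return 1. Otherwise leave the clause untouched and return 0.
 */
int
wrap_inner_in_range_split(OpClause *clause, Expr *storage)
{
	assert(clause != NULL && storage != NULL);

	if (strcmp(clause->opname, "=") != 0 || clause->right->kind != EXPR_VAR)
		return 0;

	storage->kind = EXPR_FUNC;
	storage->name = "range_split";
	storage->arg = clause->right;	/* the original inner column */
	clause->right = storage;		/* clause now reads: left = range_split(col) */
	return 1;
}
```

Applied to the clause period_r = period_s, this turns the right-hand side into range_split(period_s); a second call is a no-op because the operand is no longer a plain column. In the real planner the data type of the inner side changes as well, which is what the questions below are about.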

What we would like to discuss now:
- Is sort_inner_and_outer the correct place to perform this split?
- How could we support OID_RANGE_ELEM_CONTAINED_OP for a NORMALIZE
  mergejoin executor? If we use RANGE_ELEM_CONTAINED as operator, it is
  not an equality operator, but if we use RANGE_EQ it assumes that the
  right-hand-side of the operator must be a range as well.
- Would it be better to change our mergeclause to a RANGE_ELEM_CONTAINED
  comparison, or to keep RANGE_EQ and fix the pathkeys later?
- How do we update equivalence classes, pathkeys, and any other structs
  when changing the inner relation's data type from "int4range" to "int"
  in the query tree inside "sort_inner_and_outer", so that we get the
  correct ordering and data types?
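
To illustrate the operator question: a merge join needs a three-way result to decide which input to advance, whereas containment by itself only yields true or false. The following standalone C sketch shows one way to derive such a three-way result for a point against a half-open integer range. IntRange and point_in_range_cmp are hypothetical names, and bound inclusivity, infinity, and NULLs are ignored. Note that many different points compare as "equal" to the same range, which is why this is not an equality operator in the btree sense.

```c
#include <assert.h>

/* Hypothetical stand-in for an int4range value: half-open [lower, upper). */
typedef struct { int lower; int upper; } IntRange;

/*
 * Three-way comparison of a range (outer side) with a point (inner side):
 *   0  the point lies inside the range,
 *  >0  the point is left of the range (the inner side should advance),
 *  <0  the point is at or beyond the upper bound (the outer side should advance).
 */
int
point_in_range_cmp(IntRange range, int point)
{
	assert(range.lower <= range.upper);

	if (point < range.lower)
		return 1;
	if (point < range.upper)
		return 0;
	return -1;
}
```

For the range [3,9), the points 3 and 8 both return 0, the point 2 returns a positive value, and the point 9 returns a negative value.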


Best regards,
Anton, Johann, Michael, Peter

diff --git a/src/backend/executor/nodeMergejoin.c b/src/backend/executor/nodeMergejoin.c
index 5e52b90c00..ce4ffa992f 100644
--- a/src/backend/executor/nodeMergejoin.c
+++ b/src/backend/executor/nodeMergejoin.c
@@ -98,6 +98,102 @@
 #include "miscadmin.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "catalog/pg_operator.h"
+#include "nodes/nodeFuncs.h"
+#include "utils/fmgroids.h"
+#include "utils/rangetypes.h"
+#include "utils/typcache.h"
+#include "access/htup_details.h"                /* for heap_getattr */
+#include "nodes/print.h"                        /* for print_slot */
+#include "utils/datum.h"                        /* for datumCopy */
+
+
+// XXX PEMOSER ----------------------------
+// !!! This is just for prototyping, delete asap...
+
+#define TEMPORAL_DEBUG
+/*
+ * #define TEMPORAL_DEBUG
+ * XXX PEMOSER Maybe we should use execdebug.h stuff here?
+ */
+#ifdef TEMPORAL_DEBUG
+static char*
+datumToString(Oid typeinfo, Datum attr)
+{
+	Oid         typoutput;
+	bool        typisvarlena;
+	getTypeOutputInfo(typeinfo, &typoutput, &typisvarlena);
+	return OidOutputFunctionCall(typoutput, attr);
+}
+
+#define TPGdebug(...)                   { printf(__VA_ARGS__); printf("\n"); fflush(stdout); }
+#define TPGdebugDatum(attr, typeinfo)   TPGdebug("%s = %s %ld\n", #attr, datumToString(typeinfo, attr), attr)
+#define TPGdebugSlot(slot)              { printf("Printing Slot '%s'\n", #slot); print_slot(slot); fflush(stdout); }
+
+#else
+#define datumToString(typeinfo, attr)
+#define TPGdebug(...)
+#define TPGdebugDatum(attr, typeinfo)
+#define TPGdebugSlot(slot)
+#endif
+
+TypeCacheEntry *testmytypcache;
+#define setSweepline(datum) \
+	node->sweepline = datumCopy(datum, node->datumFormat->attbyval, node->datumFormat->attlen)
+
+#define freeSweepline() \
+	if (! node->datumFormat->attbyval) pfree(DatumGetPointer(node->sweepline))
+
+ /*
+  * slotGetAttrNotNull
+  *      Same as slot_getattr, but throws an error if NULL is returned.
+  */
+static Datum
+slotGetAttrNotNull(TupleTableSlot *slot, int attnum)
+{
+	bool isNull;
+	Datum result;
+
+	result = slot_getattr(slot, attnum, &isNull);
+
+	if(isNull)
+		ereport(ERROR,
+				(errcode(ERRCODE_NOT_NULL_VIOLATION),
+				 errmsg("Attribute \"%s\" at position %d is null. Temporal " \
+						"adjustment not possible.",
+				 NameStr(TupleDescAttr(slot->tts_tupleDescriptor, attnum - 1)->attname),
+				 attnum)));
+
+	return result;
+}
+
+/*
+ * heapGetAttrNotNull
+ *      Same as heap_getattr, but throws an error if NULL is returned.
+ */
+static Datum
+heapGetAttrNotNull(TupleTableSlot *slot, int attnum)
+{
+	bool isNull;
+	Datum result;
+
+	result = heap_getattr(slot->tts_tuple,
+			attnum,
+			slot->tts_tupleDescriptor,
+			&isNull);
+	if(isNull)
+		ereport(ERROR,
+				(errcode(ERRCODE_NOT_NULL_VIOLATION),
+				 errmsg("Attribute \"%s\" at position %d is null. Temporal " \
+						"adjustment not possible.",
+						NameStr(TupleDescAttr(slot->tts_tupleDescriptor,
+								attnum - 1)->attname),
+						attnum)));
+
+	return result;
+}
+// XXX PEMOSER ------------------------
+
 
 
 /*
@@ -138,6 +234,10 @@ typedef struct MergeJoinClauseData
 	 * stored here.
 	 */
 	SortSupportData ssup;
+
+	/* needed for Temporal Normalization */
+	bool             isnormalize;
+	TypeCacheEntry  *range_typcache;
 }			MergeJoinClauseData;
 
 /* Result type for MJEvalOuterValues and MJEvalInnerValues */
@@ -152,6 +252,57 @@ typedef enum
 #define MarkInnerTuple(innerTupleSlot, mergestate) \
 	ExecCopySlot((mergestate)->mj_MarkedTupleSlot, (innerTupleSlot))
 
+/*
+ * temporalAdjustmentStoreTuple
+ *      While we store result tuples, we must add the newly calculated temporal
+ *      boundaries as two scalar fields or create a single range-typed field
+ *      with the two given boundaries.
+ */
+static void
+temporalAdjustmentStoreTuple(MergeJoinState *mergestate,
+							 TupleTableSlot* slotToModify,
+							 TupleTableSlot* slotToStoreIn,
+							 Datum ts,
+							 Datum te,
+							 TypeCacheEntry *typcache)
+{
+	MemoryContext	oldContext;
+	HeapTuple		t;
+	RangeBound  	lower;
+	RangeBound  	upper;
+	bool        	empty = false;
+
+	/*
+	 * This should ideally be done with RangeBound types on the right-hand-side
+	 * created during range_split execution. Otherwise, we lose information about
+	 * inclusive/exclusive bounds and infinity. We would need to implement btree
+	 * operators for RangeBounds.
+	 */
+	lower.val = ts;
+	lower.lower = true;
+	lower.infinite = false;
+	lower.inclusive = true;
+
+	upper.val = te;
+	upper.lower = false;
+	upper.infinite = false;
+	upper.inclusive = false;
+
+	mergestate->newValues[0] = (Datum) make_range(typcache, &lower, &upper, empty);
+
+	oldContext = MemoryContextSwitchTo(mergestate->js.ps.ps_ResultTupleSlot->tts_mcxt);
+	t = heap_modify_tuple(slotToModify->tts_tuple,
+						  slotToModify->tts_tupleDescriptor,
+						  mergestate->newValues,
+						  mergestate->nullMask,
+						  mergestate->tsteMask);
+	MemoryContextSwitchTo(oldContext);
+	slotToStoreIn = ExecStoreTuple(t, slotToStoreIn, InvalidBuffer, true);
+
+	TPGdebug("Storing tuple:");
+	TPGdebugSlot(slotToStoreIn);
+}
+
 
 /*
  * MJExamineQuals
@@ -201,6 +352,8 @@ MJExamineQuals(List *mergeclauses,
 		Oid			op_righttype;
 		Oid			sortfunc;
 
+		pprint(qual);
+
 		if (!IsA(qual, OpExpr))
 			elog(ERROR, "mergejoin clause is not an OpExpr");
 
@@ -221,12 +374,31 @@ MJExamineQuals(List *mergeclauses,
 			elog(ERROR, "unsupported mergejoin strategy %d", opstrategy);
 		clause->ssup.ssup_nulls_first = nulls_first;
 
+		if (qual->opno == OID_RANGE_EQ_OP) {
+			Oid rngtypid;
+
+			// XXX PEMOSER Change opfamily and opfunc
+			qual->opfuncid = F_RANGE_CONTAINS; //<<--- opfuncid can be 0 during planning
+			qual->opno = OID_RANGE_CONTAINS_ELEM_OP; //OID_RANGE_CONTAINS_OP;
+			clause->isnormalize = true;
+
+			// Attention: cannot merge using non-equality operator 3890 <--- OID_RANGE_CONTAINS_OP
+			opfamily = 4103; //range_inclusion_ops from pg_opfamily.h
+
+			rngtypid = exprType((Node*)clause->lexpr->expr);
+			clause->range_typcache = lookup_type_cache(rngtypid, TYPECACHE_RANGE_INFO);
+			testmytypcache = clause->range_typcache;
+		} else {
+			clause->isnormalize = false;
+		}
+
+
 		/* Extract the operator's declared left/right datatypes */
 		get_op_opfamily_properties(qual->opno, opfamily, false,
 								   &op_strategy,
 								   &op_lefttype,
 								   &op_righttype);
-		if (op_strategy != BTEqualStrategyNumber)	/* should not happen */
+		if (op_strategy != BTEqualStrategyNumber && !clause->isnormalize)   /* should not happen */
 			elog(ERROR, "cannot merge using non-equality operator %u",
 				 qual->opno);
 
@@ -249,7 +421,7 @@ MJExamineQuals(List *mergeclauses,
 			/* The sort support function can provide a comparator */
 			OidFunctionCall1(sortfunc, PointerGetDatum(&clause->ssup));
 		}
-		if (clause->ssup.comparator == NULL)
+		if (clause->ssup.comparator == NULL && !clause->isnormalize)
 		{
 			/* support not available, get comparison func */
 			sortfunc = get_opfamily_proc(opfamily,
@@ -269,6 +441,77 @@ MJExamineQuals(List *mergeclauses,
 	return clauses;
 }
 
+static Datum
+getLower(Datum range, TypeCacheEntry *typcache)
+{
+	RangeBound  lower;
+	RangeBound  upper;
+	bool        empty;
+
+	range_deserialize(typcache, DatumGetRangeTypeP(range), &lower, &upper, &empty);
+
+	// XXX This is just a prototype function, we do not check for emptiness nor infinity yet...
+	// We will use RangeBounds in the future directly...
+	return lower.val;
+}
+
+static Datum
+getUpper(Datum range, TypeCacheEntry *typcache)
+{
+	RangeBound  lower;
+	RangeBound  upper;
+	bool        empty;
+
+	range_deserialize(typcache, DatumGetRangeTypeP(range), &lower, &upper, &empty);
+
+	// XXX This is just a prototype function, we do not check for emptiness nor infinity yet...
+	// We will use RangeBounds in the future directly...
+	return upper.val;
+}
+
+/*
+ * Return 0 if the point is inside the range, <0 if the point is right-of the
+ * range, or >0 if the point is left-of the range.
+ *
+ * This should ideally be done with RangeBound types on the right-hand-side
+ * created during range_split execution. Otherwise, we lose information about
+ * inclusive/exclusive bounds and infinity.
+ */
+static int
+ApplyNormalizeMatch(Datum ldatum, bool lisnull, Datum rdatum, bool risnull,
+					SortSupport ssup, TypeCacheEntry *typcache)
+{
+	RangeBound  lower;
+	RangeBound  upper;
+	bool        empty;
+	int32       result;
+
+	/* can't handle reverse sort order; should be prevented by optimizer */
+	Assert(!ssup->ssup_reverse);
+	Assert(!lisnull || !risnull);
+
+	if (lisnull)
+		return ssup->ssup_nulls_first ? -1 : 1;
+	if (risnull)
+		return ssup->ssup_nulls_first ? 1 : -1;
+
+	range_deserialize(typcache, DatumGetRangeTypeP(ldatum), &lower, &upper, &empty);
+
+	result = DatumGetInt32(FunctionCall2Coll(&typcache->rng_cmp_proc_finfo,
+											 typcache->rng_collation,
+											 lower.val, rdatum));
+	if (result > 0)
+		return 1;
+
+	result = DatumGetInt32(FunctionCall2Coll(&typcache->rng_cmp_proc_finfo,
+											 typcache->rng_collation,
+											 upper.val, rdatum));
+	if (result > 0)
+		return 0;
+
+	return -1;
+}
+
 /*
  * MJEvalOuterValues
  *
@@ -418,9 +661,19 @@ MJCompare(MergeJoinState *mergestate)
 			continue;
 		}
 
-		result = ApplySortComparator(clause->ldatum, clause->lisnull,
-									 clause->rdatum, clause->risnull,
-									 &clause->ssup);
+		if (clause->isnormalize)
+		{
+			result = ApplyNormalizeMatch(clause->ldatum, clause->lisnull,
+										 clause->rdatum, clause->risnull,
+										 &clause->ssup, clause->range_typcache);
+		}
+		else
+		{
+			result = ApplySortComparator(clause->ldatum, clause->lisnull,
+										 clause->rdatum, clause->risnull,
+										 &clause->ssup);
+		}
+
 
 		if (result != 0)
 			break;
@@ -611,6 +864,7 @@ ExecMergeJoin(PlanState *pstate)
 	ExprContext *econtext;
 	bool		doFillOuter;
 	bool		doFillInner;
+	TupleTableSlot *out = NULL;
 
 	CHECK_FOR_INTERRUPTS();
 
@@ -656,6 +910,13 @@ ExecMergeJoin(PlanState *pstate)
 				outerTupleSlot = ExecProcNode(outerPlan);
 				node->mj_OuterTupleSlot = outerTupleSlot;
 
+				/* XXX normalize (first call) */
+				if (node->mj_isNormalizer)
+				{
+					node->sameleft = true;
+					ExecCopySlot(node->prev, outerTupleSlot);
+				}
+
 				/* Compute join values and check for unmatchability */
 				switch (MJEvalOuterValues(node))
 				{
@@ -704,6 +965,18 @@ ExecMergeJoin(PlanState *pstate)
 				innerTupleSlot = ExecProcNode(innerPlan);
 				node->mj_InnerTupleSlot = innerTupleSlot;
 
+				/*
+				 * P1 is made of the lower or upper bounds of the valid time column,
+				 * hence it must have the same type as the range (return element type)
+				 * of lower(T) or upper(T).
+				 */
+				if (node->mj_isNormalizer)
+				{
+					node->datumFormat = TupleDescAttr(innerTupleSlot->tts_tupleDescriptor, 0);
+					setSweepline(getLower(slotGetAttrNotNull(outerTupleSlot, 1), testmytypcache));
+					TPGdebugDatum(node->sweepline, node->datumFormat->atttypid);
+				}
+
 				/* Compute join values and check for unmatchability */
 				switch (MJEvalInnerValues(node, innerTupleSlot))
 				{
@@ -789,6 +1062,10 @@ ExecMergeJoin(PlanState *pstate)
 				innerTupleSlot = node->mj_InnerTupleSlot;
 				econtext->ecxt_innertuple = innerTupleSlot;
 
+				TPGdebugSlot(outerTupleSlot);
+				TPGdebugSlot(node->prev);
+				TPGdebug("sameleft = %d", node->sameleft);
+
 				qualResult = (joinqual == NULL ||
 							  ExecQual(joinqual, econtext));
 				MJ_DEBUG_QUAL(joinqual, qualResult);
@@ -819,13 +1096,56 @@ ExecMergeJoin(PlanState *pstate)
 
 					if (qualResult)
 					{
+						TupleTableSlot *out;
+						bool            isNull;
+						Datum           currP1;
+
 						/*
 						 * qualification succeeded.  now form the desired
 						 * projection tuple and return the slot containing it.
 						 */
 						MJ_printf("ExecMergeJoin: returning tuple\n");
 
-						return ExecProject(node->js.ps.ps_ProjInfo);
+						out = ExecProject(node->js.ps.ps_ProjInfo);
+
+						if (!node->mj_isNormalizer)
+							return out;
+
+						if (node->sameleft)
+						{
+							currP1 = slot_getattr(innerTupleSlot, 1, &isNull);
+							TPGdebugDatum(currP1, node->datumFormat->atttypid);
+							if (node->sweepline < currP1)
+							{
+								temporalAdjustmentStoreTuple(node, outerTupleSlot, out, node->sweepline, currP1, testmytypcache);
+								freeSweepline();
+								setSweepline(currP1);
+
+								TPGdebugDatum(node->sweepline, node->datumFormat->atttypid);
+								TPGdebugSlot(out);
+
+								return out;
+							}
+
+							ExecCopySlot(node->prev, outerTupleSlot);
+							node->mj_JoinState = EXEC_MJ_NEXTINNER;
+						}
+						else /* not node->sameleft */
+						{
+							Datum prevTe = getUpper(heapGetAttrNotNull(node->prev, 1), testmytypcache);
+
+							if (node->sweepline < prevTe)
+								temporalAdjustmentStoreTuple(node, node->prev, out, node->sweepline, prevTe, testmytypcache);
+
+							ExecCopySlot(node->prev, outerTupleSlot);
+							freeSweepline();
+							setSweepline(getLower(slotGetAttrNotNull(outerTupleSlot, 1), testmytypcache));
+							TPGdebugDatum(node->sweepline, node->datumFormat->atttypid);
+							node->sameleft = true;
+							node->mj_JoinState = EXEC_MJ_NEXTINNER;
+							TPGdebugSlot(out);
+							return out;
+						}
 					}
 					else
 						InstrCountFiltered2(node, 1);
@@ -845,6 +1165,9 @@ ExecMergeJoin(PlanState *pstate)
 			case EXEC_MJ_NEXTINNER:
 				MJ_printf("ExecMergeJoin: EXEC_MJ_NEXTINNER\n");
 
+				if (node->mj_isNormalizer)
+					node->sameleft = true;
+
 				if (doFillInner && !node->mj_MatchedInner)
 				{
 					/*
@@ -947,6 +1270,9 @@ ExecMergeJoin(PlanState *pstate)
 			case EXEC_MJ_NEXTOUTER:
 				MJ_printf("ExecMergeJoin: EXEC_MJ_NEXTOUTER\n");
 
+				if (node->mj_isNormalizer)
+					node->sameleft = false;
+
 				if (doFillOuter && !node->mj_MatchedOuter)
 				{
 					/*
@@ -962,6 +1288,11 @@ ExecMergeJoin(PlanState *pstate)
 						return result;
 				}
 
+				// FIXME PEMOSER Only for normalizer...
+				out = NULL;
+				if (node->mj_isNormalizer && !TupIsNull(innerTupleSlot))
+					out = ExecProject(node->js.ps.ps_ProjInfo);
+
 				/*
 				 * now we get the next outer tuple, if any
 				 */
@@ -994,6 +1325,19 @@ ExecMergeJoin(PlanState *pstate)
 							node->mj_JoinState = EXEC_MJ_ENDOUTER;
 							break;
 						}
+
+						if (node->mj_isNormalizer && !TupIsNull(node->prev) && !TupIsNull(innerTupleSlot))
+						{
+							MJ_printf("finalize normalizer!!!\n");
+							Datum prevTe = getUpper(heapGetAttrNotNull(node->prev, 1), testmytypcache);
+							TPGdebugDatum(prevTe, node->datumFormat->atttypid);
+							TPGdebugDatum(node->sweepline, node->datumFormat->atttypid);
+							MJ_debugtup(node->prev);
+							temporalAdjustmentStoreTuple(node, node->prev, out, node->sweepline, prevTe, testmytypcache);
+							node->mj_JoinState = EXEC_MJ_ENDOUTER;
+							return out;
+						}
+
 						/* Otherwise we're done. */
 						return NULL;
 				}
@@ -1048,7 +1392,7 @@ ExecMergeJoin(PlanState *pstate)
 				compareResult = MJCompare(node);
 				MJ_DEBUG_COMPARE(compareResult);
 
-				if (compareResult == 0)
+				if (compareResult == 0 || (node->mj_isNormalizer && node->mj_markSet))
 				{
 					/*
 					 * the merge clause matched so now we restore the inner
@@ -1085,7 +1429,10 @@ ExecMergeJoin(PlanState *pstate)
 						/* we need not do MJEvalInnerValues again */
 					}
 
-					node->mj_JoinState = EXEC_MJ_JOINTUPLES;
+					if (node->mj_isNormalizer)
+						node->mj_JoinState = EXEC_MJ_SKIP_TEST;
+					else
+						node->mj_JoinState = EXEC_MJ_JOINTUPLES;
 				}
 				else
 				{
@@ -1190,6 +1537,7 @@ ExecMergeJoin(PlanState *pstate)
 					MarkInnerTuple(node->mj_InnerTupleSlot, node);
 
 					node->mj_JoinState = EXEC_MJ_JOINTUPLES;
+					node->mj_markSet = true;
 				}
 				else if (compareResult < 0)
 					node->mj_JoinState = EXEC_MJ_SKIPOUTER_ADVANCE;
@@ -1338,6 +1686,9 @@ ExecMergeJoin(PlanState *pstate)
 			case EXEC_MJ_ENDOUTER:
 				MJ_printf("ExecMergeJoin: EXEC_MJ_ENDOUTER\n");
 
+				if (node->mj_isNormalizer)
+					return NULL;
+
 				Assert(doFillInner);
 
 				if (!node->mj_MatchedInner)
@@ -1438,6 +1789,8 @@ ExecInitMergeJoin(MergeJoin *node, EState *estate, int eflags)
 	MergeJoinState *mergestate;
 	TupleDesc	outerDesc,
 				innerDesc;
+	int         numCols = list_length(node->join.plan.targetlist);
+
 
 	/* check for unsupported flags */
 	Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
@@ -1529,12 +1882,15 @@ ExecInitMergeJoin(MergeJoin *node, EState *estate, int eflags)
 		ExecInitQual(node->join.joinqual, (PlanState *) mergestate);
 	/* mergeclauses are handled below */
 
+	mergestate->prev = ExecInitExtraTupleSlot(estate, outerDesc);
+
 	/*
 	 * detect whether we need only consider the first matching inner tuple
 	 */
 	mergestate->js.single_match = (node->join.inner_unique ||
 								   node->join.jointype == JOIN_SEMI);
 
+	mergestate->mj_isNormalizer = false;
 	/* set up null tuples for outer joins, if needed */
 	switch (node->join.jointype)
 	{
@@ -1584,6 +1940,20 @@ ExecInitMergeJoin(MergeJoin *node, EState *estate, int eflags)
 						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 						 errmsg("FULL JOIN is only supported with merge-joinable join conditions")));
 			break;
+		case JOIN_TEMPORAL_NORMALIZE:
+			mergestate->mj_FillOuter = false;
+			mergestate->mj_FillInner = false;
+			mergestate->mj_isNormalizer = true;
+
+			/* Init buffer values for heap_modify_tuple */
+			mergestate->newValues = palloc0(sizeof(Datum) * numCols);
+			mergestate->nullMask = palloc0(sizeof(bool) * numCols);
+			mergestate->tsteMask = palloc0(sizeof(bool) * numCols);
+
+			/* Not right??? -> Always the last in the list, since we add it during planning phase
+			 * XXX PEMOSER We need to find the correct position of "period" and set that here */
+			mergestate->tsteMask[/*numCols - 1*/0] = true;
+			break;
 		default:
 			elog(ERROR, "unrecognized join type: %d",
 				 (int) node->join.jointype);
diff --git a/src/backend/executor/nodeSort.c b/src/backend/executor/nodeSort.c
index 0d2acb665a..fcfd6f51b9 100644
--- a/src/backend/executor/nodeSort.c
+++ b/src/backend/executor/nodeSort.c
@@ -86,6 +86,12 @@ ExecSort(PlanState *pstate)
 		outerNode = outerPlanState(node);
 		tupDesc = ExecGetResultType(outerNode);
 
+		// XXX PEMOSER Manually fix sort operation of second attribute (former time, now upper(time))
+		// We must fix that in general... this is just a proof of concept brute-force solution!
+		if (plannode->plan.lefttree->type == T_ProjectSet) {
+			plannode->sortOperators[0] = 97; // 97 means "<" for int4, it was "<" for int4range
+		}
+
 		tuplesortstate = tuplesort_begin_heap(tupDesc,
 											  plannode->numCols,
 											  plannode->sortColIdx,
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index 7bf67a0529..05a16bde0c 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -4543,6 +4543,7 @@ calc_joinrel_size_estimate(PlannerInfo *root,
 	switch (jointype)
 	{
 		case JOIN_INNER:
+		case JOIN_TEMPORAL_NORMALIZE:
 			nrows = outer_rows * inner_rows * fkselec * jselec;
 			/* pselec not used */
 			break;
diff --git a/src/backend/optimizer/path/joinpath.c b/src/backend/optimizer/path/joinpath.c
index 642f951093..ed779c2c93 100644
--- a/src/backend/optimizer/path/joinpath.c
+++ b/src/backend/optimizer/path/joinpath.c
@@ -22,6 +22,11 @@
 #include "optimizer/pathnode.h"
 #include "optimizer/paths.h"
 #include "optimizer/planmain.h"
+#include "catalog/pg_operator.h"
+#include "optimizer/tlist.h"
+#include "utils/fmgroids.h"
+#include "nodes/makefuncs.h"
+#include "utils/lsyscache.h"
 
 /* Hook for plugins to get control in add_paths_to_joinrel() */
 set_join_pathlist_hook_type set_join_pathlist_hook = NULL;
@@ -195,7 +200,7 @@ add_paths_to_joinrel(PlannerInfo *root,
 	 * way of implementing a full outer join, so override enable_mergejoin if
 	 * it's a full join.
 	 */
-	if (enable_mergejoin || jointype == JOIN_FULL)
+	if (enable_mergejoin || jointype == JOIN_FULL || jointype == JOIN_TEMPORAL_NORMALIZE)
 		extra.mergeclause_list = select_mergejoin_clauses(root,
 														  joinrel,
 														  outerrel,
@@ -937,6 +942,80 @@ sort_inner_and_outer(PlannerInfo *root,
 		Assert(inner_path);
 		jointype = JOIN_INNER;
 	}
+	else if (jointype == JOIN_TEMPORAL_NORMALIZE)
+	{
+		/*
+		 * outer_path is just sort; inner_path is append of (B, ts) projection with (B, te) projection
+		 */
+		List *exprs = NIL; // to collect inner relation's targets
+		FuncExpr *f_split;
+		Var *innervar;
+
+		foreach(l, extra->mergeclause_list)
+		{
+			RestrictInfo       *rinfo = (RestrictInfo *) lfirst(l);
+			Expr               *clause  = (Expr *) rinfo->clause;
+
+			if (IsA(clause, OpExpr))
+			{
+				OpExpr *opexpr = (OpExpr *) clause;
+				if (opexpr->opno == OID_RANGE_EQ_OP)
+				{
+					if (IsA(lsecond(opexpr->args), Var)) {
+
+						// lsecond because it is from the second relation (=inner)
+						innervar = lsecond(opexpr->args);
+
+						f_split = makeFuncExpr(F_RANGE_SPLIT, 23, list_make1(innervar), 0, 0, 0);
+						f_split->funcretset = true;
+
+						/*
+						 * OUTER_VAR cannot be used here, because path creation does not know about it,
+						 * it will be introduced in plan creation.
+						 */
+						innervar = makeVar(2, innervar->varattno, f_split->funcresulttype, -1, 0, 0);
+					}
+				}
+				else
+				{
+					// lsecond because it is from the second relation (=inner)
+					exprs = lappend(exprs, lsecond(opexpr->args));
+				}
+			}
+		}
+
+		RestrictInfo       *rinfo = (RestrictInfo *) linitial(extra->mergeclause_list);
+		OpExpr *opexpr = (OpExpr *) rinfo->clause;
+		lsecond(opexpr->args) = f_split;
+		rinfo->right_em->em_expr = f_split;
+		rinfo->mergeopfamilies = get_mergejoin_opfamilies(opexpr->opno);
+
+		PathTarget *target_split = makeNode(PathTarget);
+		target_split->exprs = lappend(exprs, f_split);
+
+		set_pathtarget_cost_width(root, target_split);
+
+		inner_path = (Path *) create_set_projection_path(root, innerrel, inner_path, target_split);
+		innerrel->reltarget->exprs = inner_path->pathtarget->exprs;//list_make1(innervar);//copyObject(inner_path->pathtarget->exprs);
+		joinrel->reltarget->exprs = list_concat(copyObject(outerrel->reltarget->exprs), innerrel->reltarget->exprs);
+		set_pathtarget_cost_width(root, joinrel->reltarget);
+
+		innerrel->cheapest_total_path = inner_path;
+		innerrel->cheapest_startup_path = inner_path;
+		innerrel->cheapest_parameterized_paths = list_make1(inner_path);
+		innerrel->pathlist = list_make1(inner_path);
+
+		extra->sjinfo->semi_rhs_exprs = list_make1(f_split);
+		extra->sjinfo->semi_operators = NIL;
+		extra->sjinfo->semi_operators = lappend_oid(extra->sjinfo->semi_operators, 96);
+
+		Assert(inner_path);
+
+		innerrel->cheapest_total_path = inner_path;
+		innerrel->cheapest_startup_path = inner_path;
+		innerrel->cheapest_parameterized_paths = list_make1(inner_path);
+	}
+
 
 	/*
 	 * If the joinrel is parallel-safe, we may be able to consider a partial
@@ -1028,6 +1107,16 @@ sort_inner_and_outer(PlannerInfo *root,
 		merge_pathkeys = build_join_pathkeys(root, joinrel, jointype,
 											 outerkeys);
 
+		if (jointype == JOIN_TEMPORAL_NORMALIZE)
+		{
+			inner_path = (Path *) create_sort_path(root, innerrel, inner_path, innerkeys, -1);
+			innerrel->cheapest_total_path = inner_path;
+			innerrel->cheapest_startup_path = inner_path;
+			innerrel->cheapest_parameterized_paths = list_make1(inner_path);
+			innerrel->pathlist = list_make1(inner_path);
+			Assert(inner_path);
+		}
+
 		/*
 		 * And now we can make the path.
 		 *
@@ -1360,6 +1449,7 @@ match_unsorted_outer(PlannerInfo *root,
 			break;
 		case JOIN_RIGHT:
 		case JOIN_FULL:
+		case JOIN_TEMPORAL_NORMALIZE:
 			nestjoinOK = false;
 			useallclauses = true;
 			break;
@@ -1686,6 +1776,10 @@ hash_inner_and_outer(PlannerInfo *root,
 	List	   *hashclauses;
 	ListCell   *l;
 
+	/* Hashjoin is not allowed for temporal NORMALIZE */
+	if (jointype == JOIN_TEMPORAL_NORMALIZE)
+		return;
+
 	/*
 	 * We need to build only one hashclauses list for any given pair of outer
 	 * and inner relations; all of the hashable clauses will be used as keys.
diff --git a/src/backend/optimizer/path/joinrels.c b/src/backend/optimizer/path/joinrels.c
index d3d21fed5d..2dfbcafff3 100644
--- a/src/backend/optimizer/path/joinrels.c
+++ b/src/backend/optimizer/path/joinrels.c
@@ -898,6 +898,21 @@ populate_joinrel_with_paths(PlannerInfo *root, RelOptInfo *rel1,
 								 JOIN_ANTI, sjinfo,
 								 restrictlist);
 			break;
+		case JOIN_TEMPORAL_NORMALIZE:
+			if (is_dummy_rel(rel1) || is_dummy_rel(rel2) ||
+				restriction_is_constant_false(restrictlist, joinrel, false))
+			{
+				mark_dummy_rel(joinrel);
+				break;
+			}
+
+			/*
+			 * Temporal normalization does not support re-ordering of rels
+			 */
+			add_paths_to_joinrel(root, joinrel, rel1, rel2,
+								 JOIN_TEMPORAL_NORMALIZE, sjinfo,
+								 restrictlist);
+			break;
 		default:
 			/* other values not expected here */
 			elog(ERROR, "unrecognized join type: %d", (int) sjinfo->jointype);
diff --git a/src/backend/optimizer/path/pathkeys.c b/src/backend/optimizer/path/pathkeys.c
index ec66cb9c3c..334073b0fb 100644
--- a/src/backend/optimizer/path/pathkeys.c
+++ b/src/backend/optimizer/path/pathkeys.c
@@ -833,7 +833,12 @@ build_join_pathkeys(PlannerInfo *root,
 					JoinType jointype,
 					List *outer_pathkeys)
 {
-	if (jointype == JOIN_FULL || jointype == JOIN_RIGHT)
+	/*
+	 * TEMPORAL NORMALIZE: To improve this, we would need to remove only
+	 * temporal range types from the path key list, not all
+	 */
+	if (jointype == JOIN_FULL || jointype == JOIN_RIGHT
+			|| jointype == JOIN_TEMPORAL_NORMALIZE)
 		return NIL;
 
 	/*
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index ae41c9efa0..ef306c7c59 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -4107,6 +4107,15 @@ create_mergejoin_plan(PlannerInfo *root,
 	/* Costs of sort and material steps are included in path cost already */
 	copy_generic_path_info(&join_plan->join.plan, &best_path->jpath.path);
 
+	/*
+	 * XXX PEMOSER NORMALIZE needs a result node above to properly
+	 * handle target lists, functions and constants
+	 * Maybe we need to refine this like in create_unique_plan:
+	 * "If the top plan node can't do projections..."
+	 */
+	if (best_path->jpath.jointype == JOIN_TEMPORAL_NORMALIZE)
+		join_plan = make_result(tlist, NULL, join_plan);
+
 	return join_plan;
 }
 
diff --git a/src/backend/optimizer/plan/initsplan.c b/src/backend/optimizer/plan/initsplan.c
index 01335db511..aa6298886a 100644
--- a/src/backend/optimizer/plan/initsplan.c
+++ b/src/backend/optimizer/plan/initsplan.c
@@ -31,6 +31,7 @@
 #include "parser/analyze.h"
 #include "rewrite/rewriteManip.h"
 #include "utils/lsyscache.h"
+#include "catalog/pg_operator.h"
 
 
 /* These parameters are set by GUC */
@@ -896,6 +897,7 @@ deconstruct_recurse(PlannerInfo *root, Node *jtnode, bool below_outer_join,
 		switch (j->jointype)
 		{
 			case JOIN_INNER:
+			case JOIN_TEMPORAL_NORMALIZE:
 				leftjoinlist = deconstruct_recurse(root, j->larg,
 												   below_outer_join,
 												   &leftids, &left_inners,
@@ -1018,7 +1020,7 @@ deconstruct_recurse(PlannerInfo *root, Node *jtnode, bool below_outer_join,
 										*inner_join_rels,
 										j->jointype,
 										my_quals);
-			if (j->jointype == JOIN_SEMI)
+			if (j->jointype == JOIN_SEMI || j->jointype == JOIN_TEMPORAL_NORMALIZE)
 				ojscope = NULL;
 			else
 				ojscope = bms_union(sjinfo->min_lefthand,
@@ -1437,7 +1439,8 @@ compute_semijoin_info(SpecialJoinInfo *sjinfo, List *clause)
 	sjinfo->semi_rhs_exprs = NIL;
 
 	/* Nothing more to do if it's not a semijoin */
-	if (sjinfo->jointype != JOIN_SEMI)
+	if (sjinfo->jointype != JOIN_SEMI
+			&& sjinfo->jointype != JOIN_TEMPORAL_NORMALIZE)
 		return;
 
 	/*
@@ -2621,6 +2624,10 @@ check_mergejoinable(RestrictInfo *restrictinfo)
 	opno = ((OpExpr *) clause)->opno;
 	leftarg = linitial(((OpExpr *) clause)->args);
 
+	/* XXX PEMOSER Hardcoded NORMALIZE detection... change this. Read the note below... */
+	if (opno == OID_RANGE_EQ_OP)
+		restrictinfo->temp_normalizer = true;
+
 	if (op_mergejoinable(opno, exprType(leftarg)) &&
 		!contain_volatile_functions((Node *) clause))
 		restrictinfo->mergeopfamilies = get_mergejoin_opfamilies(opno);
diff --git a/src/backend/optimizer/prep/prepjointree.c b/src/backend/optimizer/prep/prepjointree.c
index c3f46a26c3..81a3152de8 100644
--- a/src/backend/optimizer/prep/prepjointree.c
+++ b/src/backend/optimizer/prep/prepjointree.c
@@ -785,6 +785,7 @@ pull_up_subqueries_recurse(PlannerInfo *root, Node *jtnode,
 		switch (j->jointype)
 		{
 			case JOIN_INNER:
+			case JOIN_TEMPORAL_NORMALIZE:
 
 				/*
 				 * INNER JOIN can allow deletion of either child node, but not
diff --git a/src/backend/optimizer/util/restrictinfo.c b/src/backend/optimizer/util/restrictinfo.c
index edf5a4807f..45d573eccd 100644
--- a/src/backend/optimizer/util/restrictinfo.c
+++ b/src/backend/optimizer/util/restrictinfo.c
@@ -186,6 +186,7 @@ make_restrictinfo_internal(Expr *clause,
 	restrictinfo->outer_selec = -1;
 
 	restrictinfo->mergeopfamilies = NIL;
+	restrictinfo->temp_normalizer = false;
 
 	restrictinfo->left_ec = NULL;
 	restrictinfo->right_ec = NULL;
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 4bd2223f26..3aecacfe1c 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -432,6 +432,10 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 %type <node>	join_outer join_qual
 %type <jtype>	join_type
 
+%type <node>    normalizer_qual
+%type <jexpr>   normalized_table
+%type <list>	temporal_bounds temporal_bounds_list
+
 %type <list>	extract_list overlay_list position_list
 %type <list>	substr_list trim_list
 %type <list>	opt_interval interval_second
@@ -654,7 +658,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 
 	MAPPING MATCH MATERIALIZED MAXVALUE METHOD MINUTE_P MINVALUE MODE MONTH_P MOVE
 
-	NAME_P NAMES NATIONAL NATURAL NCHAR NEW NEXT NO NONE
+	NAME_P NAMES NATIONAL NATURAL NCHAR NEW NEXT NO NONE NORMALIZE
 	NOT NOTHING NOTIFY NOTNULL NOWAIT NULL_P NULLIF
 	NULLS_P NUMERIC
 
@@ -11921,6 +11925,11 @@ table_ref:	relation_expr opt_alias_clause
 					$2->alias = $4;
 					$$ = (Node *) $2;
 				}
+			| '(' normalized_table ')' alias_clause
+				{
+					$2->alias = $4;
+					$$ = (Node *) $2;
+				}
 		;
 
 
@@ -12039,6 +12048,59 @@ opt_alias_clause: alias_clause						{ $$ = $1; }
 			| /*EMPTY*/								{ $$ = NULL; }
 		;
 
+/*
+ * Temporal alignment statements
+ */
+temporal_bounds: WITH '(' temporal_bounds_list ')'				{ $$ = $3; }
+		;
+
+temporal_bounds_list:
+			columnref
+				{
+					$$ = list_make1($1);
+				}
+			| temporal_bounds_list ',' columnref
+				{
+					$$ = lappend($1, $3);
+				}
+		;
+
+normalizer_qual:
+			USING '(' name_list ')'					{ $$ = (Node *) $3; }
+			| USING '(' ')'							{ $$ = (Node *) NIL; }
+		;
+
+normalized_table:
+			table_ref NORMALIZE table_ref normalizer_qual temporal_bounds
+				{
+					JoinExpr *n = makeNode(JoinExpr);
+					n->jointype = JOIN_TEMPORAL_NORMALIZE;
+					n->isNatural = false;
+					n->larg = $1;
+					n->rarg = $3;
+
+					n->usingClause = NIL;
+
+					if ($4 != NULL && IsA($4, List))
+						n->usingClause = (List *) $4; /* USING clause */
+
+					/*
+					 * A list for our valid-time boundaries with two range typed
+					 * values.
+					 */
+					if (list_length($5) == 2)
+						n->temporalBounds = $5;
+					else
+						ereport(ERROR,
+								(errcode(ERRCODE_SYNTAX_ERROR),
+								 errmsg("Temporal adjustment boundaries must " \
+										"have two range typed values"),
+								 parser_errposition(@5)));
+
+					$$ = n;
+				}
+		;
+
 /*
  * func_alias_clause can include both an Alias and a coldeflist, so we make it
  * return a 2-element list that gets disassembled by calling production.
@@ -15451,6 +15513,7 @@ reserved_keyword:
 			| LIMIT
 			| LOCALTIME
 			| LOCALTIMESTAMP
+			| NORMALIZE
 			| NOT
 			| NULL_P
 			| OFFSET
diff --git a/src/backend/parser/parse_clause.c b/src/backend/parser/parse_clause.c
index cfd4b91897..ff2020f724 100644
--- a/src/backend/parser/parse_clause.c
+++ b/src/backend/parser/parse_clause.c
@@ -62,7 +62,7 @@ static void extractRemainingColumns(List *common_colnames,
 						List **res_colnames, List **res_colvars);
 static Node *transformJoinUsingClause(ParseState *pstate,
 						 RangeTblEntry *leftRTE, RangeTblEntry *rightRTE,
-						 List *leftVars, List *rightVars);
+						 List *leftVars, List *rightVars, List *normalizeVars);
 static Node *transformJoinOnClause(ParseState *pstate, JoinExpr *j,
 					  List *namespace);
 static RangeTblEntry *getRTEForSpecialRelationTypes(ParseState *pstate,
@@ -339,7 +339,7 @@ extractRemainingColumns(List *common_colnames,
 static Node *
 transformJoinUsingClause(ParseState *pstate,
 						 RangeTblEntry *leftRTE, RangeTblEntry *rightRTE,
-						 List *leftVars, List *rightVars)
+						 List *leftVars, List *rightVars, List *normalizeVars)
 {
 	Node	   *result;
 	List	   *andargs = NIL;
@@ -373,6 +373,17 @@ transformJoinUsingClause(ParseState *pstate,
 		andargs = lappend(andargs, e);
 	}
 
+	/* Temporal NORMALIZE appends an expression to compare temporal bounds */
+	if (normalizeVars)
+	{
+		A_Expr *e;
+		e = makeSimpleA_Expr(AEXPR_OP, "=",
+							 (Node *) copyObject(linitial(normalizeVars)),
+							 (Node *) copyObject(lsecond(normalizeVars)),
+							 -1);
+		andargs = lappend(andargs, e);
+	}
+
 	/* Only need an AND if there's more than one join column */
 	if (list_length(andargs) == 1)
 		result = (Node *) linitial(andargs);
@@ -1232,6 +1243,7 @@ transformFromClauseItem(ParseState *pstate, Node *n,
 		int			sv_namespace_length;
 		RangeTblEntry *rte;
 		int			k;
+		bool		isNormalize = (j->jointype == JOIN_TEMPORAL_NORMALIZE);
 
 		/*
 		 * Recursively process the left subtree, then the right.  We must do
@@ -1342,7 +1354,8 @@ transformFromClauseItem(ParseState *pstate, Node *n,
 		res_colnames = NIL;
 		res_colvars = NIL;
 
-		if (j->usingClause)
+		/* NORMALIZE supports empty using clauses */
+		if (j->usingClause || (isNormalize && j->usingClause == NIL))
 		{
 			/*
 			 * JOIN/USING (or NATURAL JOIN, as transformed above). Transform
@@ -1352,6 +1365,7 @@ transformFromClauseItem(ParseState *pstate, Node *n,
 			List	   *ucols = j->usingClause;
 			List	   *l_usingvars = NIL;
 			List	   *r_usingvars = NIL;
+			List       *normalize_vars = NIL;
 			ListCell   *ucol;
 
 			Assert(j->quals == NULL);	/* shouldn't have ON() too */
@@ -1437,11 +1451,90 @@ transformFromClauseItem(ParseState *pstate, Node *n,
 														 r_colvar));
 			}
 
+			/* Only for temporal NORMALIZE */
+			if (isNormalize)
+			{
+				int         ndx = 0;
+				ListCell   *col;
+				Var        *l_boundvar;
+				Var        *r_boundvar;
+
+				int         l_bound_index = -1;
+				int         r_bound_index = -1;
+				char       *l_bound;
+				char       *r_bound;
+				Value	   *lboundcol = linitial(((ColumnRef *) linitial(j->temporalBounds))->fields);
+				Value	   *rboundcol = linitial(((ColumnRef *) lsecond(j->temporalBounds))->fields);
+
+				l_bound = strVal(lboundcol);
+				r_bound = strVal(rboundcol);
+
+				/* Find the first bound in left input */
+				foreach(col, l_colnames)
+				{
+					char       *l_colname = strVal(lfirst(col));
+
+					if (strcmp(l_colname, l_bound) == 0)
+					{
+						if (l_bound_index >= 0)
+							ereport(ERROR,
+									(errcode(ERRCODE_AMBIGUOUS_COLUMN),
+											 errmsg("temporal bound name \"%s\" appears more than once in left table",
+													l_bound)));
+						l_bound_index = ndx;
+					}
+					ndx++;
+				}
+
+				if (l_bound_index < 0)
+					ereport(ERROR,
+							(errcode(ERRCODE_UNDEFINED_COLUMN),
+									 errmsg("column \"%s\" specified in normalizer's WITH clause does not exist in left table",
+											l_bound)));
+
+				/* Find the second bound in right input */
+				ndx = 0;
+				foreach(col, r_colnames)
+				{
+					char       *r_colname = strVal(lfirst(col));
+
+					if (strcmp(r_colname, r_bound) == 0)
+					{
+						if (r_bound_index >= 0)
+							ereport(ERROR,
+									(errcode(ERRCODE_AMBIGUOUS_COLUMN),
+											 errmsg("temporal bound name \"%s\" appears more than once in right table",
+													r_bound)));
+						r_bound_index = ndx;
+					}
+					ndx++;
+				}
+
+				if (r_bound_index < 0)
+					ereport(ERROR,
+							(errcode(ERRCODE_UNDEFINED_COLUMN),
+									 errmsg("column \"%s\" specified in normalizer's WITH clause does not exist in right table",
+											r_bound)));
+
+				l_boundvar = list_nth(l_colvars, l_bound_index);
+				normalize_vars = lappend(normalize_vars, l_boundvar);
+				r_boundvar = list_nth(r_colvars, r_bound_index);
+				normalize_vars = lappend(normalize_vars, r_boundvar);
+
+				res_colnames = lappend(res_colnames, lboundcol);
+				res_colvars = lappend(res_colvars,
+									  buildMergedJoinVar(pstate,
+														 j->jointype,
+														 l_boundvar,
+														 r_boundvar));
+			}
+
 			j->quals = transformJoinUsingClause(pstate,
 												l_rte,
 												r_rte,
 												l_usingvars,
-												r_usingvars);
+												r_usingvars,
+												normalize_vars);
 		}
 		else if (j->quals)
 		{
@@ -1457,13 +1550,21 @@ transformFromClauseItem(ParseState *pstate, Node *n,
 		extractRemainingColumns(res_colnames,
 								l_colnames, l_colvars,
 								&l_colnames, &l_colvars);
-		extractRemainingColumns(res_colnames,
-								r_colnames, r_colvars,
-								&r_colnames, &r_colvars);
+
+		/* Temporal normalizers expose only the outer relation's columns */
+		if (!isNormalize)
+			extractRemainingColumns(res_colnames,
+									r_colnames, r_colvars,
+									&r_colnames, &r_colvars);
+
 		res_colnames = list_concat(res_colnames, l_colnames);
 		res_colvars = list_concat(res_colvars, l_colvars);
-		res_colnames = list_concat(res_colnames, r_colnames);
-		res_colvars = list_concat(res_colvars, r_colvars);
+
+		if (!isNormalize)
+		{
+			res_colnames = list_concat(res_colnames, r_colnames);
+			res_colvars = list_concat(res_colvars, r_colvars);
+		}
 
 		/*
 		 * Check alias (AS clause), if any.
@@ -1606,6 +1707,7 @@ buildMergedJoinVar(ParseState *pstate, JoinType jointype,
 	switch (jointype)
 	{
 		case JOIN_INNER:
+		case JOIN_TEMPORAL_NORMALIZE:
 
 			/*
 			 * We can use either var; prefer non-coerced one if available.
diff --git a/src/backend/utils/adt/rangetypes.c b/src/backend/utils/adt/rangetypes.c
index 5a5d0a0b8f..2ca5ba52cc 100644
--- a/src/backend/utils/adt/rangetypes.c
+++ b/src/backend/utils/adt/rangetypes.c
@@ -40,6 +40,7 @@
 #include "utils/lsyscache.h"
 #include "utils/rangetypes.h"
 #include "utils/timestamp.h"
+#include "funcapi.h"
 
 
 #define RANGE_EMPTY_LITERAL "empty"
@@ -466,6 +467,79 @@ range_upper(PG_FUNCTION_ARGS)
 	PG_RETURN_DATUM(upper.val);
 }
 
+/* split lower/upper bound into two rows of data */
+Datum
+range_split(PG_FUNCTION_ARGS)
+{
+	typedef struct
+	{
+		RangeBound	lower;
+		RangeBound	upper;
+		bool		empty;
+	} RangeSplitFuncContext;
+
+	FuncCallContext *funcctx;
+	MemoryContext oldcontext;
+	RangeSplitFuncContext *fctx;
+
+	/* stuff done only on the first call of the function */
+	if (SRF_IS_FIRSTCALL())
+	{
+		RangeType  		*r1;
+		TypeCacheEntry  *typcache;
+
+		/* create a function context for cross-call persistence */
+		funcctx = SRF_FIRSTCALL_INIT();
+
+		/*
+		 * switch to memory context appropriate for multiple function calls
+		 */
+		oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
+
+		r1 = PG_GETARG_RANGE_P(0);
+
+		/* allocate memory for user context */
+		fctx = (RangeSplitFuncContext *) palloc(sizeof(RangeSplitFuncContext));
+
+		/*
+		 * We cannot use range_get_typecache, because it would overwrite
+		 * fcinfo->flinfo->fn_extra
+		 */
+		typcache = lookup_type_cache(RangeTypeGetOid(r1), TYPECACHE_RANGE_INFO);
+		if (typcache->rngelemtype == NULL)
+			elog(ERROR, "type %u is not a range type", RangeTypeGetOid(r1));
+		range_deserialize(typcache, r1, &fctx->lower, &fctx->upper, &fctx->empty);
+
+		funcctx->user_fctx = fctx;
+		MemoryContextSwitchTo(oldcontext);
+	}
+
+	/* stuff done on every call of the function */
+	funcctx = SRF_PERCALL_SETUP();
+	fctx = (RangeSplitFuncContext *) funcctx->user_fctx;
+
+	if (funcctx->call_cntr == 0)
+	{
+		/* Return NULL if there's no finite lower bound */
+		if (fctx->empty || fctx->lower.infinite)
+			SRF_RETURN_NEXT_NULL(funcctx);
+
+		SRF_RETURN_NEXT(funcctx, fctx->lower.val);
+	}
+
+	if (funcctx->call_cntr == 1)
+	{
+		/* Return NULL if there's no finite upper bound */
+		if (fctx->empty || fctx->upper.infinite)
+			SRF_RETURN_NEXT_NULL(funcctx);
+
+		SRF_RETURN_NEXT(funcctx, fctx->upper.val);
+	}
+
+	/* done, after extracting lower and upper bounds */
+	SRF_RETURN_DONE(funcctx);
+}
+
 
 /* range -> bool functions */
 
diff --git a/src/backend/utils/adt/selfuncs.c b/src/backend/utils/adt/selfuncs.c
index f1c78ffb65..2280d0c9a8 100644
--- a/src/backend/utils/adt/selfuncs.c
+++ b/src/backend/utils/adt/selfuncs.c
@@ -2304,6 +2304,7 @@ eqjoinsel(PG_FUNCTION_ARGS)
 		case JOIN_INNER:
 		case JOIN_LEFT:
 		case JOIN_FULL:
+		case JOIN_TEMPORAL_NORMALIZE:
 			selec = eqjoinsel_inner(operator, &vardata1, &vardata2);
 			break;
 		case JOIN_SEMI:
diff --git a/src/include/catalog/pg_operator.dat b/src/include/catalog/pg_operator.dat
index d9b6bad614..d08b87fd18 100644
--- a/src/include/catalog/pg_operator.dat
+++ b/src/include/catalog/pg_operator.dat
@@ -3141,7 +3141,7 @@
   oprrest => 'scalargesel', oprjoin => 'scalargejoinsel' },
 
 # generic range type operators
-{ oid => '3882', descr => 'equal',
+{ oid => '3882', descr => 'equal', oid_symbol => 'OID_RANGE_EQ_OP',
   oprname => '=', oprcanmerge => 't', oprcanhash => 't', oprleft => 'anyrange',
   oprright => 'anyrange', oprresult => 'bool', oprcom => '=(anyrange,anyrange)',
   oprnegate => '<>(anyrange,anyrange)', oprcode => 'range_eq',
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 860571440a..cc88b5dfc0 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -9538,6 +9538,10 @@
 { oid => '3867',
   proname => 'range_union', prorettype => 'anyrange',
   proargtypes => 'anyrange anyrange', prosrc => 'range_union' },
+{ oid => '3996',
+  descr => 'lower and upper bound of range returned as two tuples',
+  proname => 'range_split', prorettype => 'anyelement', proretset => 't',
+  proargtypes => 'anyrange', prosrc => 'range_split' },
 { oid => '4057',
   descr => 'the smallest range which includes both of the given ranges',
   proname => 'range_merge', prorettype => 'anyrange',
diff --git a/src/include/executor/execdebug.h b/src/include/executor/execdebug.h
index 236b2cc4fd..a952b80d7f 100644
--- a/src/include/executor/execdebug.h
+++ b/src/include/executor/execdebug.h
@@ -82,6 +82,7 @@
  *		sort node debugging defines
  * ----------------
  */
+#define EXEC_SORTDEBUG
 #ifdef EXEC_SORTDEBUG
 #define SO_nodeDisplay(l)				nodeDisplay(l)
 #define SO_printf(s)					printf(s)
@@ -96,14 +97,15 @@
  *		merge join debugging defines
  * ----------------
  */
+#define EXEC_MERGEJOINDEBUG
 #ifdef EXEC_MERGEJOINDEBUG
 
 #define MJ_nodeDisplay(l)				nodeDisplay(l)
-#define MJ_printf(s)					printf(s)
-#define MJ1_printf(s, p)				printf(s, p)
-#define MJ2_printf(s, p1, p2)			printf(s, p1, p2)
-#define MJ_debugtup(slot)				debugtup(slot, NULL)
-#define MJ_dump(state)					ExecMergeTupleDump(state)
+#define MJ_printf(s)					printf(s); fflush(stdout)
+#define MJ1_printf(s, p)				printf(s, p); fflush(stdout)
+#define MJ2_printf(s, p1, p2)			printf(s, p1, p2); fflush(stdout)
+#define MJ_debugtup(slot)				debugtup(slot, NULL); fflush(stdout)
+#define MJ_dump(state)					ExecMergeTupleDump(state); fflush(stdout)
 #define MJ_DEBUG_COMPARE(res) \
   MJ1_printf("  MJCompare() returns %d\n", (res))
 #define MJ_DEBUG_QUAL(clause, res) \
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index c830f141b1..ec23870260 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1762,6 +1762,17 @@ typedef struct MergeJoinState
 	TupleTableSlot *mj_NullInnerTupleSlot;
 	ExprContext *mj_OuterEContext;
 	ExprContext *mj_InnerEContext;
+
+	/* needed by temporal normalization */
+	bool				 mj_markSet;
+	bool				 mj_isNormalizer;
+	bool				 sameleft;
+	bool				*nullMask;		/* See heap_modify_tuple */
+	bool				*tsteMask;		/* See heap_modify_tuple */
+	Datum				*newValues;		/* tuple values that get updated */
+	Datum				 sweepline;
+	Form_pg_attribute	 datumFormat;	/* Datum format of sweepline, P1, P2 */
+	TupleTableSlot 		*prev;
 } MergeJoinState;
 
 /* ----------------
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 697d3d7a5f..e784a2f5ed 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -84,6 +84,7 @@ typedef enum NodeTag
 	T_SetOp,
 	T_LockRows,
 	T_Limit,
+	T_TemporalAdjustment,
 	/* these aren't subclasses of Plan: */
 	T_NestLoopParam,
 	T_PlanRowMark,
@@ -140,6 +141,7 @@ typedef enum NodeTag
 	T_SetOpState,
 	T_LockRowsState,
 	T_LimitState,
+	T_TemporalAdjustmentState,
 
 	/*
 	 * TAGS FOR PRIMITIVE NODES (primnodes.h)
@@ -256,6 +258,7 @@ typedef enum NodeTag
 	T_LockRowsPath,
 	T_ModifyTablePath,
 	T_LimitPath,
+	T_TemporalAdjustmentPath,
 	/* these aren't subclasses of Path: */
 	T_EquivalenceClass,
 	T_EquivalenceMember,
@@ -475,6 +478,7 @@ typedef enum NodeTag
 	T_PartitionRangeDatum,
 	T_PartitionCmd,
 	T_VacuumRelation,
+	T_TemporalClause,
 
 	/*
 	 * TAGS FOR REPLICATION GRAMMAR PARSE NODES (replnodes.h)
@@ -706,7 +710,12 @@ typedef enum JoinType
 	 * by the executor (nor, indeed, by most of the planner).
 	 */
 	JOIN_UNIQUE_OUTER,			/* LHS path must be made unique */
-	JOIN_UNIQUE_INNER			/* RHS path must be made unique */
+	JOIN_UNIQUE_INNER,			/* RHS path must be made unique */
+
+	/*
+	 * Temporal adjustment primitives
+	 */
+	JOIN_TEMPORAL_NORMALIZE
 
 	/*
 	 * We might need additional join types someday.
diff --git a/src/include/nodes/primnodes.h b/src/include/nodes/primnodes.h
index 1b4b0d75af..41cc8e395d 100644
--- a/src/include/nodes/primnodes.h
+++ b/src/include/nodes/primnodes.h
@@ -501,6 +501,7 @@ typedef struct OpExpr
 	Oid			inputcollid;	/* OID of collation that operator should use */
 	List	   *args;			/* arguments to the operator (1 or 2) */
 	int			location;		/* token location, or -1 if unknown */
+	bool		isnormalize;
 } OpExpr;
 
 /*
@@ -1461,6 +1462,8 @@ typedef struct JoinExpr
 	Node	   *quals;			/* qualifiers on join, if any */
 	Alias	   *alias;			/* user-written alias clause, if any */
 	int			rtindex;		/* RT index assigned for join, or 0 */
+	List	   *temporalBounds; /* columns that form bounds for both subtrees,
+								 * used by temporal adjustment primitives */
 } JoinExpr;
 
 /*----------
diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h
index adb4265047..de65ecf19a 100644
--- a/src/include/nodes/relation.h
+++ b/src/include/nodes/relation.h
@@ -1945,6 +1945,7 @@ typedef struct RestrictInfo
 
 	/* valid if clause is hashjoinable, else InvalidOid: */
 	Oid			hashjoinoperator;	/* copy of clause operator */
+	bool        temp_normalizer;
 
 	/* cache space for hashclause processing; -1 if not yet set */
 	Selectivity left_bucketsize;	/* avg bucketsize of left side */
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index 23db40147b..872dd0f0f9 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -262,6 +262,7 @@ PG_KEYWORD("new", NEW, UNRESERVED_KEYWORD)
 PG_KEYWORD("next", NEXT, UNRESERVED_KEYWORD)
 PG_KEYWORD("no", NO, UNRESERVED_KEYWORD)
 PG_KEYWORD("none", NONE, COL_NAME_KEYWORD)
+PG_KEYWORD("normalize", NORMALIZE, RESERVED_KEYWORD)
 PG_KEYWORD("not", NOT, RESERVED_KEYWORD)
 PG_KEYWORD("nothing", NOTHING, UNRESERVED_KEYWORD)
 PG_KEYWORD("notify", NOTIFY, UNRESERVED_KEYWORD)
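
For readers who want to see what range_split in the rangetypes.c hunk above emits without building the patch, here is a minimal standalone sketch. It is not PostgreSQL code: ToyRange, ToyRow, toy_range_split_next and toy_range_split are hypothetical names made up for illustration, and the element type is fixed to int. It only models the per-call logic of the set-returning function: the first call yields the lower bound, the second the upper bound, and a NULL row is produced where the range is empty or the bound is infinite.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for a deserialized range; not PostgreSQL's RangeBound. */
typedef struct
{
	int			lower;
	int			upper;
	bool		lower_infinite;
	bool		upper_infinite;
	bool		empty;
} ToyRange;

/* One emitted row: either a value or NULL. */
typedef struct
{
	bool		is_null;
	int			value;
} ToyRow;

/*
 * Models one call of the set-returning function: call_cntr 0 yields the
 * lower bound, call_cntr 1 the upper bound.  Returns false once both rows
 * have been produced (the SRF_RETURN_DONE case in the patch).
 */
bool
toy_range_split_next(const ToyRange *r, int call_cntr, ToyRow *out)
{
	assert(r != NULL && out != NULL);

	if (call_cntr == 0)
	{
		/* NULL if there is no finite lower bound */
		out->is_null = r->empty || r->lower_infinite;
		out->value = out->is_null ? 0 : r->lower;
		return true;
	}

	if (call_cntr == 1)
	{
		/* NULL if there is no finite upper bound */
		out->is_null = r->empty || r->upper_infinite;
		out->value = out->is_null ? 0 : r->upper;
		return true;
	}

	return false;
}

/* Drives the stepper until it is done; returns the number of rows written. */
int
toy_range_split(const ToyRange *r, ToyRow rows[2])
{
	int			n = 0;

	while (n < 2 && toy_range_split_next(r, n, &rows[n]))
		n++;
	return n;
}
```

Under these assumptions, a range modelled as [2,5) produces the two rows 2 and 5, which are the split points that the merge condition r.period_r @> (range_split(s.period_s)) then tests against the outer ranges.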
