Hi hackers,

I'm working on a patch that makes it possible to combine covering and unique functionality in a single btree index.

_Previous discussion was here:_
1) Proposal thread <http://www.postgresql.org/message-id/55f2ccd0.7040...@postgrespro.ru>
2) Message with proposal clarification <http://www.postgresql.org/message-id/55f84df4.5030...@postgrespro.ru>

In a nutshell, the feature allows creating an index with "key" columns and "included" columns. "key" columns can be used as scan keys. The unique constraint applies only to "key" columns.
"included" columns may be used as scan keys if they have a suitable opclass.
Both "key" and "included" columns can be returned from the index by an IndexOnlyScan.

Btree is the default index type and it's used everywhere, so the patch requires thorough testing. Volunteers are welcome :)

_Use case:_
- We have a table (c1, c2, c3, c4);
- We need to have a unique index on (c1, c2).
- We would like to have a covering index on all columns to avoid reading heap pages.

Old way:
CREATE UNIQUE INDEX olduniqueidx ON oldt USING btree (c1, c2);
CREATE INDEX oldcoveringidx ON oldt USING btree (c1, c2, c3, c4);

What's wrong?
The two indexes contain duplicated data, which adds overhead to data manipulation operations and increases database size.

New way:
CREATE UNIQUE INDEX newidx ON newt USING btree (c1, c2) INCLUDING (c3, c4);

The patch is attached.
In 'test.sql' you can find a test with detailed comments on each step, and comparison of old and new indexes.

The new feature has the following syntax:
CREATE UNIQUE INDEX newidx ON newt USING btree (c1, c2) INCLUDING (c3, c4);
The INCLUDING keyword defines the "included" columns of the index. These columns are not subject to the unique constraint. Also, they are not stored in the index's inner pages, which reduces the index size.

_Results:_
1) An additional covering index is no longer required.
2) The new index can use IndexOnlyScan for queries where the old unique index can't.

For example,
explain analyze select c1, c2 from newt where c1<10000 and c3<20;

* More examples can be found in 'test.sql'.

_Future work:_
Make opclasses for "included" columns optional.

CREATE TABLE tbl (c1 int, c4 box);
CREATE UNIQUE INDEX idx ON tbl USING btree (c1) INCLUDING (c4);

If we don't need c4 as an index scankey, we don't need any btree opclass on it.
But we still want to have it in the covering index for queries like:

SELECT c4 FROM tbl WHERE c1=1000;
SELECT * FROM tbl WHERE c1=1000;

--
Anastasia Lubennikova
Postgres Professional:http://www.postgrespro.com
The Russian Postgres Company



Attachment: test.sql
Description: application/sql

diff --git a/src/backend/access/common/indextuple.c b/src/backend/access/common/indextuple.c
index dc588d7..83f24c3 100644
--- a/src/backend/access/common/indextuple.c
+++ b/src/backend/access/common/indextuple.c
@@ -19,6 +19,7 @@
 #include "access/heapam.h"
 #include "access/itup.h"
 #include "access/tuptoaster.h"
+#include "utils/rel.h"
 
 
 /* ----------------------------------------------------------------
@@ -441,3 +442,45 @@ CopyIndexTuple(IndexTuple source)
 	memcpy(result, source, size);
 	return result;
 }
+
+/*
+ * index_reform_tuple
+ *		Build a copy of an index tuple that keeps only its key attributes.
+ *
+ * The first nkeyatts attributes of olditup are preserved; the remaining
+ * nonkey (INCLUDED) attributes are truncated.  natts is the total number of
+ * attributes in the index and is used only for sanity checks.  The new tuple
+ * is built with index_form_tuple() and inherits olditup's t_tid; olditup
+ * itself is left untouched.
+ *
+ * The relation's own tuple descriptor is never modified.  Shrinking natts in
+ * place would leave the shared descriptor truncated if index_form_tuple()
+ * raised an error before the original value could be restored.
+ */
+IndexTuple
+index_reform_tuple(Relation idxrel, IndexTuple olditup, int natts, int nkeyatts)
+{
+	TupleDesc	itupdesc = RelationGetDescr(idxrel);
+	TupleDesc	keydesc;
+	Datum		values[INDEX_MAX_KEYS];
+	bool		isnull[INDEX_MAX_KEYS];
+	IndexTuple	newitup;
+
+	Assert(natts <= INDEX_MAX_KEYS);
+	Assert(nkeyatts > 0);
+	Assert(nkeyatts <= natts);
+
+	index_deform_tuple(olditup, itupdesc, values, isnull);
+
+	/* describe only the key attributes, using a private descriptor copy */
+	keydesc = CreateTupleDescCopy(itupdesc);
+	keydesc->natts = nkeyatts;
+
+	/* form new tuple that will contain only key attributes */
+	newitup = index_form_tuple(keydesc, values, isnull);
+	newitup->t_tid = olditup->t_tid;
+
+	FreeTupleDesc(keydesc);
+
+	return newitup;
+}
diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c
index 77c2fdf..d14df12 100644
--- a/src/backend/access/nbtree/nbtinsert.c
+++ b/src/backend/access/nbtree/nbtinsert.c
@@ -108,18 +108,23 @@ _bt_doinsert(Relation rel, IndexTuple itup,
 			 IndexUniqueCheck checkUnique, Relation heapRel)
 {
 	bool		is_unique = false;
-	int			natts = rel->rd_rel->relnatts;
+	int			nkeyatts = rel->rd_rel->relnatts;
 	ScanKey		itup_scankey;
 	BTStack		stack;
 	Buffer		buf;
 	OffsetNumber offset;
 
+	Assert (rel->rd_index != NULL);
+	Assert(rel->rd_index->indnatts != 0);
+	Assert(rel->rd_index->indnkeyatts != 0);
+	nkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
+
 	/* we need an insertion scan key to do our search, so build one */
 	itup_scankey = _bt_mkscankey(rel, itup);
 
 top:
 	/* find the first page containing this key */
-	stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE);
+	stack = _bt_search(rel, nkeyatts, itup_scankey, false, &buf, BT_WRITE);
 
 	offset = InvalidOffsetNumber;
 
@@ -134,7 +139,7 @@ top:
 	 * move right in the tree.  See Lehman and Yao for an excruciatingly
 	 * precise description.
 	 */
-	buf = _bt_moveright(rel, buf, natts, itup_scankey, false,
+	buf = _bt_moveright(rel, buf, nkeyatts, itup_scankey, false,
 						true, stack, BT_WRITE);
 
 	/*
@@ -163,7 +168,7 @@ top:
 		TransactionId xwait;
 		uint32		speculativeToken;
 
-		offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);
+		offset = _bt_binsrch(rel, buf, nkeyatts, itup_scankey, false);
 		xwait = _bt_check_unique(rel, itup, heapRel, buf, offset, itup_scankey,
 								 checkUnique, &is_unique, &speculativeToken);
 
@@ -199,7 +204,7 @@ top:
 		 */
 		CheckForSerializableConflictIn(rel, NULL, buf);
 		/* do the insertion */
-		_bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup,
+		_bt_findinsertloc(rel, &buf, &offset, nkeyatts, itup_scankey, itup,
 						  stack, heapRel);
 		_bt_insertonpg(rel, buf, InvalidBuffer, stack, itup, offset, false);
 	}
@@ -242,7 +247,12 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
 				 uint32 *speculativeToken)
 {
 	TupleDesc	itupdesc = RelationGetDescr(rel);
-	int			natts = rel->rd_rel->relnatts;
+	int			nkeyatts = rel->rd_index->indnkeyatts;
+
+	Assert (rel->rd_index != NULL);
+	Assert(rel->rd_index->indnatts != 0);
+	Assert(rel->rd_index->indnkeyatts != 0);
+
 	SnapshotData SnapshotDirty;
 	OffsetNumber maxoff;
 	Page		page;
@@ -301,7 +311,7 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
 				 * in real comparison, but only for ordering/finding items on
 				 * pages. - vadim 03/24/97
 				 */
-				if (!_bt_isequal(itupdesc, page, offset, natts, itup_scankey))
+				if (!_bt_isequal(itupdesc, page, offset, nkeyatts, itup_scankey))
 					break;		/* we're past all the equal tuples */
 
 				/* okay, we gotta fetch the heap tuple ... */
@@ -457,7 +467,7 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
 			if (P_RIGHTMOST(opaque))
 				break;
 			if (!_bt_isequal(itupdesc, page, P_HIKEY,
-							 natts, itup_scankey))
+							 nkeyatts, itup_scankey))
 				break;
 			/* Advance to next non-dead page --- there must be one */
 			for (;;)
@@ -745,6 +755,11 @@ _bt_insertonpg(Relation rel,
 		elog(ERROR, "cannot insert to incompletely split page %u",
 			 BufferGetBlockNumber(buf));
 
+	/* Truncate nonkey attributes when inserting on nonleaf pages. */
+	if (rel->rd_index->indnatts != rel->rd_index->indnkeyatts)
+		if (!P_ISLEAF(lpageop))
+			itup = index_reform_tuple(rel, itup, rel->rd_index->indnatts, rel->rd_index->indnkeyatts);
+
 	itemsz = IndexTupleDSize(*itup);
 	itemsz = MAXALIGN(itemsz);	/* be safe, PageAddItem will do this but we
 								 * need to be consistent */
@@ -1962,6 +1977,7 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
 	right_item_sz = ItemIdGetLength(itemid);
 	item = (IndexTuple) PageGetItem(lpage, itemid);
 	right_item = CopyIndexTuple(item);
+	right_item = index_reform_tuple(rel, right_item, rel->rd_index->indnatts, rel->rd_index->indnkeyatts);
 	ItemPointerSet(&(right_item->t_tid), rbkno, P_HIKEY);
 
 	/* NO EREPORT(ERROR) from here till newroot op is logged */
diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c
index 6e65db9..73560b7 100644
--- a/src/backend/access/nbtree/nbtpage.c
+++ b/src/backend/access/nbtree/nbtpage.c
@@ -1254,7 +1254,7 @@ _bt_pagedel(Relation rel, Buffer buf)
 				/* we need an insertion scan key for the search, so build one */
 				itup_scankey = _bt_mkscankey(rel, targetkey);
 				/* find the leftmost leaf page containing this key */
-				stack = _bt_search(rel, rel->rd_rel->relnatts, itup_scankey,
+				stack = _bt_search(rel, IndexRelationGetNumberOfKeyAttributes(rel), itup_scankey,
 								   false, &lbuf, BT_READ);
 				/* don't need a pin on the page */
 				_bt_relbuf(rel, lbuf);
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index f95f67a..c3501e7 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -593,6 +593,19 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup)
 		state->btps_minkey = CopyIndexTuple(itup);
 	}
 
+	/* Truncate nonkey attributes when inserting on nonleaf pages */
+	if (wstate->index->rd_index->indnatts != wstate->index->rd_index->indnkeyatts)
+	{
+		BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(npage);
+
+		if (!P_ISLEAF(pageop))
+		{
+			itup = index_reform_tuple(wstate->index, itup, wstate->index->rd_index->indnatts, wstate->index->rd_index->indnkeyatts);
+			itupsz = IndexTupleDSize(*itup);
+			itupsz = MAXALIGN(itupsz);
+		}
+	}
+
 	/*
 	 * Add the new item into the current page.
 	 */
diff --git a/src/backend/access/nbtree/nbtutils.c b/src/backend/access/nbtree/nbtutils.c
index 91331ba..6932289 100644
--- a/src/backend/access/nbtree/nbtutils.c
+++ b/src/backend/access/nbtree/nbtutils.c
@@ -63,17 +63,24 @@ _bt_mkscankey(Relation rel, IndexTuple itup)
 {
 	ScanKey		skey;
 	TupleDesc	itupdesc;
-	int			natts;
-	int16	   *indoption;
+	int			nkeyatts = rel->rd_rel->relnatts;
+	int16	   *indoption = rel->rd_indoption;
 	int			i;
-
 	itupdesc = RelationGetDescr(rel);
-	natts = RelationGetNumberOfAttributes(rel);
-	indoption = rel->rd_indoption;
 
-	skey = (ScanKey) palloc(natts * sizeof(ScanKeyData));
+	Assert(rel->rd_index != NULL);
+	Assert(rel->rd_index->indnkeyatts != 0);
+	Assert(rel->rd_index->indnkeyatts <= rel->rd_index->indnatts);
 
-	for (i = 0; i < natts; i++)
+	nkeyatts = rel->rd_index->indnkeyatts;
+
+	/*
+	 * We'll execute search using ScanKey constructed on key columns.
+	 * Non key (included) columns must be omitted.
+	 */
+	skey = (ScanKey) palloc(nkeyatts * sizeof(ScanKeyData));
+
+	for (i = 0; i < nkeyatts; i++)
 	{
 		FmgrInfo   *procinfo;
 		Datum		arg;
diff --git a/src/backend/bootstrap/bootparse.y b/src/backend/bootstrap/bootparse.y
index d8d1b06..002bcd5 100644
--- a/src/backend/bootstrap/bootparse.y
+++ b/src/backend/bootstrap/bootparse.y
@@ -293,6 +293,7 @@ Boot_DeclareIndexStmt:
 					stmt->accessMethod = $8;
 					stmt->tableSpace = NULL;
 					stmt->indexParams = $10;
+					stmt->indexIncludingParams = NIL;
 					stmt->options = NIL;
 					stmt->whereClause = NULL;
 					stmt->excludeOpNames = NIL;
@@ -336,6 +337,7 @@ Boot_DeclareUniqueIndexStmt:
 					stmt->accessMethod = $9;
 					stmt->tableSpace = NULL;
 					stmt->indexParams = $11;
+					stmt->indexIncludingParams = NIL;
 					stmt->options = NIL;
 					stmt->whereClause = NULL;
 					stmt->excludeOpNames = NIL;
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index e59b163..27a12a0 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -211,7 +211,7 @@ index_check_primary_key(Relation heapRel,
 	 * null, otherwise attempt to ALTER TABLE .. SET NOT NULL
 	 */
 	cmds = NIL;
-	for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
+	for (i = 0; i < indexInfo->ii_NumIndexKeyAttrs; i++)
 	{
 		AttrNumber	attnum = indexInfo->ii_KeyAttrNumbers[i];
 		HeapTuple	atttuple;
@@ -606,6 +606,7 @@ UpdateIndexRelation(Oid indexoid,
 	values[Anum_pg_index_indexrelid - 1] = ObjectIdGetDatum(indexoid);
 	values[Anum_pg_index_indrelid - 1] = ObjectIdGetDatum(heapoid);
 	values[Anum_pg_index_indnatts - 1] = Int16GetDatum(indexInfo->ii_NumIndexAttrs);
+	values[Anum_pg_index_indnkeyatts - 1] = Int16GetDatum(indexInfo->ii_NumIndexKeyAttrs);
 	values[Anum_pg_index_indisunique - 1] = BoolGetDatum(indexInfo->ii_Unique);
 	values[Anum_pg_index_indisprimary - 1] = BoolGetDatum(primary);
 	values[Anum_pg_index_indisexclusion - 1] = BoolGetDatum(isexclusion);
@@ -1069,6 +1070,8 @@ index_create(Relation heapRelation,
 	else
 		Assert(indexRelation->rd_indexcxt != NULL);
 
+	indexRelation->rd_index->indnkeyatts = indexInfo->ii_NumIndexKeyAttrs;
+
 	/*
 	 * If this is bootstrap (initdb) time, then we don't actually fill in the
 	 * index yet.  We'll be creating more indexes and classes later, so we
@@ -1189,7 +1192,7 @@ index_constraint_create(Relation heapRelation,
 								   true,
 								   RelationGetRelid(heapRelation),
 								   indexInfo->ii_KeyAttrNumbers,
-								   indexInfo->ii_NumIndexAttrs,
+								   indexInfo->ii_NumIndexKeyAttrs,
 								   InvalidOid,	/* no domain */
 								   indexRelationId,		/* index OID */
 								   InvalidOid,	/* no foreign key */
@@ -1637,6 +1640,10 @@ BuildIndexInfo(Relation index)
 		elog(ERROR, "invalid indnatts %d for index %u",
 			 numKeys, RelationGetRelid(index));
 	ii->ii_NumIndexAttrs = numKeys;
+	ii->ii_NumIndexKeyAttrs = indexStruct->indnkeyatts;
+	Assert(ii->ii_NumIndexKeyAttrs != 0);
+	Assert(ii->ii_NumIndexKeyAttrs <= ii->ii_NumIndexAttrs);
+
 	for (i = 0; i < numKeys; i++)
 		ii->ii_KeyAttrNumbers[i] = indexStruct->indkey.values[i];
 
diff --git a/src/backend/catalog/indexing.c b/src/backend/catalog/indexing.c
index 0231084..89e20fa 100644
--- a/src/backend/catalog/indexing.c
+++ b/src/backend/catalog/indexing.c
@@ -119,6 +119,7 @@ CatalogIndexInsert(CatalogIndexState indstate, HeapTuple heapTuple)
 		Assert(indexInfo->ii_Predicate == NIL);
 		Assert(indexInfo->ii_ExclusionOps == NULL);
 		Assert(relationDescs[i]->rd_index->indimmediate);
+		Assert(indexInfo->ii_NumIndexKeyAttrs != 0);
 
 		/*
 		 * FormIndexDatum fills in its values and isnull parameters with the
diff --git a/src/backend/catalog/toasting.c b/src/backend/catalog/toasting.c
index 3652d7b..fc47a37 100644
--- a/src/backend/catalog/toasting.c
+++ b/src/backend/catalog/toasting.c
@@ -313,6 +313,7 @@ create_toast_table(Relation rel, Oid toastOid, Oid toastIndexOid,
 
 	indexInfo = makeNode(IndexInfo);
 	indexInfo->ii_NumIndexAttrs = 2;
+	indexInfo->ii_NumIndexKeyAttrs = 2;
 	indexInfo->ii_KeyAttrNumbers[0] = 1;
 	indexInfo->ii_KeyAttrNumbers[1] = 2;
 	indexInfo->ii_Expressions = NIL;
diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c
index b450bcf..8b25aea 100644
--- a/src/backend/commands/indexcmds.c
+++ b/src/backend/commands/indexcmds.c
@@ -137,7 +137,6 @@ CheckIndexCompatible(Oid oldId,
 	Relation	irel;
 	int			i;
 	Datum		d;
-
 	/* Caller should already have the relation locked in some way. */
 	relationId = IndexGetRelation(oldId, false);
 
@@ -208,7 +207,7 @@ CheckIndexCompatible(Oid oldId,
 	}
 
 	/* Any change in operator class or collation breaks compatibility. */
-	old_natts = indexForm->indnatts;
+	old_natts = indexForm->indnkeyatts;
 	Assert(old_natts == numberOfAttributes);
 
 	d = SysCacheGetAttr(INDEXRELID, tuple, Anum_pg_index_indcollation, &isnull);
@@ -320,7 +319,8 @@ DefineIndex(Oid relationId,
 	Datum		reloptions;
 	int16	   *coloptions;
 	IndexInfo  *indexInfo;
-	int			numberOfAttributes;
+	int			numberOfAttributes,
+				numberOfKeyAttributes;
 	TransactionId limitXmin;
 	VirtualTransactionId *old_snapshots;
 	ObjectAddress address;
@@ -331,10 +331,27 @@ DefineIndex(Oid relationId,
 	Snapshot	snapshot;
 	int			i;
 
+	if(list_intersection(stmt->indexParams, stmt->indexIncludingParams) != NIL)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+				 errmsg("included columns must not intersects with key columns")));
+
 	/*
 	 * count attributes in index
 	 */
+	numberOfKeyAttributes = list_length(stmt->indexParams);
+	if (numberOfKeyAttributes <= 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+				 errmsg("must specify at least one key column")));
+
+	/*
+	 * numberOfKeyAttributes records where the key columns end and the included
+	 * columns begin, so we can concatenate all index params into one list.
+	 */
+	stmt->indexParams = list_concat(stmt->indexParams, stmt->indexIncludingParams);
 	numberOfAttributes = list_length(stmt->indexParams);
+
 	if (numberOfAttributes <= 0)
 		ereport(ERROR,
 				(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
@@ -448,6 +465,12 @@ DefineIndex(Oid relationId,
 	/*
 	 * Choose the index column names.
 	 */
+	/*
+	 * TODO: Alter the ChooseIndexName function to
+	 * add info about included cols to the index name.
+	 * For now, the name gives no way to distinguish an index with
+	 * included cols from an index without them.
+	 */
 	indexColNames = ChooseIndexColumnNames(stmt->indexParams);
 
 	/*
@@ -466,6 +489,7 @@ DefineIndex(Oid relationId,
 	 * look up the access method, verify it can handle the requested features
 	 */
 	accessMethodName = stmt->accessMethod;
+
 	tuple = SearchSysCache1(AMNAME, PointerGetDatum(accessMethodName));
 	if (!HeapTupleIsValid(tuple))
 	{
@@ -511,6 +535,12 @@ DefineIndex(Oid relationId,
 		errmsg("access method \"%s\" does not support exclusion constraints",
 			   accessMethodName)));
 
+	if (list_length(stmt->indexIncludingParams) > 0 && !accessMethodForm->amcanincluding)
+		ereport(ERROR,
+		(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+		errmsg("access method \"%s\" does not support included columns",
+				accessMethodName)));
+
 	amcanorder = accessMethodForm->amcanorder;
 	amoptions = accessMethodForm->amoptions;
 
@@ -536,6 +566,7 @@ DefineIndex(Oid relationId,
 	 */
 	indexInfo = makeNode(IndexInfo);
 	indexInfo->ii_NumIndexAttrs = numberOfAttributes;
+	indexInfo->ii_NumIndexKeyAttrs = numberOfKeyAttributes;
 	indexInfo->ii_Expressions = NIL;	/* for now */
 	indexInfo->ii_ExpressionsState = NIL;
 	indexInfo->ii_Predicate = make_ands_implicit((Expr *) stmt->whereClause);
diff --git a/src/backend/executor/execIndexing.c b/src/backend/executor/execIndexing.c
index f42bd8f..fd55eb4 100644
--- a/src/backend/executor/execIndexing.c
+++ b/src/backend/executor/execIndexing.c
@@ -646,7 +646,7 @@ check_exclusion_or_unique_constraint(Relation heap, Relation index,
 	Oid		   *constr_procs;
 	uint16	   *constr_strats;
 	Oid		   *index_collations = index->rd_indcollation;
-	int			index_natts = index->rd_index->indnatts;
+	int			index_nkeyatts = index->rd_index->indnkeyatts;
 	IndexScanDesc index_scan;
 	HeapTuple	tup;
 	ScanKeyData scankeys[INDEX_MAX_KEYS];
@@ -673,7 +673,7 @@ check_exclusion_or_unique_constraint(Relation heap, Relation index,
 	 * If any of the input values are NULL, the constraint check is assumed to
 	 * pass (i.e., we assume the operators are strict).
 	 */
-	for (i = 0; i < index_natts; i++)
+	for (i = 0; i < index_nkeyatts; i++)
 	{
 		if (isnull[i])
 			return true;
@@ -685,7 +685,7 @@ check_exclusion_or_unique_constraint(Relation heap, Relation index,
 	 */
 	InitDirtySnapshot(DirtySnapshot);
 
-	for (i = 0; i < index_natts; i++)
+	for (i = 0; i < index_nkeyatts; i++)
 	{
 		ScanKeyEntryInitialize(&scankeys[i],
 							   0,
@@ -717,8 +717,8 @@ check_exclusion_or_unique_constraint(Relation heap, Relation index,
 retry:
 	conflict = false;
 	found_self = false;
-	index_scan = index_beginscan(heap, index, &DirtySnapshot, index_natts, 0);
-	index_rescan(index_scan, scankeys, index_natts, NULL, 0);
+	index_scan = index_beginscan(heap, index, &DirtySnapshot, index_nkeyatts, 0);
+	index_rescan(index_scan, scankeys, index_nkeyatts, NULL, 0);
 
 	while ((tup = index_getnext(index_scan,
 								ForwardScanDirection)) != NULL)
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index bd2e80e..8febcab 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -3096,6 +3096,7 @@ _copyIndexStmt(const IndexStmt *from)
 	COPY_STRING_FIELD(accessMethod);
 	COPY_STRING_FIELD(tableSpace);
 	COPY_NODE_FIELD(indexParams);
+	COPY_NODE_FIELD(indexIncludingParams);
 	COPY_NODE_FIELD(options);
 	COPY_NODE_FIELD(whereClause);
 	COPY_NODE_FIELD(excludeOpNames);
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 19412fe..20773ab 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -1238,6 +1238,7 @@ _equalIndexStmt(const IndexStmt *a, const IndexStmt *b)
 	COMPARE_STRING_FIELD(accessMethod);
 	COMPARE_STRING_FIELD(tableSpace);
 	COMPARE_NODE_FIELD(indexParams);
+	COMPARE_NODE_FIELD(indexIncludingParams);
 	COMPARE_NODE_FIELD(options);
 	COMPARE_NODE_FIELD(whereClause);
 	COMPARE_NODE_FIELD(excludeOpNames);
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index a878498..97de06a 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -2144,6 +2144,7 @@ _outIndexStmt(StringInfo str, const IndexStmt *node)
 	WRITE_STRING_FIELD(accessMethod);
 	WRITE_STRING_FIELD(tableSpace);
 	WRITE_NODE_FIELD(indexParams);
+	WRITE_NODE_FIELD(indexIncludingParams);
 	WRITE_NODE_FIELD(options);
 	WRITE_NODE_FIELD(whereClause);
 	WRITE_NODE_FIELD(excludeOpNames);
diff --git a/src/backend/optimizer/util/plancat.c b/src/backend/optimizer/util/plancat.c
index 9442e5f..49c3330 100644
--- a/src/backend/optimizer/util/plancat.c
+++ b/src/backend/optimizer/util/plancat.c
@@ -164,7 +164,7 @@ get_relation_info(PlannerInfo *root, Oid relationObjectId, bool inhparent,
 			Relation	indexRelation;
 			Form_pg_index index;
 			IndexOptInfo *info;
-			int			ncolumns;
+			int			ncolumns, nkeycolumns;
 			int			i;
 
 			/*
@@ -207,12 +207,31 @@ get_relation_info(PlannerInfo *root, Oid relationObjectId, bool inhparent,
 				RelationGetForm(indexRelation)->reltablespace;
 			info->rel = rel;
 			info->ncolumns = ncolumns = index->indnatts;
+			info->nkeycolumns = nkeycolumns = index->indnkeyatts;
+
+			/* TODO
+			 * All these arrays still have length = ncolumns.
+			 * Fix this when optional opclass functionality is added.
+			 */
 			info->indexkeys = (int *) palloc(sizeof(int) * ncolumns);
 			info->indexcollations = (Oid *) palloc(sizeof(Oid) * ncolumns);
 			info->opfamily = (Oid *) palloc(sizeof(Oid) * ncolumns);
 			info->opcintype = (Oid *) palloc(sizeof(Oid) * ncolumns);
 			info->canreturn = (bool *) palloc(sizeof(bool) * ncolumns);
 
+			/* TODO
+			 * Any column could be returned,
+			 * even if it doesn't have an opclass for that type of index.
+			 *
+			 * For example,
+			 * we have an index "create index on tbl(c1) including (c2)".
+			 * If there's no suitable opclass on c2,
+			 * query "select c2 from tbl where c1 < 10" will use index-only scan
+			 * and query "select c2 from tbl where c2 < 10" will not.
+			 *
+			 * But for now every column must have an opclass.
+			 * Add recheck and so on later.
+			 */
 			for (i = 0; i < ncolumns; i++)
 			{
 				info->indexkeys[i] = index->indkey.values[i];
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 1efc6d6..ff82428 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -354,6 +354,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 				oper_argtypes RuleActionList RuleActionMulti
 				opt_column_list columnList opt_name_list
 				sort_clause opt_sort_clause sortby_list index_params
+				optincluding opt_including index_including_params
 				name_list role_list from_clause from_list opt_array_bounds
 				qualified_name_list any_name any_name_list type_name_list
 				any_operator expr_list attrs
@@ -6591,7 +6592,7 @@ defacl_privilege_target:
 
 IndexStmt:	CREATE opt_unique INDEX opt_concurrently opt_index_name
 			ON qualified_name access_method_clause '(' index_params ')'
-			opt_reloptions OptTableSpace where_clause
+			opt_including opt_reloptions OptTableSpace where_clause
 				{
 					IndexStmt *n = makeNode(IndexStmt);
 					n->unique = $2;
@@ -6600,9 +6601,10 @@ IndexStmt:	CREATE opt_unique INDEX opt_concurrently opt_index_name
 					n->relation = $7;
 					n->accessMethod = $8;
 					n->indexParams = $10;
-					n->options = $12;
-					n->tableSpace = $13;
-					n->whereClause = $14;
+					n->indexIncludingParams = $12;
+					n->options = $13;
+					n->tableSpace = $14;
+					n->whereClause = $15;
 					n->excludeOpNames = NIL;
 					n->idxcomment = NULL;
 					n->indexOid = InvalidOid;
@@ -6707,6 +6709,16 @@ index_elem:	ColId opt_collate opt_class opt_asc_desc opt_nulls_order
 				}
 		;
 
+optincluding : '(' index_including_params ')'		{ $$ = $2; }
+		;
+opt_including:		INCLUDING optincluding			{ $$ = $2; }
+			 |		/* EMPTY */						{ $$ = NIL; }
+		;
+
+index_including_params:	index_elem						{ $$ = list_make1($1); }
+			| index_including_params ',' index_elem		{ $$ = lappend($1, $3); }
+		;
+
 opt_collate: COLLATE any_name						{ $$ = $2; }
 			| /*EMPTY*/								{ $$ = NIL; }
 		;
diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c
index 51391f6..a32bb5c 100644
--- a/src/backend/utils/adt/ruleutils.c
+++ b/src/backend/utils/adt/ruleutils.c
@@ -1115,7 +1115,7 @@ pg_get_indexdef_worker(Oid indexrelid, int colno,
 	 * Report the indexed attributes
 	 */
 	sep = "";
-	for (keyno = 0; keyno < idxrec->indnatts; keyno++)
+	for (keyno = 0; keyno < idxrec->indnkeyatts; keyno++)
 	{
 		AttrNumber	attnum = idxrec->indkey.values[keyno];
 		int16		opt = indoption->values[keyno];
@@ -1263,6 +1263,38 @@ pg_get_indexdef_worker(Oid indexrelid, int colno,
 		}
 	}
 
+	/*
+	 * Report the INCLUDED attributes, if any.
+	 */
+	if (idxrec->indnkeyatts < idxrec->indnatts) {
+		if (!attrsOnly) {
+			appendStringInfoString(&buf, " INCLUDING(");
+			sep = "";
+		}
+		else
+			sep = ", ";
+		for (keyno = idxrec->indnkeyatts; keyno < idxrec->indnatts; keyno++)
+		{
+			AttrNumber	attnum = idxrec->indkey.values[keyno];
+			char	   *attname;
+
+			if (!colno)
+				appendStringInfoString(&buf, sep);
+			sep = ", ";
+
+			attname = get_relid_attribute_name(indrelid, attnum);
+			if (!colno || colno == keyno + 1)
+			{
+				appendStringInfoString(&buf, quote_identifier(attname));
+				if (attrsOnly)
+					appendStringInfoString(&buf, " (included)");
+			}
+
+		}
+		if (!attrsOnly)
+			appendStringInfoString(&buf, ")");
+	}
+
 	/* Clean up */
 	ReleaseSysCache(ht_idx);
 	ReleaseSysCache(ht_idxrel);
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index 420ef3d..513fa57 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -1940,6 +1940,7 @@ RelationReloadIndexInfo(Relation relation)
 		 * it's not worth it to track exactly which ones they are.  None of
 		 * the array fields are allowed to change, though.
 		 */
+		relation->rd_index->indnkeyatts = index->indnkeyatts;
 		relation->rd_index->indisunique = index->indisunique;
 		relation->rd_index->indisprimary = index->indisprimary;
 		relation->rd_index->indisexclusion = index->indisexclusion;
diff --git a/src/include/access/itup.h b/src/include/access/itup.h
index c997545..00fdaee 100644
--- a/src/include/access/itup.h
+++ b/src/include/access/itup.h
@@ -18,6 +18,7 @@
 #include "access/tupmacs.h"
 #include "storage/bufpage.h"
 #include "storage/itemptr.h"
+#include "utils/rel.h"
 
 /*
  * Index tuple header structure
@@ -147,5 +148,7 @@ extern Datum nocache_index_getattr(IndexTuple tup, int attnum,
 extern void index_deform_tuple(IndexTuple tup, TupleDesc tupleDescriptor,
 				   Datum *values, bool *isnull);
 extern IndexTuple CopyIndexTuple(IndexTuple source);
+extern IndexTuple index_reform_tuple(Relation idxrel, IndexTuple tup,
+									 int natts, int nkeyatts);
 
 #endif   /* ITUP_H */
diff --git a/src/include/catalog/pg_am.h b/src/include/catalog/pg_am.h
index 8a28b8e..f9f0841 100644
--- a/src/include/catalog/pg_am.h
+++ b/src/include/catalog/pg_am.h
@@ -51,6 +51,7 @@ CATALOG(pg_am,2601)
 	bool		amstorage;		/* can storage type differ from column type? */
 	bool		amclusterable;	/* does AM support cluster command? */
 	bool		ampredlocks;	/* does AM handle predicate locks? */
+	bool		amcanincluding;	/* does AM support INCLUDING columns? */
 	Oid			amkeytype;		/* type of data in index, or InvalidOid */
 	regproc		aminsert;		/* "insert this tuple" function */
 	regproc		ambeginscan;	/* "prepare for index scan" function */
@@ -80,7 +81,7 @@ typedef FormData_pg_am *Form_pg_am;
  *		compiler constants for pg_am
  * ----------------
  */
-#define Natts_pg_am						30
+#define Natts_pg_am						31
 #define Anum_pg_am_amname				1
 #define Anum_pg_am_amstrategies			2
 #define Anum_pg_am_amsupport			3
@@ -95,44 +96,45 @@ typedef FormData_pg_am *Form_pg_am;
 #define Anum_pg_am_amstorage			12
 #define Anum_pg_am_amclusterable		13
 #define Anum_pg_am_ampredlocks			14
-#define Anum_pg_am_amkeytype			15
-#define Anum_pg_am_aminsert				16
-#define Anum_pg_am_ambeginscan			17
-#define Anum_pg_am_amgettuple			18
-#define Anum_pg_am_amgetbitmap			19
-#define Anum_pg_am_amrescan				20
-#define Anum_pg_am_amendscan			21
-#define Anum_pg_am_ammarkpos			22
-#define Anum_pg_am_amrestrpos			23
-#define Anum_pg_am_ambuild				24
-#define Anum_pg_am_ambuildempty			25
-#define Anum_pg_am_ambulkdelete			26
-#define Anum_pg_am_amvacuumcleanup		27
-#define Anum_pg_am_amcanreturn			28
-#define Anum_pg_am_amcostestimate		29
-#define Anum_pg_am_amoptions			30
+#define Anum_pg_am_amcanincluding		15
+#define Anum_pg_am_amkeytype			16
+#define Anum_pg_am_aminsert				17
+#define Anum_pg_am_ambeginscan			18
+#define Anum_pg_am_amgettuple			19
+#define Anum_pg_am_amgetbitmap			20
+#define Anum_pg_am_amrescan				21
+#define Anum_pg_am_amendscan			22
+#define Anum_pg_am_ammarkpos			23
+#define Anum_pg_am_amrestrpos			24
+#define Anum_pg_am_ambuild				25
+#define Anum_pg_am_ambuildempty			26
+#define Anum_pg_am_ambulkdelete			27
+#define Anum_pg_am_amvacuumcleanup		28
+#define Anum_pg_am_amcanreturn			29
+#define Anum_pg_am_amcostestimate		30
+#define Anum_pg_am_amoptions			31
 
 /* ----------------
  *		initial contents of pg_am
  * ----------------
  */
 
-DATA(insert OID = 403 (  btree		5 2 t f t t t t t t f t t 0 btinsert btbeginscan btgettuple btgetbitmap btrescan btendscan btmarkpos btrestrpos btbuild btbuildempty btbulkdelete btvacuumcleanup btcanreturn btcostestimate btoptions ));
+DATA(insert OID = 403 (  btree		5 2 t f t t t t t t f t t t 0 btinsert btbeginscan btgettuple btgetbitmap btrescan btendscan btmarkpos btrestrpos btbuild btbuildempty btbulkdelete btvacuumcleanup btcanreturn btcostestimate btoptions ));
 DESCR("b-tree index access method");
 #define BTREE_AM_OID 403
-DATA(insert OID = 405 (  hash		1 1 f f t f f f f f f f f 23 hashinsert hashbeginscan hashgettuple hashgetbitmap hashrescan hashendscan hashmarkpos hashrestrpos hashbuild hashbuildempty hashbulkdelete hashvacuumcleanup - hashcostestimate hashoptions ));
+DATA(insert OID = 405 (  hash		1 1 f f t f f f f f f f f f 23 hashinsert hashbeginscan hashgettuple hashgetbitmap hashrescan hashendscan hashmarkpos hashrestrpos hashbuild hashbuildempty hashbulkdelete hashvacuumcleanup - hashcostestimate hashoptions ));
 DESCR("hash index access method");
 #define HASH_AM_OID 405
-DATA(insert OID = 783 (  gist		0 9 f t f f t t f t t t f 0 gistinsert gistbeginscan gistgettuple gistgetbitmap gistrescan gistendscan gistmarkpos gistrestrpos gistbuild gistbuildempty gistbulkdelete gistvacuumcleanup gistcanreturn gistcostestimate gistoptions ));
+DATA(insert OID = 783 (  gist		0 9 f t f f t t f t t t f f 0 gistinsert gistbeginscan gistgettuple gistgetbitmap gistrescan gistendscan gistmarkpos gistrestrpos gistbuild gistbuildempty gistbulkdelete gistvacuumcleanup gistcanreturn gistcostestimate gistoptions ));
 DESCR("GiST index access method");
 #define GIST_AM_OID 783
-DATA(insert OID = 2742 (  gin		0 6 f f f f t t f f t f f 0 gininsert ginbeginscan - gingetbitmap ginrescan ginendscan ginmarkpos ginrestrpos ginbuild ginbuildempty ginbulkdelete ginvacuumcleanup - gincostestimate ginoptions ));
+DATA(insert OID = 2742 (  gin		0 6 f f f f t t f f t f f f 0 gininsert ginbeginscan - gingetbitmap ginrescan ginendscan ginmarkpos ginrestrpos ginbuild ginbuildempty ginbulkdelete ginvacuumcleanup - gincostestimate ginoptions ));
 DESCR("GIN index access method");
 #define GIN_AM_OID 2742
-DATA(insert OID = 4000 (  spgist	0 5 f f f f f t f t f f f 0 spginsert spgbeginscan spggettuple spggetbitmap spgrescan spgendscan spgmarkpos spgrestrpos spgbuild spgbuildempty spgbulkdelete spgvacuumcleanup spgcanreturn spgcostestimate spgoptions ));
+DATA(insert OID = 4000 (  spgist	0 5 f f f f f t f t f f f f 0 spginsert spgbeginscan spggettuple spggetbitmap spgrescan spgendscan spgmarkpos spgrestrpos spgbuild spgbuildempty spgbulkdelete spgvacuumcleanup spgcanreturn spgcostestimate spgoptions ));
 DESCR("SP-GiST index access method");
 #define SPGIST_AM_OID 4000
-DATA(insert OID = 3580 (  brin	   0 15 f f f f t t f t t f f 0 brininsert brinbeginscan - bringetbitmap brinrescan brinendscan brinmarkpos brinrestrpos brinbuild brinbuildempty brinbulkdelete brinvacuumcleanup - brincostestimate brinoptions ));
+DATA(insert OID = 3580 (  brin	   0 15 f f f f t t f t t f f f 0 brininsert brinbeginscan - bringetbitmap brinrescan brinendscan brinmarkpos brinrestrpos brinbuild brinbuildempty brinbulkdelete brinvacuumcleanup - brincostestimate brinoptions ));
 DESCR("block range index (BRIN) access method");
 #define BRIN_AM_OID 3580
 
diff --git a/src/include/catalog/pg_index.h b/src/include/catalog/pg_index.h
index 45c96e3..452e1cf 100644
--- a/src/include/catalog/pg_index.h
+++ b/src/include/catalog/pg_index.h
@@ -33,6 +33,7 @@ CATALOG(pg_index,2610) BKI_WITHOUT_OIDS BKI_SCHEMA_MACRO
 	Oid			indexrelid;		/* OID of the index */
 	Oid			indrelid;		/* OID of the relation it indexes */
 	int16		indnatts;		/* number of columns in index */
+	int16		indnkeyatts;	/* number of key columns in index */
 	bool		indisunique;	/* is this a unique index? */
 	bool		indisprimary;	/* is this index for primary key? */
 	bool		indisexclusion; /* is this index for exclusion constraint? */
@@ -70,26 +71,27 @@ typedef FormData_pg_index *Form_pg_index;
  *		compiler constants for pg_index
  * ----------------
  */
-#define Natts_pg_index					19
+#define Natts_pg_index					20
 #define Anum_pg_index_indexrelid		1
 #define Anum_pg_index_indrelid			2
 #define Anum_pg_index_indnatts			3
-#define Anum_pg_index_indisunique		4
-#define Anum_pg_index_indisprimary		5
-#define Anum_pg_index_indisexclusion	6
-#define Anum_pg_index_indimmediate		7
-#define Anum_pg_index_indisclustered	8
-#define Anum_pg_index_indisvalid		9
-#define Anum_pg_index_indcheckxmin		10
-#define Anum_pg_index_indisready		11
-#define Anum_pg_index_indislive			12
-#define Anum_pg_index_indisreplident	13
-#define Anum_pg_index_indkey			14
-#define Anum_pg_index_indcollation		15
-#define Anum_pg_index_indclass			16
-#define Anum_pg_index_indoption			17
-#define Anum_pg_index_indexprs			18
-#define Anum_pg_index_indpred			19
+#define Anum_pg_index_indnkeyatts		4
+#define Anum_pg_index_indisunique		5
+#define Anum_pg_index_indisprimary		6
+#define Anum_pg_index_indisexclusion	7
+#define Anum_pg_index_indimmediate		8
+#define Anum_pg_index_indisclustered	9
+#define Anum_pg_index_indisvalid		10
+#define Anum_pg_index_indcheckxmin		11
+#define Anum_pg_index_indisready		12
+#define Anum_pg_index_indislive			13
+#define Anum_pg_index_indisreplident	14
+#define Anum_pg_index_indkey			15
+#define Anum_pg_index_indcollation		16
+#define Anum_pg_index_indclass			17
+#define Anum_pg_index_indoption			18
+#define Anum_pg_index_indexprs			19
+#define Anum_pg_index_indpred			20
 
 /*
  * Index AMs that support ordered scans must support these two indoption
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 4ae2f3e..4bd0d6e 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -59,6 +59,7 @@ typedef struct IndexInfo
 {
 	NodeTag		type;
 	int			ii_NumIndexAttrs;
+	int			ii_NumIndexKeyAttrs;
 	AttrNumber	ii_KeyAttrNumbers[INDEX_MAX_KEYS];
 	List	   *ii_Expressions; /* list of Expr */
 	List	   *ii_ExpressionsState;	/* list of ExprState */
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index f0dcd2f..a0abba8 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -2414,6 +2414,7 @@ typedef struct IndexStmt
 	char	   *accessMethod;	/* name of access method (eg. btree) */
 	char	   *tableSpace;		/* tablespace, or NULL for default */
 	List	   *indexParams;	/* columns to index: a list of IndexElem */
+	List	   *indexIncludingParams;	/* INCLUDING (non-key) columns: a list of IndexElem */
 	List	   *options;		/* WITH clause options: a list of DefElem */
 	Node	   *whereClause;	/* qualification (partial-index predicate) */
 	List	   *excludeOpNames; /* exclusion operator names, or NIL if none */
diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h
index 5dc23d9..6c4e202 100644
--- a/src/include/nodes/relation.h
+++ b/src/include/nodes/relation.h
@@ -532,6 +532,7 @@ typedef struct IndexOptInfo
 
 	/* index descriptor information */
 	int			ncolumns;		/* number of columns in index */
+	int			nkeycolumns;	/* number of key columns in index */
 	int		   *indexkeys;		/* column numbers of index's keys, or 0 */
 	Oid		   *indexcollations;	/* OIDs of collations of index columns */
 	Oid		   *opfamily;		/* OIDs of operator families for columns */
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index 8a55a09..b0f5930 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -351,6 +351,12 @@ typedef struct ViewOptions
 #define RelationGetNumberOfAttributes(relation) ((relation)->rd_rel->relnatts)
 
 /*
+ * IndexRelationGetNumberOfKeyAttributes
+ *		Returns the number of key attributes in an index relation.
+ */
+#define IndexRelationGetNumberOfKeyAttributes(relation) ((relation)->rd_index->indnkeyatts)
+
+/*
  * RelationGetDescr
  *		Returns tuple descriptor for a relation.
  */

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to