22.01.2016 01:47, David Rowley:
On 20 January 2016 at 06:08, Anastasia Lubennikova
<a.lubennik...@postgrespro.ru> wrote:


18.01.2016 01:02, David Rowley пишет:

On 14 January 2016 at 08:24, David Rowley <david.row...@2ndquadrant.com> wrote:
I will try to review the omit_opclass_4.0.patch soon.

Hi, as promised, here's my review of the omit_opclass_4.0.patch patch.

Thank you again. All mentioned points are fixed and patches are merged.
I hope it's all right now. Please check comments one more time. I rather doubt 
that I wrote everything correctly.

Thanks for updating.

+        for the searching or ordering of records can defined in the

should be:

+        for the searching or ordering of records can be defined in the

but perhaps "defined" should be "included".

The following is still quite wasteful. CopyIndexTuple() does a
palloc() and memcpy(), and then you throw that away if
rel->rd_index->indnatts != rel->rd_index->indnkeyatts. I think you
just need to add an "else" and move the CopyIndexTuple() below the if.

item = (IndexTuple) PageGetItem(lpage, itemid);
   right_item = CopyIndexTuple(item);
+ if (rel->rd_index->indnatts != rel->rd_index->indnkeyatts)
+ right_item = index_reform_tuple(rel, right_item,
rel->rd_index->indnatts, rel->rd_index->indnkeyatts);
Fixed. Thank you for reminding me.
Tom also commited
http://git.postgresql.org/gitweb/?p=postgresql.git;a=commit;h=65c5fcd353a859da9e61bfb2b92a99f12937de3b
So it looks like you'll need to update your pg_am.h changes. Looks
like you'll need a new struct member in IndexAmRoutine and just
populate that new member in each of the *handler functions listed in
pg_am.h

-#define Natts_pg_am 30
+#define Natts_pg_am 31
Done. I hope that my patch is close to the commit too.

Thank you again for review.

--
Anastasia Lubennikova
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 412c845..c201f8b 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -3515,6 +3515,14 @@
       <literal>pg_class.relnatts</literal>)</entry>
      </row>
 
+      <row>
+      <entry><structfield>indnkeyatts</structfield></entry>
+      <entry><type>int2</type></entry>
+      <entry></entry>
+      <entry>The number of key columns in the index. "Key columns" are ordinary
+      index columns in contrast with "included" columns.</entry>
+     </row>
+
      <row>
       <entry><structfield>indisunique</structfield></entry>
       <entry><type>bool</type></entry>
diff --git a/doc/src/sgml/indexam.sgml b/doc/src/sgml/indexam.sgml
index 5f7befb..ee40309 100644
--- a/doc/src/sgml/indexam.sgml
+++ b/doc/src/sgml/indexam.sgml
@@ -111,6 +111,8 @@ typedef struct IndexAmRoutine
     bool        amclusterable;
     /* does AM handle predicate locks? */
     bool        ampredlocks;
+    /* does AM support columns included with clause INCLUDING? */
+    bool    amcaninclude;
     /* type of data stored in index, or InvalidOid if variable */
     Oid         amkeytype;
 
@@ -852,7 +854,8 @@ amrestrpos (IndexScanDesc scan);
    using <firstterm>unique indexes</>, which are indexes that disallow
    multiple entries with identical keys.  An access method that supports this
    feature sets <structfield>amcanunique</> true.
-   (At present, only b-tree supports it.)
+   (At present, only b-tree supports it.) Columns which are present in the
+   <literal>INCLUDING</> clause are not used to enforce uniqueness.
   </para>
 
   <para>
diff --git a/doc/src/sgml/indices.sgml b/doc/src/sgml/indices.sgml
index 23bbec6..09d4e6b 100644
--- a/doc/src/sgml/indices.sgml
+++ b/doc/src/sgml/indices.sgml
@@ -633,7 +633,8 @@ CREATE INDEX test3_desc_index ON test3 (id DESC NULLS LAST);
    Indexes can also be used to enforce uniqueness of a column's value,
    or the uniqueness of the combined values of more than one column.
 <synopsis>
-CREATE UNIQUE INDEX <replaceable>name</replaceable> ON <replaceable>table</replaceable> (<replaceable>column</replaceable> <optional>, ...</optional>);
+CREATE UNIQUE INDEX <replaceable>name</replaceable> ON <replaceable>table</replaceable> (<replaceable>column</replaceable> <optional>, ...</optional>)
+<optional>INCLUDING (<replaceable>column</replaceable> <optional>, ...</optional>)</optional>;
 </synopsis>
    Currently, only B-tree indexes can be declared unique.
   </para>
@@ -642,7 +643,9 @@ CREATE UNIQUE INDEX <replaceable>name</replaceable> ON <replaceable>table</repla
    When an index is declared unique, multiple table rows with equal
    indexed values are not allowed.  Null values are not considered
    equal.  A multicolumn unique index will only reject cases where all
-   indexed columns are equal in multiple rows.
+   indexed columns are equal in multiple rows. Columns included with clause
+   <literal>INCLUDING</literal> aren't used to enforce constraints (UNIQUE,
+   PRIMARY KEY, etc).
   </para>
 
   <para>
diff --git a/doc/src/sgml/ref/create_index.sgml b/doc/src/sgml/ref/create_index.sgml
index ce36a1b..8011d6c 100644
--- a/doc/src/sgml/ref/create_index.sgml
+++ b/doc/src/sgml/ref/create_index.sgml
@@ -23,6 +23,7 @@ PostgreSQL documentation
 <synopsis>
 CREATE [ UNIQUE ] INDEX [ CONCURRENTLY ] [ [ IF NOT EXISTS ] <replaceable class="parameter">name</replaceable> ] ON <replaceable class="parameter">table_name</replaceable> [ USING <replaceable class="parameter">method</replaceable> ]
     ( { <replaceable class="parameter">column_name</replaceable> | ( <replaceable class="parameter">expression</replaceable> ) } [ COLLATE <replaceable class="parameter">collation</replaceable> ] [ <replaceable class="parameter">opclass</replaceable> ] [ ASC | DESC ] [ NULLS { FIRST | LAST } ] [, ...] )
+    [ INCLUDING ( { <replaceable class="parameter">column_name</replaceable> | ( <replaceable class="parameter">expression</replaceable> ) } [ COLLATE <replaceable class="parameter">collation</replaceable> ] [ <replaceable class="parameter">opclass</replaceable> ] [ ASC | DESC ] [ NULLS { FIRST | LAST } ] [, ...] )
     [ WITH ( <replaceable class="PARAMETER">storage_parameter</replaceable> = <replaceable class="PARAMETER">value</replaceable> [, ... ] ) ]
     [ TABLESPACE <replaceable class="parameter">tablespace_name</replaceable> ]
     [ WHERE <replaceable class="parameter">predicate</replaceable> ]
@@ -139,6 +140,32 @@ CREATE [ UNIQUE ] INDEX [ CONCURRENTLY ] [ [ IF NOT EXISTS ] <replaceable class=
      </varlistentry>
 
      <varlistentry>
+      <term><literal>INCLUDING</literal></term>
+      <listitem>
+       <para>
+        An optional <literal>INCLUDING</> clause allows a list of columns to be
+        specified which will be included in the index, in the non-key portion of
+        the index. Columns which are part of this clause cannot also exist in the
+        key columns portion of the index, and vice versa. The
+        <literal>INCLUDING</> columns exist solely to allow more queries to benefit
+        from <firstterm>index-only scans</> by including certain columns in the
+        index, the value of which would otherwise have to be obtained by reading
+        the table's heap. Having these columns in the <literal>INCLUDING</> clause
+        in some cases allows <productname>PostgreSQL</> to skip the heap read
+        completely. This also allows <literal>UNIQUE</> indexes to be defined on
+        one set of columns, which can include another set of column in the
+        <literal>INCLUDING</> clause, on which the uniqueness is not enforced upon.
+        It's the same with other constraints (PRIMARY KEY and EXCLUDE). This can
+        also can be used for non-unique indexes as any columns which are not required
+        for the searching or ordering of records can be included in the
+        <literal>INCLUDING</> clause, which can slightly reduce the size of the index,
+        due to storing included attributes only in leaf index pages.
+        Currently, only the B-tree access method supports this feature.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
       <term><replaceable class="parameter">name</replaceable></term>
       <listitem>
        <para>
@@ -596,7 +623,7 @@ Indexes:
   <title>Examples</title>
 
   <para>
-   To create a B-tree index on the column <literal>title</literal> in
+   To create a unique B-tree index on the column <literal>title</literal> in
    the table <literal>films</literal>:
 <programlisting>
 CREATE UNIQUE INDEX title_idx ON films (title);
@@ -604,6 +631,15 @@ CREATE UNIQUE INDEX title_idx ON films (title);
   </para>
 
   <para>
+   To create a unique B-tree index on the column <literal>title</literal>
+   and included columns <literal>director</literal> and <literal>rating</literal>
+   in the table <literal>films</literal>:
+<programlisting>
+CREATE UNIQUE INDEX title_idx ON films (title) INCLUDING (director, rating);
+</programlisting>
+  </para>
+
+  <para>
    To create an index on the expression <literal>lower(title)</>,
    allowing efficient case-insensitive searches:
 <programlisting>
diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index c740952..c68df18 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -92,6 +92,7 @@ brinhandler(PG_FUNCTION_ARGS)
 	amroutine->amstorage = true;
 	amroutine->amclusterable = false;
 	amroutine->ampredlocks = false;
+	amroutine->amcaninclude = false;
 	amroutine->amkeytype = InvalidOid;
 
 	amroutine->ambuild = brinbuild;
diff --git a/src/backend/access/common/indextuple.c b/src/backend/access/common/indextuple.c
index 274a6c2..db723c4 100644
--- a/src/backend/access/common/indextuple.c
+++ b/src/backend/access/common/indextuple.c
@@ -19,6 +19,7 @@
 #include "access/heapam.h"
 #include "access/itup.h"
 #include "access/tuptoaster.h"
+#include "utils/rel.h"
 
 
 /* ----------------------------------------------------------------
@@ -441,3 +442,30 @@ CopyIndexTuple(IndexTuple source)
 	memcpy(result, source, size);
 	return result;
 }
+
+/*
+ * Reform index tuple. Truncate nonkey (INCLUDING) attributes.
+ */
+IndexTuple
+index_reform_tuple(Relation idxrel, IndexTuple olditup, int natts, int nkeyatts)
+{
+	TupleDesc   itupdesc = RelationGetDescr(idxrel);
+	Datum       values[INDEX_MAX_KEYS];
+	bool        isnull[INDEX_MAX_KEYS];
+	IndexTuple	newitup;
+
+	Assert(natts <= INDEX_MAX_KEYS);
+	Assert(nkeyatts > 0);
+	Assert(nkeyatts <= natts);
+
+	index_deform_tuple(olditup, itupdesc, values, isnull);
+
+	/* form new tuple that will contain only key attributes */
+	itupdesc->natts = nkeyatts;
+	newitup = index_form_tuple(itupdesc, values, isnull);
+	newitup->t_tid = olditup->t_tid;
+
+	itupdesc->natts = natts;
+
+	return newitup;
+}
diff --git a/src/backend/access/gin/ginutil.c b/src/backend/access/gin/ginutil.c
index 9450267..b4c69e7 100644
--- a/src/backend/access/gin/ginutil.c
+++ b/src/backend/access/gin/ginutil.c
@@ -47,6 +47,7 @@ ginhandler(PG_FUNCTION_ARGS)
 	amroutine->amstorage = true;
 	amroutine->amclusterable = false;
 	amroutine->ampredlocks = false;
+	amroutine->amcaninclude = false;
 	amroutine->amkeytype = InvalidOid;
 
 	amroutine->ambuild = ginbuild;
diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c
index 996363c..64cc8df 100644
--- a/src/backend/access/gist/gist.c
+++ b/src/backend/access/gist/gist.c
@@ -69,6 +69,7 @@ gisthandler(PG_FUNCTION_ARGS)
 	amroutine->amstorage = true;
 	amroutine->amclusterable = true;
 	amroutine->ampredlocks = false;
+	amroutine->amcaninclude = false;
 	amroutine->amkeytype = InvalidOid;
 
 	amroutine->ambuild = gistbuild;
diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c
index 3d48c4f..6d8d68d 100644
--- a/src/backend/access/hash/hash.c
+++ b/src/backend/access/hash/hash.c
@@ -64,6 +64,7 @@ hashhandler(PG_FUNCTION_ARGS)
 	amroutine->amstorage = false;
 	amroutine->amclusterable = false;
 	amroutine->ampredlocks = false;
+	amroutine->amcaninclude = false;
 	amroutine->amkeytype = INT4OID;
 
 	amroutine->ambuild = hashbuild;
diff --git a/src/backend/access/index/genam.c b/src/backend/access/index/genam.c
index 65c941d..fa719c2 100644
--- a/src/backend/access/index/genam.c
+++ b/src/backend/access/index/genam.c
@@ -175,6 +175,7 @@ BuildIndexValueDescription(Relation indexRelation,
 	Form_pg_index idxrec;
 	HeapTuple	ht_idx;
 	int			natts = indexRelation->rd_rel->relnatts;
+	int			nkeyatts;
 	int			i;
 	int			keyno;
 	Oid			indexrelid = RelationGetRelid(indexRelation);
@@ -244,6 +245,7 @@ BuildIndexValueDescription(Relation indexRelation,
 	appendStringInfo(&buf, "(%s)=(",
 					 pg_get_indexdef_columns(indexrelid, true));
 
+	nkeyatts = IndexRelationGetNumberOfKeyAttributes(indexRelation);
 	for (i = 0; i < natts; i++)
 	{
 		char	   *val;
@@ -254,20 +256,31 @@ BuildIndexValueDescription(Relation indexRelation,
 		{
 			Oid			foutoid;
 			bool		typisvarlena;
-
+			TupleDesc	tupdesc = RelationGetDescr(indexRelation);
 			/*
-			 * The provided data is not necessarily of the type stored in the
-			 * index; rather it is of the index opclass's input type. So look
-			 * at rd_opcintype not the index tupdesc.
+			 * For key attributes the provided data is not necessarily of the
+			 * type stored in the index; rather it is of the index opclass's
+			 * input type. So look at rd_opcintype not the index tupdesc.
 			 *
 			 * Note: this is a bit shaky for opclasses that have pseudotype
 			 * input types such as ANYARRAY or RECORD.  Currently, the
 			 * typoutput functions associated with the pseudotypes will work
 			 * okay, but we might have to try harder in future.
+			 *
+			 * For included attributes just use info stored in the index
+			 * tupdesc.
 			 */
-			getTypeOutputInfo(indexRelation->rd_opcintype[i],
-							  &foutoid, &typisvarlena);
-			val = OidOutputFunctionCall(foutoid, values[i]);
+			if (i < nkeyatts)
+			{
+				getTypeOutputInfo(indexRelation->rd_opcintype[i],
+								&foutoid, &typisvarlena);
+				val = OidOutputFunctionCall(foutoid, values[i]);
+			}
+			else
+			{
+				getTypeOutputInfo(tupdesc->attrs[i]->atttypid, &foutoid, &typisvarlena);
+				val = OidOutputFunctionCall(foutoid, values[i]);
+			}
 		}
 
 		if (i > 0)
diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c
index e3c55eb..db82f78 100644
--- a/src/backend/access/nbtree/nbtinsert.c
+++ b/src/backend/access/nbtree/nbtinsert.c
@@ -108,18 +108,22 @@ _bt_doinsert(Relation rel, IndexTuple itup,
 			 IndexUniqueCheck checkUnique, Relation heapRel)
 {
 	bool		is_unique = false;
-	int			natts = rel->rd_rel->relnatts;
+	int			nkeyatts;
 	ScanKey		itup_scankey;
 	BTStack		stack;
 	Buffer		buf;
 	OffsetNumber offset;
 
+	Assert(rel->rd_index->indnatts != 0);
+	Assert(rel->rd_index->indnkeyatts != 0);
+	nkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
+
 	/* we need an insertion scan key to do our search, so build one */
 	itup_scankey = _bt_mkscankey(rel, itup);
 
 top:
 	/* find the first page containing this key */
-	stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE);
+	stack = _bt_search(rel, nkeyatts, itup_scankey, false, &buf, BT_WRITE);
 
 	offset = InvalidOffsetNumber;
 
@@ -134,7 +138,7 @@ top:
 	 * move right in the tree.  See Lehman and Yao for an excruciatingly
 	 * precise description.
 	 */
-	buf = _bt_moveright(rel, buf, natts, itup_scankey, false,
+	buf = _bt_moveright(rel, buf, nkeyatts, itup_scankey, false,
 						true, stack, BT_WRITE);
 
 	/*
@@ -163,7 +167,7 @@ top:
 		TransactionId xwait;
 		uint32		speculativeToken;
 
-		offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);
+		offset = _bt_binsrch(rel, buf, nkeyatts, itup_scankey, false);
 		xwait = _bt_check_unique(rel, itup, heapRel, buf, offset, itup_scankey,
 								 checkUnique, &is_unique, &speculativeToken);
 
@@ -199,7 +203,7 @@ top:
 		 */
 		CheckForSerializableConflictIn(rel, NULL, buf);
 		/* do the insertion */
-		_bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup,
+		_bt_findinsertloc(rel, &buf, &offset, nkeyatts, itup_scankey, itup,
 						  stack, heapRel);
 		_bt_insertonpg(rel, buf, InvalidBuffer, stack, itup, offset, false);
 	}
@@ -242,7 +246,7 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
 				 uint32 *speculativeToken)
 {
 	TupleDesc	itupdesc = RelationGetDescr(rel);
-	int			natts = rel->rd_rel->relnatts;
+	int			nkeyatts = rel->rd_index->indnkeyatts;
 	SnapshotData SnapshotDirty;
 	OffsetNumber maxoff;
 	Page		page;
@@ -301,7 +305,7 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
 				 * in real comparison, but only for ordering/finding items on
 				 * pages. - vadim 03/24/97
 				 */
-				if (!_bt_isequal(itupdesc, page, offset, natts, itup_scankey))
+				if (!_bt_isequal(itupdesc, page, offset, nkeyatts, itup_scankey))
 					break;		/* we're past all the equal tuples */
 
 				/* okay, we gotta fetch the heap tuple ... */
@@ -457,7 +461,7 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
 			if (P_RIGHTMOST(opaque))
 				break;
 			if (!_bt_isequal(itupdesc, page, P_HIKEY,
-							 natts, itup_scankey))
+							 nkeyatts, itup_scankey))
 				break;
 			/* Advance to next non-dead page --- there must be one */
 			for (;;)
@@ -745,6 +749,14 @@ _bt_insertonpg(Relation rel,
 		elog(ERROR, "cannot insert to incompletely split page %u",
 			 BufferGetBlockNumber(buf));
 
+	/* Truncate nonkey attributes when inserting on nonleaf pages. */
+	if (rel->rd_index->indnatts != rel->rd_index->indnkeyatts
+		&& !P_ISLEAF(lpageop))
+	{
+			itup = index_reform_tuple(rel, itup,
+				rel->rd_index->indnatts, rel->rd_index->indnkeyatts);
+	}
+
 	itemsz = IndexTupleDSize(*itup);
 	itemsz = MAXALIGN(itemsz);	/* be safe, PageAddItem will do this but we
 								 * need to be consistent */
@@ -1961,7 +1973,11 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
 	itemid = PageGetItemId(lpage, P_HIKEY);
 	right_item_sz = ItemIdGetLength(itemid);
 	item = (IndexTuple) PageGetItem(lpage, itemid);
-	right_item = CopyIndexTuple(item);
+
+	if (rel->rd_index->indnatts != rel->rd_index->indnkeyatts)
+		right_item = index_reform_tuple(rel, item, rel->rd_index->indnatts, rel->rd_index->indnkeyatts);
+	else
+		right_item = CopyIndexTuple(item);
 	ItemPointerSet(&(right_item->t_tid), rbkno, P_HIKEY);
 
 	/* NO EREPORT(ERROR) from here till newroot op is logged */
diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c
index 67755d7..6d64a8b 100644
--- a/src/backend/access/nbtree/nbtpage.c
+++ b/src/backend/access/nbtree/nbtpage.c
@@ -1254,8 +1254,9 @@ _bt_pagedel(Relation rel, Buffer buf)
 				/* we need an insertion scan key for the search, so build one */
 				itup_scankey = _bt_mkscankey(rel, targetkey);
 				/* find the leftmost leaf page containing this key */
-				stack = _bt_search(rel, rel->rd_rel->relnatts, itup_scankey,
-								   false, &lbuf, BT_READ);
+				stack = _bt_search(rel,
+								   IndexRelationGetNumberOfKeyAttributes(rel),
+								   itup_scankey, false, &lbuf, BT_READ);
 				/* don't need a pin on the page */
 				_bt_relbuf(rel, lbuf);
 
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index f2905cb..24f2b9e 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -98,6 +98,7 @@ bthandler(PG_FUNCTION_ARGS)
 	amroutine->amstorage = false;
 	amroutine->amclusterable = true;
 	amroutine->ampredlocks = true;
+	amroutine->amcaninclude = true;
 	amroutine->amkeytype = InvalidOid;
 
 	amroutine->ambuild = btbuild;
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index 99a014e..ae74c15 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -593,6 +593,22 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup)
 		state->btps_minkey = CopyIndexTuple(itup);
 	}
 
+	/* Truncate nonkey attributes when inserting on nonleaf pages */
+	if (wstate->index->rd_index->indnatts
+		!= wstate->index->rd_index->indnkeyatts)
+	{
+		BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(npage);
+
+		if (!P_ISLEAF(pageop))
+		{
+			itup = index_reform_tuple(wstate->index,
+					itup, wstate->index->rd_index->indnatts,
+					wstate->index->rd_index->indnkeyatts);
+			itupsz = IndexTupleDSize(*itup);
+			itupsz = MAXALIGN(itupsz);
+		}
+	}
+
 	/*
 	 * Add the new item into the current page.
 	 */
diff --git a/src/backend/access/nbtree/nbtutils.c b/src/backend/access/nbtree/nbtutils.c
index c850b48..98bc4f7 100644
--- a/src/backend/access/nbtree/nbtutils.c
+++ b/src/backend/access/nbtree/nbtutils.c
@@ -63,17 +63,24 @@ _bt_mkscankey(Relation rel, IndexTuple itup)
 {
 	ScanKey		skey;
 	TupleDesc	itupdesc;
-	int			natts;
+	int			nkeyatts;
 	int16	   *indoption;
 	int			i;
 
 	itupdesc = RelationGetDescr(rel);
-	natts = RelationGetNumberOfAttributes(rel);
+	nkeyatts = rel->rd_index->indnkeyatts;
 	indoption = rel->rd_indoption;
 
-	skey = (ScanKey) palloc(natts * sizeof(ScanKeyData));
+	Assert(rel->rd_index->indnkeyatts != 0);
+	Assert(rel->rd_index->indnkeyatts <= rel->rd_index->indnatts);
 
-	for (i = 0; i < natts; i++)
+	/*
+	 * We'll execute search using ScanKey constructed on key columns.
+	 * Non key (included) columns must be omitted.
+	 */
+	skey = (ScanKey) palloc(nkeyatts * sizeof(ScanKeyData));
+
+	for (i = 0; i < nkeyatts; i++)
 	{
 		FmgrInfo   *procinfo;
 		Datum		arg;
@@ -115,16 +122,16 @@ ScanKey
 _bt_mkscankey_nodata(Relation rel)
 {
 	ScanKey		skey;
-	int			natts;
+	int			nkeyatts;
 	int16	   *indoption;
 	int			i;
 
-	natts = RelationGetNumberOfAttributes(rel);
+	nkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
 	indoption = rel->rd_indoption;
 
-	skey = (ScanKey) palloc(natts * sizeof(ScanKeyData));
+	skey = (ScanKey) palloc(nkeyatts * sizeof(ScanKeyData));
 
-	for (i = 0; i < natts; i++)
+	for (i = 0; i < nkeyatts; i++)
 	{
 		FmgrInfo   *procinfo;
 		int			flags;
diff --git a/src/backend/bootstrap/bootparse.y b/src/backend/bootstrap/bootparse.y
index 41d2fd4..c47c387 100644
--- a/src/backend/bootstrap/bootparse.y
+++ b/src/backend/bootstrap/bootparse.y
@@ -293,6 +293,7 @@ Boot_DeclareIndexStmt:
 					stmt->accessMethod = $8;
 					stmt->tableSpace = NULL;
 					stmt->indexParams = $10;
+					stmt->indexIncludingParams = NIL;
 					stmt->options = NIL;
 					stmt->whereClause = NULL;
 					stmt->excludeOpNames = NIL;
@@ -336,6 +337,7 @@ Boot_DeclareUniqueIndexStmt:
 					stmt->accessMethod = $9;
 					stmt->tableSpace = NULL;
 					stmt->indexParams = $11;
+					stmt->indexIncludingParams = NIL;
 					stmt->options = NIL;
 					stmt->whereClause = NULL;
 					stmt->excludeOpNames = NIL;
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 313ee9c..dd7f328 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -215,7 +215,7 @@ index_check_primary_key(Relation heapRel,
 	 * null, otherwise attempt to ALTER TABLE .. SET NOT NULL
 	 */
 	cmds = NIL;
-	for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
+	for (i = 0; i < indexInfo->ii_NumIndexKeyAttrs; i++)
 	{
 		AttrNumber	attnum = indexInfo->ii_KeyAttrNumbers[i];
 		HeapTuple	atttuple;
@@ -423,6 +423,9 @@ ConstructTupleDescriptor(Relation heapRelation,
 		namestrcpy(&to->attname, (const char *) lfirst(colnames_item));
 		colnames_item = lnext(colnames_item);
 
+		if (i >= indexInfo->ii_NumIndexKeyAttrs)
+			continue;
+
 		/*
 		 * Check the opclass and index AM to see if either provides a keytype
 		 * (overriding the attribute type).  Opclass takes precedence.
@@ -604,6 +607,7 @@ UpdateIndexRelation(Oid indexoid,
 	values[Anum_pg_index_indexrelid - 1] = ObjectIdGetDatum(indexoid);
 	values[Anum_pg_index_indrelid - 1] = ObjectIdGetDatum(heapoid);
 	values[Anum_pg_index_indnatts - 1] = Int16GetDatum(indexInfo->ii_NumIndexAttrs);
+	values[Anum_pg_index_indnkeyatts - 1] = Int16GetDatum(indexInfo->ii_NumIndexKeyAttrs);
 	values[Anum_pg_index_indisunique - 1] = BoolGetDatum(indexInfo->ii_Unique);
 	values[Anum_pg_index_indisprimary - 1] = BoolGetDatum(primary);
 	values[Anum_pg_index_indisexclusion - 1] = BoolGetDatum(isexclusion);
@@ -1009,7 +1013,7 @@ index_create(Relation heapRelation,
 		}
 
 		/* Store dependency on operator classes */
-		for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
+		for (i = 0; i < indexInfo->ii_NumIndexKeyAttrs; i++)
 		{
 			referenced.classId = OperatorClassRelationId;
 			referenced.objectId = classObjectId[i];
@@ -1067,6 +1071,8 @@ index_create(Relation heapRelation,
 	else
 		Assert(indexRelation->rd_indexcxt != NULL);
 
+	indexRelation->rd_index->indnkeyatts = indexInfo->ii_NumIndexKeyAttrs;
+
 	/*
 	 * If this is bootstrap (initdb) time, then we don't actually fill in the
 	 * index yet.  We'll be creating more indexes and classes later, so we
@@ -1187,7 +1193,7 @@ index_constraint_create(Relation heapRelation,
 								   true,
 								   RelationGetRelid(heapRelation),
 								   indexInfo->ii_KeyAttrNumbers,
-								   indexInfo->ii_NumIndexAttrs,
+								   indexInfo->ii_NumIndexKeyAttrs,
 								   InvalidOid,	/* no domain */
 								   indexRelationId,		/* index OID */
 								   InvalidOid,	/* no foreign key */
@@ -1627,15 +1633,19 @@ BuildIndexInfo(Relation index)
 	IndexInfo  *ii = makeNode(IndexInfo);
 	Form_pg_index indexStruct = index->rd_index;
 	int			i;
-	int			numKeys;
+	int			numAtts;
 
 	/* check the number of keys, and copy attr numbers into the IndexInfo */
-	numKeys = indexStruct->indnatts;
-	if (numKeys < 1 || numKeys > INDEX_MAX_KEYS)
+	numAtts = indexStruct->indnatts;
+	if (numAtts < 1 || numAtts > INDEX_MAX_KEYS)
 		elog(ERROR, "invalid indnatts %d for index %u",
-			 numKeys, RelationGetRelid(index));
-	ii->ii_NumIndexAttrs = numKeys;
-	for (i = 0; i < numKeys; i++)
+			 numAtts, RelationGetRelid(index));
+	ii->ii_NumIndexAttrs = numAtts;
+	ii->ii_NumIndexKeyAttrs = indexStruct->indnkeyatts;
+	Assert(ii->ii_NumIndexKeyAttrs != 0);
+	Assert(ii->ii_NumIndexKeyAttrs <= ii->ii_NumIndexAttrs);
+
+	for (i = 0; i < numAtts; i++)
 		ii->ii_KeyAttrNumbers[i] = indexStruct->indkey.values[i];
 
 	/* fetch any expressions needed for expressional indexes */
@@ -1691,9 +1701,11 @@ BuildIndexInfo(Relation index)
 void
 BuildSpeculativeIndexInfo(Relation index, IndexInfo *ii)
 {
-	int			ncols = index->rd_rel->relnatts;
+	int			nkeycols;
 	int			i;
 
+	nkeycols = IndexRelationGetNumberOfKeyAttributes(index);
+
 	/*
 	 * fetch info for checking unique indexes
 	 */
@@ -1702,16 +1714,16 @@ BuildSpeculativeIndexInfo(Relation index, IndexInfo *ii)
 	if (index->rd_rel->relam != BTREE_AM_OID)
 		elog(ERROR, "unexpected non-btree speculative unique index");
 
-	ii->ii_UniqueOps = (Oid *) palloc(sizeof(Oid) * ncols);
-	ii->ii_UniqueProcs = (Oid *) palloc(sizeof(Oid) * ncols);
-	ii->ii_UniqueStrats = (uint16 *) palloc(sizeof(uint16) * ncols);
+	ii->ii_UniqueOps = (Oid *) palloc(sizeof(Oid) * nkeycols);
+	ii->ii_UniqueProcs = (Oid *) palloc(sizeof(Oid) * nkeycols);
+	ii->ii_UniqueStrats = (uint16 *) palloc(sizeof(uint16) * nkeycols);
 
 	/*
 	 * We have to look up the operator's strategy number.  This provides a
 	 * cross-check that the operator does match the index.
 	 */
 	/* We need the func OIDs and strategy numbers too */
-	for (i = 0; i < ncols; i++)
+	for (i = 0; i < nkeycols; i++)
 	{
 		ii->ii_UniqueStrats[i] = BTEqualStrategyNumber;
 		ii->ii_UniqueOps[i] =
diff --git a/src/backend/catalog/indexing.c b/src/backend/catalog/indexing.c
index b9fe102..6096fa5 100644
--- a/src/backend/catalog/indexing.c
+++ b/src/backend/catalog/indexing.c
@@ -119,6 +119,7 @@ CatalogIndexInsert(CatalogIndexState indstate, HeapTuple heapTuple)
 		Assert(indexInfo->ii_Predicate == NIL);
 		Assert(indexInfo->ii_ExclusionOps == NULL);
 		Assert(relationDescs[i]->rd_index->indimmediate);
+		Assert(indexInfo->ii_NumIndexKeyAttrs != 0);
 
 		/*
 		 * FormIndexDatum fills in its values and isnull parameters with the
diff --git a/src/backend/catalog/toasting.c b/src/backend/catalog/toasting.c
index f40a005..e09c825 100644
--- a/src/backend/catalog/toasting.c
+++ b/src/backend/catalog/toasting.c
@@ -314,6 +314,7 @@ create_toast_table(Relation rel, Oid toastOid, Oid toastIndexOid,
 
 	indexInfo = makeNode(IndexInfo);
 	indexInfo->ii_NumIndexAttrs = 2;
+	indexInfo->ii_NumIndexKeyAttrs = 2;
 	indexInfo->ii_KeyAttrNumbers[0] = 1;
 	indexInfo->ii_KeyAttrNumbers[1] = 2;
 	indexInfo->ii_Expressions = NIL;
diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c
index 13b04e6..d7d9208 100644
--- a/src/backend/commands/indexcmds.c
+++ b/src/backend/commands/indexcmds.c
@@ -213,7 +213,7 @@ CheckIndexCompatible(Oid oldId,
 	}
 
 	/* Any change in operator class or collation breaks compatibility. */
-	old_natts = indexForm->indnatts;
+	old_natts = indexForm->indnkeyatts;
 	Assert(old_natts == numberOfAttributes);
 
 	d = SysCacheGetAttr(INDEXRELID, tuple, Anum_pg_index_indcollation, &isnull);
@@ -327,6 +327,7 @@ DefineIndex(Oid relationId,
 	int16	   *coloptions;
 	IndexInfo  *indexInfo;
 	int			numberOfAttributes;
+	int			numberOfKeyAttributes;
 	TransactionId limitXmin;
 	VirtualTransactionId *old_snapshots;
 	ObjectAddress address;
@@ -337,10 +338,30 @@ DefineIndex(Oid relationId,
 	Snapshot	snapshot;
 	int			i;
 
+	if(list_intersection(stmt->indexParams, stmt->indexIncludingParams) != NIL)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+				 errmsg("included columns must not intersect with key columns")));
 	/*
 	 * count attributes in index
 	 */
+	numberOfKeyAttributes = list_length(stmt->indexParams);
+	if (numberOfKeyAttributes <= 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+				 errmsg("must specify at least one key column")));
+
+	/*
+	 * We append any INCLUDING columns onto the indexParams list so that
+	 * we have one list with all columns. Later we can determine which of these
+	 * are key columns, and which are just part of the INCLUDING list by check the list
+	 * position. A list item in a position less than ii_NumIndexKeyAttrs is part of
+	 * the key columns, and anything equal to and over is part of the
+	 * INCLUDING columns.
+	 */
+	stmt->indexParams = list_concat(stmt->indexParams, stmt->indexIncludingParams);
 	numberOfAttributes = list_length(stmt->indexParams);
+
 	if (numberOfAttributes <= 0)
 		ereport(ERROR,
 				(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
@@ -507,6 +528,11 @@ DefineIndex(Oid relationId,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 			   errmsg("access method \"%s\" does not support unique indexes",
 					  accessMethodName)));
+	if (list_length(stmt->indexIncludingParams) > 0 && !amRoutine->amcaninclude)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				errmsg("access method \"%s\" does not support included columns",
+						accessMethodName)));
 	if (numberOfAttributes > 1 && !amRoutine->amcanmulticol)
 		ereport(ERROR,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
@@ -544,6 +570,7 @@ DefineIndex(Oid relationId,
 	 */
 	indexInfo = makeNode(IndexInfo);
 	indexInfo->ii_NumIndexAttrs = numberOfAttributes;
+	indexInfo->ii_NumIndexKeyAttrs = numberOfKeyAttributes;
 	indexInfo->ii_Expressions = NIL;	/* for now */
 	indexInfo->ii_ExpressionsState = NIL;
 	indexInfo->ii_Predicate = make_ands_implicit((Expr *) stmt->whereClause);
@@ -966,16 +993,15 @@ ComputeIndexAttrs(IndexInfo *indexInfo,
 	ListCell   *nextExclOp;
 	ListCell   *lc;
 	int			attn;
+	int 		nkeycols = indexInfo->ii_NumIndexKeyAttrs;
 
 	/* Allocate space for exclusion operator info, if needed */
 	if (exclusionOpNames)
 	{
-		int			ncols = list_length(attList);
-
-		Assert(list_length(exclusionOpNames) == ncols);
-		indexInfo->ii_ExclusionOps = (Oid *) palloc(sizeof(Oid) * ncols);
-		indexInfo->ii_ExclusionProcs = (Oid *) palloc(sizeof(Oid) * ncols);
-		indexInfo->ii_ExclusionStrats = (uint16 *) palloc(sizeof(uint16) * ncols);
+		Assert(list_length(exclusionOpNames) == nkeycols);
+		indexInfo->ii_ExclusionOps = (Oid *) palloc(sizeof(Oid) * nkeycols);
+		indexInfo->ii_ExclusionProcs = (Oid *) palloc(sizeof(Oid) * nkeycols);
+		indexInfo->ii_ExclusionStrats = (uint16 *) palloc(sizeof(uint16) * nkeycols);
 		nextExclOp = list_head(exclusionOpNames);
 	}
 	else
@@ -1028,6 +1054,11 @@ ComputeIndexAttrs(IndexInfo *indexInfo,
 			Node	   *expr = attribute->expr;
 
 			Assert(expr != NULL);
+
+			if (attn >= nkeycols)
+				ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+						errmsg("Expressions in INCLUDING clause are not supported")));
 			atttype = exprType(expr);
 			attcollation = exprCollation(expr);
 
@@ -1108,6 +1139,11 @@ ComputeIndexAttrs(IndexInfo *indexInfo,
 		/*
 		 * Identify the opclass to use.
 		 */
+		if (attn >= nkeycols)
+		{
+			attn++;
+			continue;
+		}
 		classOidP[attn] = GetIndexOpClass(attribute->opclass,
 										  atttype,
 										  accessMethodName,
diff --git a/src/backend/executor/execIndexing.c b/src/backend/executor/execIndexing.c
index 838cee7..cffdc2e 100644
--- a/src/backend/executor/execIndexing.c
+++ b/src/backend/executor/execIndexing.c
@@ -646,7 +646,7 @@ check_exclusion_or_unique_constraint(Relation heap, Relation index,
 	Oid		   *constr_procs;
 	uint16	   *constr_strats;
 	Oid		   *index_collations = index->rd_indcollation;
-	int			index_natts = index->rd_index->indnatts;
+	int			index_nkeyatts = index->rd_index->indnkeyatts;
 	IndexScanDesc index_scan;
 	HeapTuple	tup;
 	ScanKeyData scankeys[INDEX_MAX_KEYS];
@@ -673,7 +673,7 @@ check_exclusion_or_unique_constraint(Relation heap, Relation index,
 	 * If any of the input values are NULL, the constraint check is assumed to
 	 * pass (i.e., we assume the operators are strict).
 	 */
-	for (i = 0; i < index_natts; i++)
+	for (i = 0; i < index_nkeyatts; i++)
 	{
 		if (isnull[i])
 			return true;
@@ -685,7 +685,7 @@ check_exclusion_or_unique_constraint(Relation heap, Relation index,
 	 */
 	InitDirtySnapshot(DirtySnapshot);
 
-	for (i = 0; i < index_natts; i++)
+	for (i = 0; i < index_nkeyatts; i++)
 	{
 		ScanKeyEntryInitialize(&scankeys[i],
 							   0,
@@ -717,8 +717,8 @@ check_exclusion_or_unique_constraint(Relation heap, Relation index,
 retry:
 	conflict = false;
 	found_self = false;
-	index_scan = index_beginscan(heap, index, &DirtySnapshot, index_natts, 0);
-	index_rescan(index_scan, scankeys, index_natts, NULL, 0);
+	index_scan = index_beginscan(heap, index, &DirtySnapshot, index_nkeyatts, 0);
+	index_rescan(index_scan, scankeys, index_nkeyatts, NULL, 0);
 
 	while ((tup = index_getnext(index_scan,
 								ForwardScanDirection)) != NULL)
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 5877037..0728f44 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -3112,6 +3112,7 @@ _copyIndexStmt(const IndexStmt *from)
 	COPY_STRING_FIELD(accessMethod);
 	COPY_STRING_FIELD(tableSpace);
 	COPY_NODE_FIELD(indexParams);
+	COPY_NODE_FIELD(indexIncludingParams);
 	COPY_NODE_FIELD(options);
 	COPY_NODE_FIELD(whereClause);
 	COPY_NODE_FIELD(excludeOpNames);
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 08ccc0d..f01c10b 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -1229,6 +1229,7 @@ _equalIndexStmt(const IndexStmt *a, const IndexStmt *b)
 	COMPARE_STRING_FIELD(accessMethod);
 	COMPARE_STRING_FIELD(tableSpace);
 	COMPARE_NODE_FIELD(indexParams);
+	COMPARE_NODE_FIELD(indexIncludingParams);
 	COMPARE_NODE_FIELD(options);
 	COMPARE_NODE_FIELD(whereClause);
 	COMPARE_NODE_FIELD(excludeOpNames);
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index b5e0b55..88d3d26 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -2175,6 +2175,7 @@ _outIndexStmt(StringInfo str, const IndexStmt *node)
 	WRITE_STRING_FIELD(accessMethod);
 	WRITE_STRING_FIELD(tableSpace);
 	WRITE_NODE_FIELD(indexParams);
+	WRITE_NODE_FIELD(indexIncludingParams);
 	WRITE_NODE_FIELD(options);
 	WRITE_NODE_FIELD(whereClause);
 	WRITE_NODE_FIELD(excludeOpNames);
diff --git a/src/backend/optimizer/path/pathkeys.c b/src/backend/optimizer/path/pathkeys.c
index eed39b9..3aadc9a 100644
--- a/src/backend/optimizer/path/pathkeys.c
+++ b/src/backend/optimizer/path/pathkeys.c
@@ -450,6 +450,13 @@ build_index_pathkeys(PlannerInfo *root,
 		bool		nulls_first;
 		PathKey    *cpathkey;
 
+		/*
+		 * INCLUDING columns are stored in index unordered,
+		 * so they don't support ordered index scan.
+		 */
+		if(i >= index->nkeycolumns)
+			break;
+
 		/* We assume we don't need to make a copy of the tlist item */
 		indexkey = indextle->expr;
 
diff --git a/src/backend/optimizer/util/plancat.c b/src/backend/optimizer/util/plancat.c
index 0ea9fcf..6b13a34 100644
--- a/src/backend/optimizer/util/plancat.c
+++ b/src/backend/optimizer/util/plancat.c
@@ -166,7 +166,7 @@ get_relation_info(PlannerInfo *root, Oid relationObjectId, bool inhparent,
 			Form_pg_index index;
 			IndexAmRoutine *amroutine;
 			IndexOptInfo *info;
-			int			ncolumns;
+			int			ncolumns, nkeycolumns;
 			int			i;
 
 			/*
@@ -209,19 +209,25 @@ get_relation_info(PlannerInfo *root, Oid relationObjectId, bool inhparent,
 				RelationGetForm(indexRelation)->reltablespace;
 			info->rel = rel;
 			info->ncolumns = ncolumns = index->indnatts;
+			info->nkeycolumns = nkeycolumns = index->indnkeyatts;
+
 			info->indexkeys = (int *) palloc(sizeof(int) * ncolumns);
 			info->indexcollations = (Oid *) palloc(sizeof(Oid) * ncolumns);
-			info->opfamily = (Oid *) palloc(sizeof(Oid) * ncolumns);
-			info->opcintype = (Oid *) palloc(sizeof(Oid) * ncolumns);
+			info->opfamily = (Oid *) palloc(sizeof(Oid) * nkeycolumns);
+			info->opcintype = (Oid *) palloc(sizeof(Oid) * nkeycolumns);
 			info->canreturn = (bool *) palloc(sizeof(bool) * ncolumns);
 
 			for (i = 0; i < ncolumns; i++)
 			{
 				info->indexkeys[i] = index->indkey.values[i];
 				info->indexcollations[i] = indexRelation->rd_indcollation[i];
+				info->canreturn[i] = index_can_return(indexRelation, i + 1);
+			}
+
+			for (i = 0; i < nkeycolumns; i++)
+			{
 				info->opfamily[i] = indexRelation->rd_opfamily[i];
 				info->opcintype[i] = indexRelation->rd_opcintype[i];
-				info->canreturn[i] = index_can_return(indexRelation, i + 1);
 			}
 
 			info->relam = indexRelation->rd_rel->relam;
@@ -249,10 +255,10 @@ get_relation_info(PlannerInfo *root, Oid relationObjectId, bool inhparent,
 				Assert(amroutine->amcanorder);
 
 				info->sortopfamily = info->opfamily;
-				info->reverse_sort = (bool *) palloc(sizeof(bool) * ncolumns);
-				info->nulls_first = (bool *) palloc(sizeof(bool) * ncolumns);
+				info->reverse_sort = (bool *) palloc(sizeof(bool) * nkeycolumns);
+				info->nulls_first = (bool *) palloc(sizeof(bool) * nkeycolumns);
 
-				for (i = 0; i < ncolumns; i++)
+				for (i = 0; i < nkeycolumns; i++)
 				{
 					int16		opt = indexRelation->rd_indoption[i];
 
@@ -276,11 +282,11 @@ get_relation_info(PlannerInfo *root, Oid relationObjectId, bool inhparent,
 				 * of current or foreseeable amcanorder index types, it's not
 				 * worth expending more effort on now.
 				 */
-				info->sortopfamily = (Oid *) palloc(sizeof(Oid) * ncolumns);
-				info->reverse_sort = (bool *) palloc(sizeof(bool) * ncolumns);
-				info->nulls_first = (bool *) palloc(sizeof(bool) * ncolumns);
+				info->sortopfamily = (Oid *) palloc(sizeof(Oid) * nkeycolumns);
+				info->reverse_sort = (bool *) palloc(sizeof(bool) * nkeycolumns);
+				info->nulls_first = (bool *) palloc(sizeof(bool) * nkeycolumns);
 
-				for (i = 0; i < ncolumns; i++)
+				for (i = 0; i < nkeycolumns; i++)
 				{
 					int16		opt = indexRelation->rd_indoption[i];
 					Oid			ltopr;
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index b307b48..53f5641 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -354,6 +354,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 				oper_argtypes RuleActionList RuleActionMulti
 				opt_column_list columnList opt_name_list
 				sort_clause opt_sort_clause sortby_list index_params
+				optincluding opt_including index_including_params
 				name_list role_list from_clause from_list opt_array_bounds
 				qualified_name_list any_name any_name_list type_name_list
 				any_operator expr_list attrs
@@ -6604,7 +6605,7 @@ defacl_privilege_target:
 
 IndexStmt:	CREATE opt_unique INDEX opt_concurrently opt_index_name
 			ON qualified_name access_method_clause '(' index_params ')'
-			opt_reloptions OptTableSpace where_clause
+			opt_including opt_reloptions OptTableSpace where_clause
 				{
 					IndexStmt *n = makeNode(IndexStmt);
 					n->unique = $2;
@@ -6613,9 +6614,10 @@ IndexStmt:	CREATE opt_unique INDEX opt_concurrently opt_index_name
 					n->relation = $7;
 					n->accessMethod = $8;
 					n->indexParams = $10;
-					n->options = $12;
-					n->tableSpace = $13;
-					n->whereClause = $14;
+					n->indexIncludingParams = $12;
+					n->options = $13;
+					n->tableSpace = $14;
+					n->whereClause = $15;
 					n->excludeOpNames = NIL;
 					n->idxcomment = NULL;
 					n->indexOid = InvalidOid;
@@ -6720,6 +6722,16 @@ index_elem:	ColId opt_collate opt_class opt_asc_desc opt_nulls_order
 				}
 		;
 
+optincluding : '(' index_including_params ')'		{ $$ = $2; }
+		;
+opt_including:		INCLUDING optincluding			{ $$ = $2; }
+			 |		/* EMPTY */						{ $$ = NIL; }
+		;
+
+index_including_params:	index_elem						{ $$ = list_make1($1); }
+			| index_including_params ',' index_elem		{ $$ = lappend($1, $3); }
+		;
+
 opt_collate: COLLATE any_name						{ $$ = $2; }
 			| /*EMPTY*/								{ $$ = NIL; }
 		;
diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c
index 4efd298..ac330f1 100644
--- a/src/backend/utils/adt/ruleutils.c
+++ b/src/backend/utils/adt/ruleutils.c
@@ -1140,10 +1140,23 @@ pg_get_indexdef_worker(Oid indexrelid, int colno,
 		Oid			keycoltype;
 		Oid			keycolcollation;
 
+		/* Report the INCLUDED attributes, if any. */
+		if(keyno == idxrec->indnkeyatts)
+		{
+			if(!attrsOnly)
+			{
+				appendStringInfoString(&buf, ") INCLUDING (");
+				sep = "";
+			}
+		}
+		else if (keyno > idxrec->indnkeyatts)
+			sep = ", ";
+
 		if (!colno)
 			appendStringInfoString(&buf, sep);
 		sep = ", ";
 
+
 		if (attnum != 0)
 		{
 			/* Simple index column */
@@ -1151,8 +1164,13 @@ pg_get_indexdef_worker(Oid indexrelid, int colno,
 			int32		keycoltypmod;
 
 			attname = get_relid_attribute_name(indrelid, attnum);
+
 			if (!colno || colno == keyno + 1)
+			{
 				appendStringInfoString(&buf, quote_identifier(attname));
+				if (attrsOnly && keyno >= idxrec->indnkeyatts)
+					appendStringInfoString(&buf, " (included)");
+			}
 			get_atttypetypmodcoll(indrelid, attnum,
 								  &keycoltype, &keycoltypmod,
 								  &keycolcollation);
@@ -1192,6 +1210,9 @@ pg_get_indexdef_worker(Oid indexrelid, int colno,
 				appendStringInfo(&buf, " COLLATE %s",
 								 generate_collation_name((indcoll)));
 
+			if(keyno >= idxrec->indnkeyatts)
+				continue;
+
 			/* Add the operator class name, if not default */
 			get_opclass_name(indclass->values[keyno], keycoltype, &buf);
 
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index 130c06d..5ee80ab 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -1206,6 +1206,7 @@ RelationInitIndexAccessInfo(Relation relation)
 	MemoryContext indexcxt;
 	MemoryContext oldcontext;
 	int			natts;
+	int			nkeyatts;
 	uint16		amsupport;
 
 	/*
@@ -1239,6 +1240,7 @@ RelationInitIndexAccessInfo(Relation relation)
 	if (natts != relation->rd_index->indnatts)
 		elog(ERROR, "relnatts disagrees with indnatts for index %u",
 			 RelationGetRelid(relation));
+	nkeyatts = IndexRelationGetNumberOfKeyAttributes(relation);
 
 	/*
 	 * Make the private context to hold index access info.  The reason we need
@@ -1264,9 +1266,9 @@ RelationInitIndexAccessInfo(Relation relation)
 	 * Allocate arrays to hold data
 	 */
 	relation->rd_opfamily = (Oid *)
-		MemoryContextAllocZero(indexcxt, natts * sizeof(Oid));
+		MemoryContextAllocZero(indexcxt, nkeyatts * sizeof(Oid));
 	relation->rd_opcintype = (Oid *)
-		MemoryContextAllocZero(indexcxt, natts * sizeof(Oid));
+		MemoryContextAllocZero(indexcxt, nkeyatts * sizeof(Oid));
 
 	amsupport = relation->rd_amroutine->amsupport;
 	if (amsupport > 0)
@@ -1322,7 +1324,7 @@ RelationInitIndexAccessInfo(Relation relation)
 	 */
 	IndexSupportInitialize(indclass, relation->rd_support,
 						   relation->rd_opfamily, relation->rd_opcintype,
-						   amsupport, natts);
+						   amsupport, nkeyatts);
 
 	/*
 	 * Similarly extract indoption and copy it to the cache entry
@@ -1968,6 +1970,7 @@ RelationReloadIndexInfo(Relation relation)
 		 * it's not worth it to track exactly which ones they are.  None of
 		 * the array fields are allowed to change, though.
 		 */
+		relation->rd_index->indnkeyatts = index->indnkeyatts;
 		relation->rd_index->indisunique = index->indisunique;
 		relation->rd_index->indisprimary = index->indisprimary;
 		relation->rd_index->indisexclusion = index->indisexclusion;
@@ -4397,7 +4400,7 @@ RelationGetExclusionInfo(Relation indexRelation,
 						 Oid **procs,
 						 uint16 **strategies)
 {
-	int			ncols = indexRelation->rd_rel->relnatts;
+	int			nkeycols;
 	Oid		   *ops;
 	Oid		   *funcs;
 	uint16	   *strats;
@@ -4409,17 +4412,19 @@ RelationGetExclusionInfo(Relation indexRelation,
 	MemoryContext oldcxt;
 	int			i;
 
+	nkeycols = IndexRelationGetNumberOfKeyAttributes(indexRelation);
+
 	/* Allocate result space in caller context */
-	*operators = ops = (Oid *) palloc(sizeof(Oid) * ncols);
-	*procs = funcs = (Oid *) palloc(sizeof(Oid) * ncols);
-	*strategies = strats = (uint16 *) palloc(sizeof(uint16) * ncols);
+	*operators = ops = (Oid *) palloc(sizeof(Oid) * nkeycols);
+	*procs = funcs = (Oid *) palloc(sizeof(Oid) * nkeycols);
+	*strategies = strats = (uint16 *) palloc(sizeof(uint16) * nkeycols);
 
 	/* Quick exit if we have the data cached already */
 	if (indexRelation->rd_exclstrats != NULL)
 	{
-		memcpy(ops, indexRelation->rd_exclops, sizeof(Oid) * ncols);
-		memcpy(funcs, indexRelation->rd_exclprocs, sizeof(Oid) * ncols);
-		memcpy(strats, indexRelation->rd_exclstrats, sizeof(uint16) * ncols);
+		memcpy(ops, indexRelation->rd_exclops, sizeof(Oid) * nkeycols);
+		memcpy(funcs, indexRelation->rd_exclprocs, sizeof(Oid) * nkeycols);
+		memcpy(strats, indexRelation->rd_exclstrats, sizeof(uint16) * nkeycols);
 		return;
 	}
 
@@ -4468,12 +4473,12 @@ RelationGetExclusionInfo(Relation indexRelation,
 		arr = DatumGetArrayTypeP(val);	/* ensure not toasted */
 		nelem = ARR_DIMS(arr)[0];
 		if (ARR_NDIM(arr) != 1 ||
-			nelem != ncols ||
+			nelem != nkeycols ||
 			ARR_HASNULL(arr) ||
 			ARR_ELEMTYPE(arr) != OIDOID)
 			elog(ERROR, "conexclop is not a 1-D Oid array");
 
-		memcpy(ops, ARR_DATA_PTR(arr), sizeof(Oid) * ncols);
+		memcpy(ops, ARR_DATA_PTR(arr), sizeof(Oid) * nkeycols);
 	}
 
 	systable_endscan(conscan);
@@ -4484,7 +4489,7 @@ RelationGetExclusionInfo(Relation indexRelation,
 			 RelationGetRelationName(indexRelation));
 
 	/* We need the func OIDs and strategy numbers too */
-	for (i = 0; i < ncols; i++)
+	for (i = 0; i < nkeycols; i++)
 	{
 		funcs[i] = get_opcode(ops[i]);
 		strats[i] = get_op_opfamily_strategy(ops[i],
@@ -4497,12 +4502,12 @@ RelationGetExclusionInfo(Relation indexRelation,
 
 	/* Save a copy of the results in the relcache entry. */
 	oldcxt = MemoryContextSwitchTo(indexRelation->rd_indexcxt);
-	indexRelation->rd_exclops = (Oid *) palloc(sizeof(Oid) * ncols);
-	indexRelation->rd_exclprocs = (Oid *) palloc(sizeof(Oid) * ncols);
-	indexRelation->rd_exclstrats = (uint16 *) palloc(sizeof(uint16) * ncols);
-	memcpy(indexRelation->rd_exclops, ops, sizeof(Oid) * ncols);
-	memcpy(indexRelation->rd_exclprocs, funcs, sizeof(Oid) * ncols);
-	memcpy(indexRelation->rd_exclstrats, strats, sizeof(uint16) * ncols);
+	indexRelation->rd_exclops = (Oid *) palloc(sizeof(Oid) * nkeycols);
+	indexRelation->rd_exclprocs = (Oid *) palloc(sizeof(Oid) * nkeycols);
+	indexRelation->rd_exclstrats = (uint16 *) palloc(sizeof(uint16) * nkeycols);
+	memcpy(indexRelation->rd_exclops, ops, sizeof(Oid) * nkeycols);
+	memcpy(indexRelation->rd_exclprocs, funcs, sizeof(Oid) * nkeycols);
+	memcpy(indexRelation->rd_exclstrats, strats, sizeof(uint16) * nkeycols);
 	MemoryContextSwitchTo(oldcxt);
 }
 
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index a30e170..bc7a17b 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -813,7 +813,7 @@ tuplesort_begin_index_btree(Relation heapRel,
 	state->enforceUnique = enforceUnique;
 
 	indexScanKey = _bt_mkscankey_nodata(indexRel);
-	state->nKeys = RelationGetNumberOfAttributes(indexRel);
+	state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel);
 
 	/* Prepare SortSupport data for each column */
 	state->sortKeys = (SortSupport) palloc0(state->nKeys *
diff --git a/src/include/access/amapi.h b/src/include/access/amapi.h
index 35f1061..17e652c 100644
--- a/src/include/access/amapi.h
+++ b/src/include/access/amapi.h
@@ -142,6 +142,8 @@ typedef struct IndexAmRoutine
 	bool		amclusterable;
 	/* does AM handle predicate locks? */
 	bool		ampredlocks;
+	/* does AM support columns included with clause INCLUDING? */
+	bool		amcaninclude;
 	/* type of data stored in index, or InvalidOid if variable */
 	Oid			amkeytype;
 
diff --git a/src/include/access/itup.h b/src/include/access/itup.h
index 8350fa0..a3191f6 100644
--- a/src/include/access/itup.h
+++ b/src/include/access/itup.h
@@ -18,6 +18,7 @@
 #include "access/tupmacs.h"
 #include "storage/bufpage.h"
 #include "storage/itemptr.h"
+#include "utils/rel.h"
 
 /*
  * Index tuple header structure
@@ -147,5 +148,7 @@ extern Datum nocache_index_getattr(IndexTuple tup, int attnum,
 extern void index_deform_tuple(IndexTuple tup, TupleDesc tupleDescriptor,
 				   Datum *values, bool *isnull);
 extern IndexTuple CopyIndexTuple(IndexTuple source);
+extern IndexTuple index_reform_tuple(Relation idxrel, IndexTuple tup,
+									 int natts, int nkeyatts);
 
 #endif   /* ITUP_H */
diff --git a/src/include/catalog/pg_index.h b/src/include/catalog/pg_index.h
index ee97c5d..fcbd18a 100644
--- a/src/include/catalog/pg_index.h
+++ b/src/include/catalog/pg_index.h
@@ -32,7 +32,8 @@ CATALOG(pg_index,2610) BKI_WITHOUT_OIDS BKI_SCHEMA_MACRO
 {
 	Oid			indexrelid;		/* OID of the index */
 	Oid			indrelid;		/* OID of the relation it indexes */
-	int16		indnatts;		/* number of columns in index */
+	int16		indnatts;		/* total number of columns in index */
+	int16		indnkeyatts;	/* number of key columns in index */
 	bool		indisunique;	/* is this a unique index? */
 	bool		indisprimary;	/* is this index for primary key? */
 	bool		indisexclusion; /* is this index for exclusion constraint? */
@@ -70,26 +71,27 @@ typedef FormData_pg_index *Form_pg_index;
  *		compiler constants for pg_index
  * ----------------
  */
-#define Natts_pg_index					19
+#define Natts_pg_index					20
 #define Anum_pg_index_indexrelid		1
 #define Anum_pg_index_indrelid			2
 #define Anum_pg_index_indnatts			3
-#define Anum_pg_index_indisunique		4
-#define Anum_pg_index_indisprimary		5
-#define Anum_pg_index_indisexclusion	6
-#define Anum_pg_index_indimmediate		7
-#define Anum_pg_index_indisclustered	8
-#define Anum_pg_index_indisvalid		9
-#define Anum_pg_index_indcheckxmin		10
-#define Anum_pg_index_indisready		11
-#define Anum_pg_index_indislive			12
-#define Anum_pg_index_indisreplident	13
-#define Anum_pg_index_indkey			14
-#define Anum_pg_index_indcollation		15
-#define Anum_pg_index_indclass			16
-#define Anum_pg_index_indoption			17
-#define Anum_pg_index_indexprs			18
-#define Anum_pg_index_indpred			19
+#define Anum_pg_index_indnkeyatts		4
+#define Anum_pg_index_indisunique		5
+#define Anum_pg_index_indisprimary		6
+#define Anum_pg_index_indisexclusion	7
+#define Anum_pg_index_indimmediate		8
+#define Anum_pg_index_indisclustered	9
+#define Anum_pg_index_indisvalid		10
+#define Anum_pg_index_indcheckxmin		11
+#define Anum_pg_index_indisready		12
+#define Anum_pg_index_indislive			13
+#define Anum_pg_index_indisreplident	14
+#define Anum_pg_index_indkey			15
+#define Anum_pg_index_indcollation		16
+#define Anum_pg_index_indclass			17
+#define Anum_pg_index_indoption			18
+#define Anum_pg_index_indexprs			19
+#define Anum_pg_index_indpred			20
 
 /*
  * Index AMs that support ordered scans must support these two indoption
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 07cd20a..251b977 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -33,9 +33,11 @@
  *		entries for a particular index.  Used for both index_build and
  *		retail creation of index entries.
  *
- *		NumIndexAttrs		number of columns in this index
+ *		NumIndexAttrs		total number of columns in this index
+ *		NumIndexKeyAttrs	number of key columns in index
  *		KeyAttrNumbers		underlying-rel attribute numbers used as keys
- *							(zeroes indicate expressions)
+ *							(zeroes indicate expressions). It also contains
+ * 							info about included columns.
  *		Expressions			expr trees for expression entries, or NIL if none
  *		ExpressionsState	exec state for expressions, or NIL if none
  *		Predicate			partial-index predicate, or NIL if none
@@ -58,7 +60,8 @@
 typedef struct IndexInfo
 {
 	NodeTag		type;
-	int			ii_NumIndexAttrs;
+	int			ii_NumIndexAttrs; /* total number of columns in index */
+	int			ii_NumIndexKeyAttrs; /* number of key columns in index */
 	AttrNumber	ii_KeyAttrNumbers[INDEX_MAX_KEYS];
 	List	   *ii_Expressions; /* list of Expr */
 	List	   *ii_ExpressionsState;	/* list of ExprState */
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 2fd0629..d30b254 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -2426,6 +2426,8 @@ typedef struct IndexStmt
 	char	   *accessMethod;	/* name of access method (eg. btree) */
 	char	   *tableSpace;		/* tablespace, or NULL for default */
 	List	   *indexParams;	/* columns to index: a list of IndexElem */
+	List	   *indexIncludingParams;	/* additional columns to index:
+										 * a list of IndexElem */
 	List	   *options;		/* WITH clause options: a list of DefElem */
 	Node	   *whereClause;	/* qualification (partial-index predicate) */
 	List	   *excludeOpNames; /* exclusion operator names, or NIL if none */
diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h
index b233b62..9ed6f2b 100644
--- a/src/include/nodes/relation.h
+++ b/src/include/nodes/relation.h
@@ -507,10 +507,11 @@ typedef struct RelOptInfo
  * IndexOptInfo
  *		Per-index information for planning/optimization
  *
- *		indexkeys[], indexcollations[], opfamily[], and opcintype[]
- *		each have ncolumns entries.
+ *		indexkeys[], indexcollations[] each have ncolumns entries.
+ *		opfamily[], and opcintype[]	each have nkeycolumns entries. They do
+ *		not contain any information about included attributes.
  *
- *		sortopfamily[], reverse_sort[], and nulls_first[] likewise have
+ *		sortopfamily[], reverse_sort[], and nulls_first[] have
  *		ncolumns entries, if the index is ordered; but if it is unordered,
  *		those pointers are NULL.
  *
@@ -544,7 +545,9 @@ typedef struct IndexOptInfo
 
 	/* index descriptor information */
 	int			ncolumns;		/* number of columns in index */
-	int		   *indexkeys;		/* column numbers of index's keys, or 0 */
+	int			nkeycolumns;	/* number of key columns in index */
+	int		   *indexkeys;		/* column numbers of index's attributes
+								 * both key and included columns, or 0 */
 	Oid		   *indexcollations;	/* OIDs of collations of index columns */
 	Oid		   *opfamily;		/* OIDs of operator families for columns */
 	Oid		   *opcintype;		/* OIDs of opclass declared input data types */
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index f2bebf2..470d789 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -329,11 +329,18 @@ typedef struct ViewOptions
 
 /*
  * RelationGetNumberOfAttributes
- *		Returns the number of attributes in a relation.
+ *		Returns the total number of attributes in a relation.
  */
 #define RelationGetNumberOfAttributes(relation) ((relation)->rd_rel->relnatts)
 
 /*
+ * IndexRelationGetNumberOfKeyAttributes
+ *		Returns the number of key attributes in an index.
+ */
+#define IndexRelationGetNumberOfKeyAttributes(relation) \
+		((relation)->rd_index->indnkeyatts)
+
+/*
  * RelationGetDescr
  *		Returns tuple descriptor for a relation.
  */
diff --git a/src/test/regress/expected/create_index.out b/src/test/regress/expected/create_index.out
index b72e65d..8ea9e67 100644
--- a/src/test/regress/expected/create_index.out
+++ b/src/test/regress/expected/create_index.out
@@ -2376,6 +2376,22 @@ DETAIL:  Key ((f1 || f2))=(ABCDEF) already exists.
 -- but this shouldn't:
 INSERT INTO func_index_heap VALUES('QWERTY');
 --
+-- Test unique index with included columns
+--
+CREATE TABLE covering_index_heap (f1 int, f2 int, f3 text);
+CREATE UNIQUE INDEX covering_index_index on covering_index_heap (f1,f2) INCLUDING(f3);
+INSERT INTO covering_index_heap VALUES(1,1,'AAA');
+INSERT INTO covering_index_heap VALUES(1,2,'AAA');
+-- this should fail because of unique index on f1,f2:
+INSERT INTO covering_index_heap VALUES(1,2,'BBB');
+ERROR:  duplicate key value violates unique constraint "covering_index_index"
+DETAIL:  Key (f1, f2, f3 (included))=(1, 2, BBB) already exists.
+-- and this shouldn't:
+INSERT INTO covering_index_heap VALUES(1,4,'AAA');
+-- Try to build index on table that already contains data
+CREATE UNIQUE INDEX covering_index_index2 on covering_index_heap (f1,f2) INCLUDING(f3);
+DROP TABLE covering_index_heap;
+--
 -- Also try building functional, expressional, and partial indexes on
 -- tables that already contain data.
 --
diff --git a/src/test/regress/sql/create_index.sql b/src/test/regress/sql/create_index.sql
index ff86953..42d4881 100644
--- a/src/test/regress/sql/create_index.sql
+++ b/src/test/regress/sql/create_index.sql
@@ -722,6 +722,22 @@ INSERT INTO func_index_heap VALUES('ABCD', 'EF');
 INSERT INTO func_index_heap VALUES('QWERTY');
 
 --
+-- Test unique index with included columns
+--
+CREATE TABLE covering_index_heap (f1 int, f2 int, f3 text);
+CREATE UNIQUE INDEX covering_index_index on covering_index_heap (f1,f2) INCLUDING(f3);
+
+INSERT INTO covering_index_heap VALUES(1,1,'AAA');
+INSERT INTO covering_index_heap VALUES(1,2,'AAA');
+-- this should fail because of unique index on f1,f2:
+INSERT INTO covering_index_heap VALUES(1,2,'BBB');
+-- and this shouldn't:
+INSERT INTO covering_index_heap VALUES(1,4,'AAA');
+-- Try to build index on table that already contains data
+CREATE UNIQUE INDEX covering_index_index2 on covering_index_heap (f1,f2) INCLUDING(f3);
+DROP TABLE covering_index_heap;
+
+--
 -- Also try building functional, expressional, and partial indexes on
 -- tables that already contain data.
 --
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to