01.03.2016 19:55, Anastasia Lubennikova:

29.02.2016 18:17, Anastasia Lubennikova:
25.02.2016 21:39, Jeff Janes:
As promised, here's the new version of the patch "including_columns_4.0".
I fixed all issues except some points mentioned below.
Thanks for the updated patch.  I get a compiler warning:

genam.c: In function 'BuildIndexValueDescription':
genam.c:259: warning: unused variable 'tupdesc'

Thank you for the notice, I'll fix it in the next update.
Also, I can't create a primary key INCLUDING columns directly:

jjanes=# create table foobar (a int, b int, c int);
jjanes=# alter table foobar add constraint foobar_pkey primary key
(a,b) including (c);
ERROR:  syntax error at or near "including"

But I can get there using a circuitous route:

jjanes=# create unique index on foobar (a,b) including (c);
jjanes=# alter table foobar add constraint foobar_pkey primary key
using index foobar_a_b_c_idx;

The description of the table's index knows to include the including column:

jjanes=# \d foobar
     Table "public.foobar"
  Column |  Type   | Modifiers
--------+---------+-----------
  a      | integer | not null
  b      | integer | not null
  c      | integer |
Indexes:
     "foobar_pkey" PRIMARY KEY, btree (a, b) INCLUDING (c)


Since the machinery appears to all be in place to have primary keys
with INCLUDING columns, it would be nice if the syntax for adding
primary keys allowed one to implement them directly.

Is this something for future expansion, or could it be added at the
same time as the main patch?

Good point.
At a quick glance, this looks easy to implement. The only problem is that there are a lot of places in the code which must be updated. I'll try to do it, and if there are difficulties, it's fine with me to delay this feature for future work.

I found one more thing to do. pg_dump does not handle included columns yet. I will fix it in the next version of the patch.


As promised, the fixed patch is attached. It allows the following statements to be executed:

create table utbl (a int, b box);
alter table utbl add unique (a) including(b);
create table ptbl (a int, b box);
alter table ptbl add primary key (a) including(b);

And now they can be dumped/restored successfully.
I used the following settings:
pg_dump --verbose -Fc postgres -f pg.dump
pg_restore -d newdb pg.dump

It is not the final version, because it breaks pg_dump for previous versions. I need some help from hackers here.
pg_dump.c, line 5466:
if (fout->remoteVersion >= 90400)

What does 'remoteVersion' mean? And what is the right way to change it? Or does it change between releases? I guess that 90400 is for 9.4 and 80200 is for 8.2, but is it really so? That is totally new to me. BTW, while we are on the subject, maybe it's worth replacing these magic numbers with a set of macros?

P.S. I'll update documentation for ALTER TABLE in the next patch.

Sorry for the missing attachment. Now it's here.

--
Anastasia Lubennikova
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company

diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index 9c8e308..891325d 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -100,7 +100,7 @@ static remoteConn *getConnectionByName(const char *name);
 static HTAB *createConnHash(void);
 static void createNewConnection(const char *name, remoteConn *rconn);
 static void deleteConnection(const char *name);
-static char **get_pkey_attnames(Relation rel, int16 *numatts);
+static char **get_pkey_attnames(Relation rel, int16 *indnkeyatts);
 static char **get_text_array_contents(ArrayType *array, int *numitems);
 static char *get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
 static char *get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals);
@@ -1485,7 +1485,7 @@ PG_FUNCTION_INFO_V1(dblink_get_pkey);
 Datum
 dblink_get_pkey(PG_FUNCTION_ARGS)
 {
-	int16		numatts;
+	int16		indnkeyatts;
 	char	  **results;
 	FuncCallContext *funcctx;
 	int32		call_cntr;
@@ -1511,7 +1511,7 @@ dblink_get_pkey(PG_FUNCTION_ARGS)
 		rel = get_rel_from_relname(PG_GETARG_TEXT_P(0), AccessShareLock, ACL_SELECT);
 
 		/* get the array of attnums */
-		results = get_pkey_attnames(rel, &numatts);
+		results = get_pkey_attnames(rel, &indnkeyatts);
 
 		relation_close(rel, AccessShareLock);
 
@@ -1531,9 +1531,9 @@ dblink_get_pkey(PG_FUNCTION_ARGS)
 		attinmeta = TupleDescGetAttInMetadata(tupdesc);
 		funcctx->attinmeta = attinmeta;
 
-		if ((results != NULL) && (numatts > 0))
+		if ((results != NULL) && (indnkeyatts > 0))
 		{
-			funcctx->max_calls = numatts;
+			funcctx->max_calls = indnkeyatts;
 
 			/* got results, keep track of them */
 			funcctx->user_fctx = results;
@@ -2023,10 +2023,10 @@ dblink_fdw_validator(PG_FUNCTION_ARGS)
  * get_pkey_attnames
  *
  * Get the primary key attnames for the given relation.
- * Return NULL, and set numatts = 0, if no primary key exists.
+ * Return NULL, and set indnkeyatts = 0, if no primary key exists.
  */
 static char **
-get_pkey_attnames(Relation rel, int16 *numatts)
+get_pkey_attnames(Relation rel, int16 *indnkeyatts)
 {
 	Relation	indexRelation;
 	ScanKeyData skey;
@@ -2036,8 +2036,8 @@ get_pkey_attnames(Relation rel, int16 *numatts)
 	char	  **result = NULL;
 	TupleDesc	tupdesc;
 
-	/* initialize numatts to 0 in case no primary key exists */
-	*numatts = 0;
+	/* initialize indnkeyatts to 0 in case no primary key exists */
+	*indnkeyatts = 0;
 
 	tupdesc = rel->rd_att;
 
@@ -2058,12 +2058,12 @@ get_pkey_attnames(Relation rel, int16 *numatts)
 		/* we're only interested if it is the primary key */
 		if (index->indisprimary)
 		{
-			*numatts = index->indnatts;
-			if (*numatts > 0)
+			*indnkeyatts = index->indnkeyatts;
+			if (*indnkeyatts > 0)
 			{
-				result = (char **) palloc(*numatts * sizeof(char *));
+				result = (char **) palloc(*indnkeyatts * sizeof(char *));
 
-				for (i = 0; i < *numatts; i++)
+				for (i = 0; i < *indnkeyatts; i++)
 					result[i] = SPI_fname(tupdesc, index->indkey.values[i]);
 			}
 			break;
diff --git a/contrib/tcn/tcn.c b/contrib/tcn/tcn.c
index 7352b29..142730a 100644
--- a/contrib/tcn/tcn.c
+++ b/contrib/tcn/tcn.c
@@ -138,9 +138,9 @@ triggered_change_notification(PG_FUNCTION_ARGS)
 		/* we're only interested if it is the primary key and valid */
 		if (index->indisprimary && IndexIsValid(index))
 		{
-			int			numatts = index->indnatts;
+			int			indnkeyatts = index->indnkeyatts;
 
-			if (numatts > 0)
+			if (indnkeyatts > 0)
 			{
 				int			i;
 
@@ -150,7 +150,7 @@ triggered_change_notification(PG_FUNCTION_ARGS)
 				appendStringInfoCharMacro(payload, ',');
 				appendStringInfoCharMacro(payload, operation);
 
-				for (i = 0; i < numatts; i++)
+				for (i = 0; i < indnkeyatts; i++)
 				{
 					int			colno = index->indkey.values[i];
 
diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 412c845..c201f8b 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -3515,6 +3515,14 @@
       <literal>pg_class.relnatts</literal>)</entry>
      </row>
 
+      <row>
+      <entry><structfield>indnkeyatts</structfield></entry>
+      <entry><type>int2</type></entry>
+      <entry></entry>
+      <entry>The number of key columns in the index. "Key columns" are ordinary
+      index columns in contrast with "included" columns.</entry>
+     </row>
+
      <row>
       <entry><structfield>indisunique</structfield></entry>
       <entry><type>bool</type></entry>
diff --git a/doc/src/sgml/indexam.sgml b/doc/src/sgml/indexam.sgml
index 5f7befb..aaed5a7 100644
--- a/doc/src/sgml/indexam.sgml
+++ b/doc/src/sgml/indexam.sgml
@@ -111,6 +111,8 @@ typedef struct IndexAmRoutine
     bool        amclusterable;
     /* does AM handle predicate locks? */
     bool        ampredlocks;
+    /* does AM support columns included with clause INCLUDING? */
+    bool    amcaninclude;
     /* type of data stored in index, or InvalidOid if variable */
     Oid         amkeytype;
 
@@ -852,7 +854,8 @@ amrestrpos (IndexScanDesc scan);
    using <firstterm>unique indexes</>, which are indexes that disallow
    multiple entries with identical keys.  An access method that supports this
    feature sets <structfield>amcanunique</> true.
-   (At present, only b-tree supports it.)
+   (At present, only b-tree supports it.) Columns which are present in the
+   <literal>INCLUDING</> clause are not used to enforce uniqueness.
   </para>
 
   <para>
diff --git a/doc/src/sgml/indices.sgml b/doc/src/sgml/indices.sgml
index 23bbec6..09d4e6b 100644
--- a/doc/src/sgml/indices.sgml
+++ b/doc/src/sgml/indices.sgml
@@ -633,7 +633,8 @@ CREATE INDEX test3_desc_index ON test3 (id DESC NULLS LAST);
    Indexes can also be used to enforce uniqueness of a column's value,
    or the uniqueness of the combined values of more than one column.
 <synopsis>
-CREATE UNIQUE INDEX <replaceable>name</replaceable> ON <replaceable>table</replaceable> (<replaceable>column</replaceable> <optional>, ...</optional>);
+CREATE UNIQUE INDEX <replaceable>name</replaceable> ON <replaceable>table</replaceable> (<replaceable>column</replaceable> <optional>, ...</optional>)
+<optional>INCLUDING (<replaceable>column</replaceable> <optional>, ...</optional>)</optional>;
 </synopsis>
    Currently, only B-tree indexes can be declared unique.
   </para>
@@ -642,7 +643,9 @@ CREATE UNIQUE INDEX <replaceable>name</replaceable> ON <replaceable>table</repla
    When an index is declared unique, multiple table rows with equal
    indexed values are not allowed.  Null values are not considered
    equal.  A multicolumn unique index will only reject cases where all
-   indexed columns are equal in multiple rows.
+   indexed columns are equal in multiple rows. Columns included with clause
+   <literal>INCLUDING</literal> aren't used to enforce constraints (UNIQUE,
+   PRIMARY KEY, etc).
   </para>
 
   <para>
diff --git a/doc/src/sgml/ref/create_index.sgml b/doc/src/sgml/ref/create_index.sgml
index ec4146f..432181d 100644
--- a/doc/src/sgml/ref/create_index.sgml
+++ b/doc/src/sgml/ref/create_index.sgml
@@ -23,6 +23,7 @@ PostgreSQL documentation
 <synopsis>
 CREATE [ UNIQUE ] INDEX [ CONCURRENTLY ] [ [ IF NOT EXISTS ] <replaceable class="parameter">name</replaceable> ] ON <replaceable class="parameter">table_name</replaceable> [ USING <replaceable class="parameter">method</replaceable> ]
     ( { <replaceable class="parameter">column_name</replaceable> | ( <replaceable class="parameter">expression</replaceable> ) } [ COLLATE <replaceable class="parameter">collation</replaceable> ] [ <replaceable class="parameter">opclass</replaceable> ] [ ASC | DESC ] [ NULLS { FIRST | LAST } ] [, ...] )
+    [ INCLUDING ( { <replaceable class="parameter">column_name</replaceable> | ( <replaceable class="parameter">expression</replaceable> ) } [ COLLATE <replaceable class="parameter">collation</replaceable> ] [ <replaceable class="parameter">opclass</replaceable> ] [ ASC | DESC ] [ NULLS { FIRST | LAST } ] [, ...] ) ]
     [ WITH ( <replaceable class="PARAMETER">storage_parameter</replaceable> = <replaceable class="PARAMETER">value</replaceable> [, ... ] ) ]
     [ TABLESPACE <replaceable class="parameter">tablespace_name</replaceable> ]
     [ WHERE <replaceable class="parameter">predicate</replaceable> ]
@@ -139,6 +140,32 @@ CREATE [ UNIQUE ] INDEX [ CONCURRENTLY ] [ [ IF NOT EXISTS ] <replaceable class=
      </varlistentry>
 
      <varlistentry>
+      <term><literal>INCLUDING</literal></term>
+      <listitem>
+       <para>
+        An optional <literal>INCLUDING</> clause allows a list of columns to be
+        specified which will be included in the index, in the non-key portion of
+        the index. Columns which are part of this clause cannot also exist in the
+        key columns portion of the index, and vice versa. The
+        <literal>INCLUDING</> columns exist solely to allow more queries to benefit
+        from <firstterm>index-only scans</> by including certain columns in the
+        index, the value of which would otherwise have to be obtained by reading
+        the table's heap. Having these columns in the <literal>INCLUDING</> clause
+        in some cases allows <productname>PostgreSQL</> to skip the heap read
+        completely. This also allows <literal>UNIQUE</> indexes to be defined on
+        one set of columns, which can include another set of columns in the
+        <literal>INCLUDING</> clause, on which uniqueness is not enforced.
+        It's the same with other constraints (PRIMARY KEY and EXCLUDE). This can
+        also be used for non-unique indexes, as any columns which are not required
+        for the searching or ordering of records can be included in the
+        <literal>INCLUDING</> clause, which can slightly reduce the size of the index,
+        due to storing included attributes only in leaf index pages.
+        Currently, only the B-tree access method supports this feature.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
       <term><replaceable class="parameter">name</replaceable></term>
       <listitem>
        <para>
@@ -596,7 +623,7 @@ Indexes:
   <title>Examples</title>
 
   <para>
-   To create a B-tree index on the column <literal>title</literal> in
+   To create a unique B-tree index on the column <literal>title</literal> in
    the table <literal>films</literal>:
 <programlisting>
 CREATE UNIQUE INDEX title_idx ON films (title);
@@ -604,6 +631,15 @@ CREATE UNIQUE INDEX title_idx ON films (title);
   </para>
 
   <para>
+   To create a unique B-tree index on the column <literal>title</literal>
+   and included columns <literal>director</literal> and <literal>rating</literal>
+   in the table <literal>films</literal>:
+<programlisting>
+CREATE UNIQUE INDEX title_idx ON films (title) INCLUDING (director, rating);
+</programlisting>
+  </para>
+
+  <para>
    To create an index on the expression <literal>lower(title)</>,
    allowing efficient case-insensitive searches:
 <programlisting>
diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index c740952..c68df18 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -92,6 +92,7 @@ brinhandler(PG_FUNCTION_ARGS)
 	amroutine->amstorage = true;
 	amroutine->amclusterable = false;
 	amroutine->ampredlocks = false;
+	amroutine->amcaninclude = false;
 	amroutine->amkeytype = InvalidOid;
 
 	amroutine->ambuild = brinbuild;
diff --git a/src/backend/access/common/indextuple.c b/src/backend/access/common/indextuple.c
index 274a6c2..f26ee20 100644
--- a/src/backend/access/common/indextuple.c
+++ b/src/backend/access/common/indextuple.c
@@ -19,6 +19,7 @@
 #include "access/heapam.h"
 #include "access/itup.h"
 #include "access/tuptoaster.h"
+#include "utils/rel.h"
 
 
 /* ----------------------------------------------------------------
@@ -441,3 +442,30 @@ CopyIndexTuple(IndexTuple source)
 	memcpy(result, source, size);
 	return result;
 }
+
+/*
+ * Reform index tuple: return a new index tuple containing only the first
+ * indnkeyatts attributes of olditup (INCLUDING ones dropped), same t_tid.
+ */
+IndexTuple
+index_reform_tuple(Relation idxrel, IndexTuple olditup, int indnatts, int indnkeyatts)
+{
+	TupleDesc	itupdesc = RelationGetDescr(idxrel);
+	TupleDesc	keydesc;
+	Datum		values[INDEX_MAX_KEYS];
+	bool		isnull[INDEX_MAX_KEYS];
+	IndexTuple	newitup;
+
+	Assert(indnatts <= INDEX_MAX_KEYS);
+	Assert(indnkeyatts > 0);
+	Assert(indnkeyatts <= indnatts);
+
+	index_deform_tuple(olditup, itupdesc, values, isnull);
+	/* Truncate a private copy so the relation's own descriptor is never modified */
+	keydesc = CreateTupleDescCopy(itupdesc);
+	keydesc->natts = indnkeyatts;
+	newitup = index_form_tuple(keydesc, values, isnull);
+	newitup->t_tid = olditup->t_tid;
+	FreeTupleDesc(keydesc);
+	return newitup;
+}
diff --git a/src/backend/access/gin/ginutil.c b/src/backend/access/gin/ginutil.c
index 9450267..b4c69e7 100644
--- a/src/backend/access/gin/ginutil.c
+++ b/src/backend/access/gin/ginutil.c
@@ -47,6 +47,7 @@ ginhandler(PG_FUNCTION_ARGS)
 	amroutine->amstorage = true;
 	amroutine->amclusterable = false;
 	amroutine->ampredlocks = false;
+	amroutine->amcaninclude = false;
 	amroutine->amkeytype = InvalidOid;
 
 	amroutine->ambuild = ginbuild;
diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c
index 996363c..64cc8df 100644
--- a/src/backend/access/gist/gist.c
+++ b/src/backend/access/gist/gist.c
@@ -69,6 +69,7 @@ gisthandler(PG_FUNCTION_ARGS)
 	amroutine->amstorage = true;
 	amroutine->amclusterable = true;
 	amroutine->ampredlocks = false;
+	amroutine->amcaninclude = false;
 	amroutine->amkeytype = InvalidOid;
 
 	amroutine->ambuild = gistbuild;
diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c
index 3d48c4f..6d8d68d 100644
--- a/src/backend/access/hash/hash.c
+++ b/src/backend/access/hash/hash.c
@@ -64,6 +64,7 @@ hashhandler(PG_FUNCTION_ARGS)
 	amroutine->amstorage = false;
 	amroutine->amclusterable = false;
 	amroutine->ampredlocks = false;
+	amroutine->amcaninclude = false;
 	amroutine->amkeytype = INT4OID;
 
 	amroutine->ambuild = hashbuild;
diff --git a/src/backend/access/index/genam.c b/src/backend/access/index/genam.c
index 65c941d..de57814 100644
--- a/src/backend/access/index/genam.c
+++ b/src/backend/access/index/genam.c
@@ -174,13 +174,15 @@ BuildIndexValueDescription(Relation indexRelation,
 	StringInfoData buf;
 	Form_pg_index idxrec;
 	HeapTuple	ht_idx;
-	int			natts = indexRelation->rd_rel->relnatts;
+	int			indnkeyatts;
 	int			i;
 	int			keyno;
 	Oid			indexrelid = RelationGetRelid(indexRelation);
 	Oid			indrelid;
 	AclResult	aclresult;
 
+	indnkeyatts = IndexRelationGetNumberOfKeyAttributes(indexRelation);
+
 	/*
 	 * Check permissions- if the user does not have access to view all of the
 	 * key columns then return NULL to avoid leaking data.
@@ -218,7 +220,7 @@ BuildIndexValueDescription(Relation indexRelation,
 		 * No table-level access, so step through the columns in the index and
 		 * make sure the user has SELECT rights on all of them.
 		 */
-		for (keyno = 0; keyno < idxrec->indnatts; keyno++)
+		for (keyno = 0; keyno < idxrec->indnkeyatts; keyno++)
 		{
 			AttrNumber	attnum = idxrec->indkey.values[keyno];
 
@@ -244,7 +246,7 @@ BuildIndexValueDescription(Relation indexRelation,
 	appendStringInfo(&buf, "(%s)=(",
 					 pg_get_indexdef_columns(indexrelid, true));
 
-	for (i = 0; i < natts; i++)
+	for (i = 0; i < indnkeyatts; i++)
 	{
 		char	   *val;
 
@@ -362,7 +364,7 @@ systable_beginscan(Relation heapRelation,
 		{
 			int			j;
 
-			for (j = 0; j < irel->rd_index->indnatts; j++)
+			for (j = 0; j < IndexRelationGetNumberOfAttributes(irel); j++)
 			{
 				if (key[i].sk_attno == irel->rd_index->indkey.values[j])
 				{
@@ -370,7 +372,7 @@ systable_beginscan(Relation heapRelation,
 					break;
 				}
 			}
-			if (j == irel->rd_index->indnatts)
+			if (j == IndexRelationGetNumberOfAttributes(irel))
 				elog(ERROR, "column is not in index");
 		}
 
@@ -564,7 +566,7 @@ systable_beginscan_ordered(Relation heapRelation,
 	{
 		int			j;
 
-		for (j = 0; j < indexRelation->rd_index->indnatts; j++)
+		for (j = 0; j < IndexRelationGetNumberOfAttributes(indexRelation); j++)
 		{
 			if (key[i].sk_attno == indexRelation->rd_index->indkey.values[j])
 			{
@@ -572,7 +574,7 @@ systable_beginscan_ordered(Relation heapRelation,
 				break;
 			}
 		}
-		if (j == indexRelation->rd_index->indnatts)
+		if (j == IndexRelationGetNumberOfAttributes(indexRelation))
 			elog(ERROR, "column is not in index");
 	}
 
diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c
index e3c55eb..9c9bdb3 100644
--- a/src/backend/access/nbtree/nbtinsert.c
+++ b/src/backend/access/nbtree/nbtinsert.c
@@ -108,18 +108,22 @@ _bt_doinsert(Relation rel, IndexTuple itup,
 			 IndexUniqueCheck checkUnique, Relation heapRel)
 {
 	bool		is_unique = false;
-	int			natts = rel->rd_rel->relnatts;
+	int			indnkeyatts;
 	ScanKey		itup_scankey;
 	BTStack		stack;
 	Buffer		buf;
 	OffsetNumber offset;
 
+	Assert(IndexRelationGetNumberOfAttributes(rel) != 0);
+	indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
+	Assert(indnkeyatts != 0);
+
 	/* we need an insertion scan key to do our search, so build one */
 	itup_scankey = _bt_mkscankey(rel, itup);
 
 top:
 	/* find the first page containing this key */
-	stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE);
+	stack = _bt_search(rel, indnkeyatts, itup_scankey, false, &buf, BT_WRITE);
 
 	offset = InvalidOffsetNumber;
 
@@ -134,7 +138,7 @@ top:
 	 * move right in the tree.  See Lehman and Yao for an excruciatingly
 	 * precise description.
 	 */
-	buf = _bt_moveright(rel, buf, natts, itup_scankey, false,
+	buf = _bt_moveright(rel, buf, indnkeyatts, itup_scankey, false,
 						true, stack, BT_WRITE);
 
 	/*
@@ -163,7 +167,7 @@ top:
 		TransactionId xwait;
 		uint32		speculativeToken;
 
-		offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);
+		offset = _bt_binsrch(rel, buf, indnkeyatts, itup_scankey, false);
 		xwait = _bt_check_unique(rel, itup, heapRel, buf, offset, itup_scankey,
 								 checkUnique, &is_unique, &speculativeToken);
 
@@ -199,7 +203,7 @@ top:
 		 */
 		CheckForSerializableConflictIn(rel, NULL, buf);
 		/* do the insertion */
-		_bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup,
+		_bt_findinsertloc(rel, &buf, &offset, indnkeyatts, itup_scankey, itup,
 						  stack, heapRel);
 		_bt_insertonpg(rel, buf, InvalidBuffer, stack, itup, offset, false);
 	}
@@ -242,7 +246,7 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
 				 uint32 *speculativeToken)
 {
 	TupleDesc	itupdesc = RelationGetDescr(rel);
-	int			natts = rel->rd_rel->relnatts;
+	int			indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
 	SnapshotData SnapshotDirty;
 	OffsetNumber maxoff;
 	Page		page;
@@ -301,7 +305,7 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
 				 * in real comparison, but only for ordering/finding items on
 				 * pages. - vadim 03/24/97
 				 */
-				if (!_bt_isequal(itupdesc, page, offset, natts, itup_scankey))
+				if (!_bt_isequal(itupdesc, page, offset, indnkeyatts, itup_scankey))
 					break;		/* we're past all the equal tuples */
 
 				/* okay, we gotta fetch the heap tuple ... */
@@ -457,7 +461,7 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
 			if (P_RIGHTMOST(opaque))
 				break;
 			if (!_bt_isequal(itupdesc, page, P_HIKEY,
-							 natts, itup_scankey))
+							 indnkeyatts, itup_scankey))
 				break;
 			/* Advance to next non-dead page --- there must be one */
 			for (;;)
@@ -733,6 +737,8 @@ _bt_insertonpg(Relation rel,
 	BTPageOpaque lpageop;
 	OffsetNumber firstright = InvalidOffsetNumber;
 	Size		itemsz;
+	int 		indnatts,
+				indnkeyatts;
 
 	page = BufferGetPage(buf);
 	lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
@@ -745,6 +751,14 @@ _bt_insertonpg(Relation rel,
 		elog(ERROR, "cannot insert to incompletely split page %u",
 			 BufferGetBlockNumber(buf));
 
+	/* Truncate nonkey attributes when inserting on nonleaf pages. */
+	indnatts = IndexRelationGetNumberOfAttributes(rel);
+	indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
+	if (indnatts != indnkeyatts && !P_ISLEAF(lpageop))
+	{
+		itup = index_reform_tuple(rel, itup, indnatts, indnkeyatts);
+	}
+
 	itemsz = IndexTupleDSize(*itup);
 	itemsz = MAXALIGN(itemsz);	/* be safe, PageAddItem will do this but we
 								 * need to be consistent */
@@ -1928,6 +1942,8 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
 	Buffer		metabuf;
 	Page		metapg;
 	BTMetaPageData *metad;
+	int 		indnatts,
+				indnkeyatts;
 
 	lbkno = BufferGetBlockNumber(lbuf);
 	rbkno = BufferGetBlockNumber(rbuf);
@@ -1961,7 +1977,17 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
 	itemid = PageGetItemId(lpage, P_HIKEY);
 	right_item_sz = ItemIdGetLength(itemid);
 	item = (IndexTuple) PageGetItem(lpage, itemid);
-	right_item = CopyIndexTuple(item);
+	indnatts = IndexRelationGetNumberOfAttributes(rel);
+	indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
+
+	if (indnatts != indnkeyatts)
+	{
+		right_item = index_reform_tuple(rel, item, indnatts, indnkeyatts);
+		right_item_sz = IndexTupleDSize(*right_item);
+		right_item_sz = MAXALIGN(right_item_sz);
+	}
+	else
+		right_item = CopyIndexTuple(item);
 	ItemPointerSet(&(right_item->t_tid), rbkno, P_HIKEY);
 
 	/* NO EREPORT(ERROR) from here till newroot op is logged */
diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c
index 67755d7..6d64a8b 100644
--- a/src/backend/access/nbtree/nbtpage.c
+++ b/src/backend/access/nbtree/nbtpage.c
@@ -1254,8 +1254,9 @@ _bt_pagedel(Relation rel, Buffer buf)
 				/* we need an insertion scan key for the search, so build one */
 				itup_scankey = _bt_mkscankey(rel, targetkey);
 				/* find the leftmost leaf page containing this key */
-				stack = _bt_search(rel, rel->rd_rel->relnatts, itup_scankey,
-								   false, &lbuf, BT_READ);
+				stack = _bt_search(rel,
+								   IndexRelationGetNumberOfKeyAttributes(rel),
+								   itup_scankey, false, &lbuf, BT_READ);
 				/* don't need a pin on the page */
 				_bt_relbuf(rel, lbuf);
 
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index f2905cb..24f2b9e 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -98,6 +98,7 @@ bthandler(PG_FUNCTION_ARGS)
 	amroutine->amstorage = false;
 	amroutine->amclusterable = true;
 	amroutine->ampredlocks = true;
+	amroutine->amcaninclude = true;
 	amroutine->amkeytype = InvalidOid;
 
 	amroutine->ambuild = btbuild;
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index 99a014e..03c0e63 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -456,7 +456,8 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup)
 	OffsetNumber last_off;
 	Size		pgspc;
 	Size		itupsz;
-
+	int			indnatts,
+				indnkeyatts;
 	/*
 	 * This is a handy place to check for cancel interrupts during the btree
 	 * load phase of index creation.
@@ -593,6 +594,22 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup)
 		state->btps_minkey = CopyIndexTuple(itup);
 	}
 
+	/* Truncate nonkey attributes when inserting on nonleaf pages */
+	indnatts = IndexRelationGetNumberOfAttributes(wstate->index);
+	indnkeyatts = IndexRelationGetNumberOfKeyAttributes(wstate->index);
+	if (indnatts != indnkeyatts)
+	{
+		BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(npage);
+
+		if (!P_ISLEAF(pageop))
+		{
+			itup = index_reform_tuple(wstate->index, itup,
+									  indnatts,indnkeyatts);
+			itupsz = IndexTupleDSize(*itup);
+			itupsz = MAXALIGN(itupsz);
+		}
+	}
+
 	/*
 	 * Add the new item into the current page.
 	 */
@@ -685,7 +702,7 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
 				load1;
 	TupleDesc	tupdes = RelationGetDescr(wstate->index);
 	int			i,
-				keysz = RelationGetNumberOfAttributes(wstate->index);
+				keysz = IndexRelationGetNumberOfKeyAttributes(wstate->index);
 	ScanKey		indexScanKey = NULL;
 	SortSupport sortKeys;
 
diff --git a/src/backend/access/nbtree/nbtutils.c b/src/backend/access/nbtree/nbtutils.c
index c850b48..cfbce32 100644
--- a/src/backend/access/nbtree/nbtutils.c
+++ b/src/backend/access/nbtree/nbtutils.c
@@ -63,17 +63,26 @@ _bt_mkscankey(Relation rel, IndexTuple itup)
 {
 	ScanKey		skey;
 	TupleDesc	itupdesc;
-	int			natts;
+	int			indnatts,
+				indnkeyatts;
 	int16	   *indoption;
 	int			i;
 
 	itupdesc = RelationGetDescr(rel);
-	natts = RelationGetNumberOfAttributes(rel);
+	indnatts = IndexRelationGetNumberOfAttributes(rel);
+	indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
 	indoption = rel->rd_indoption;
 
-	skey = (ScanKey) palloc(natts * sizeof(ScanKeyData));
+	Assert(indnkeyatts != 0);
+	Assert(indnkeyatts <= indnatts);
 
-	for (i = 0; i < natts; i++)
+	/*
+	 * We'll execute search using ScanKey constructed on key columns.
+	 * Non key (included) columns must be omitted.
+	 */
+	skey = (ScanKey) palloc(indnkeyatts * sizeof(ScanKeyData));
+
+	for (i = 0; i < indnkeyatts; i++)
 	{
 		FmgrInfo   *procinfo;
 		Datum		arg;
@@ -115,16 +124,16 @@ ScanKey
 _bt_mkscankey_nodata(Relation rel)
 {
 	ScanKey		skey;
-	int			natts;
+	int			indnkeyatts;
 	int16	   *indoption;
 	int			i;
 
-	natts = RelationGetNumberOfAttributes(rel);
+	indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
 	indoption = rel->rd_indoption;
 
-	skey = (ScanKey) palloc(natts * sizeof(ScanKeyData));
+	skey = (ScanKey) palloc(indnkeyatts * sizeof(ScanKeyData));
 
-	for (i = 0; i < natts; i++)
+	for (i = 0; i < indnkeyatts; i++)
 	{
 		FmgrInfo   *procinfo;
 		int			flags;
diff --git a/src/backend/bootstrap/bootparse.y b/src/backend/bootstrap/bootparse.y
index 41d2fd4..c47c387 100644
--- a/src/backend/bootstrap/bootparse.y
+++ b/src/backend/bootstrap/bootparse.y
@@ -293,6 +293,7 @@ Boot_DeclareIndexStmt:
 					stmt->accessMethod = $8;
 					stmt->tableSpace = NULL;
 					stmt->indexParams = $10;
+					stmt->indexIncludingParams = NIL;
 					stmt->options = NIL;
 					stmt->whereClause = NULL;
 					stmt->excludeOpNames = NIL;
@@ -336,6 +337,7 @@ Boot_DeclareUniqueIndexStmt:
 					stmt->accessMethod = $9;
 					stmt->tableSpace = NULL;
 					stmt->indexParams = $11;
+					stmt->indexIncludingParams = NIL;
 					stmt->options = NIL;
 					stmt->whereClause = NULL;
 					stmt->excludeOpNames = NIL;
diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c
index 6c75099..b9d8a41 100644
--- a/src/backend/bootstrap/bootstrap.c
+++ b/src/backend/bootstrap/bootstrap.c
@@ -591,7 +591,7 @@ boot_openrel(char *relname)
 		 relname, (int) ATTRIBUTE_FIXED_PART_SIZE);
 
 	boot_reldesc = heap_openrv(makeRangeVar(NULL, relname, -1), NoLock);
-	numattr = boot_reldesc->rd_rel->relnatts;
+	numattr = RelationGetNumberOfAttributes(boot_reldesc);
 	for (i = 0; i < numattr; i++)
 	{
 		if (attrtypes[i] == NULL)
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index a309c44..3cc9e8e 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -216,7 +216,7 @@ index_check_primary_key(Relation heapRel,
 	 * null, otherwise attempt to ALTER TABLE .. SET NOT NULL
 	 */
 	cmds = NIL;
-	for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
+	for (i = 0; i < indexInfo->ii_NumIndexKeyAttrs; i++)
 	{
 		AttrNumber	attnum = indexInfo->ii_KeyAttrNumbers[i];
 		HeapTuple	atttuple;
@@ -425,6 +425,13 @@ ConstructTupleDescriptor(Relation heapRelation,
 		colnames_item = lnext(colnames_item);
 
 		/*
+		 * Code below is concerned to the opclasses which are not used
+		 * with the included columns.
+		 */
+		if (i >= indexInfo->ii_NumIndexKeyAttrs)
+			continue;
+
+		/*
 		 * Check the opclass and index AM to see if either provides a keytype
 		 * (overriding the attribute type).  Opclass takes precedence.
 		 */
@@ -560,7 +567,7 @@ UpdateIndexRelation(Oid indexoid,
 	for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
 		indkey->values[i] = indexInfo->ii_KeyAttrNumbers[i];
 	indcollation = buildoidvector(collationOids, indexInfo->ii_NumIndexAttrs);
-	indclass = buildoidvector(classOids, indexInfo->ii_NumIndexAttrs);
+	indclass = buildoidvector(classOids, indexInfo->ii_NumIndexKeyAttrs);
 	indoption = buildint2vector(coloptions, indexInfo->ii_NumIndexAttrs);
 
 	/*
@@ -605,6 +612,7 @@ UpdateIndexRelation(Oid indexoid,
 	values[Anum_pg_index_indexrelid - 1] = ObjectIdGetDatum(indexoid);
 	values[Anum_pg_index_indrelid - 1] = ObjectIdGetDatum(heapoid);
 	values[Anum_pg_index_indnatts - 1] = Int16GetDatum(indexInfo->ii_NumIndexAttrs);
+	values[Anum_pg_index_indnkeyatts - 1] = Int16GetDatum(indexInfo->ii_NumIndexKeyAttrs);
 	values[Anum_pg_index_indisunique - 1] = BoolGetDatum(indexInfo->ii_Unique);
 	values[Anum_pg_index_indisprimary - 1] = BoolGetDatum(primary);
 	values[Anum_pg_index_indisexclusion - 1] = BoolGetDatum(isexclusion);
@@ -1010,7 +1018,7 @@ index_create(Relation heapRelation,
 		}
 
 		/* Store dependency on operator classes */
-		for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
+		for (i = 0; i < indexInfo->ii_NumIndexKeyAttrs; i++)
 		{
 			referenced.classId = OperatorClassRelationId;
 			referenced.objectId = classObjectId[i];
@@ -1068,6 +1076,8 @@ index_create(Relation heapRelation,
 	else
 		Assert(indexRelation->rd_indexcxt != NULL);
 
+	indexRelation->rd_index->indnkeyatts = indexInfo->ii_NumIndexKeyAttrs;
+
 	/*
 	 * If this is bootstrap (initdb) time, then we don't actually fill in the
 	 * index yet.  We'll be creating more indexes and classes later, so we
@@ -1188,7 +1198,7 @@ index_constraint_create(Relation heapRelation,
 								   true,
 								   RelationGetRelid(heapRelation),
 								   indexInfo->ii_KeyAttrNumbers,
-								   indexInfo->ii_NumIndexAttrs,
+								   indexInfo->ii_NumIndexKeyAttrs,
 								   InvalidOid,	/* no domain */
 								   indexRelationId,		/* index OID */
 								   InvalidOid,	/* no foreign key */
@@ -1628,15 +1638,19 @@ BuildIndexInfo(Relation index)
 	IndexInfo  *ii = makeNode(IndexInfo);
 	Form_pg_index indexStruct = index->rd_index;
 	int			i;
-	int			numKeys;
+	int			numAtts;
 
 	/* check the number of keys, and copy attr numbers into the IndexInfo */
-	numKeys = indexStruct->indnatts;
-	if (numKeys < 1 || numKeys > INDEX_MAX_KEYS)
+	numAtts = indexStruct->indnatts;
+	if (numAtts < 1 || numAtts > INDEX_MAX_KEYS)
 		elog(ERROR, "invalid indnatts %d for index %u",
-			 numKeys, RelationGetRelid(index));
-	ii->ii_NumIndexAttrs = numKeys;
-	for (i = 0; i < numKeys; i++)
+			 numAtts, RelationGetRelid(index));
+	ii->ii_NumIndexAttrs = numAtts;
+	ii->ii_NumIndexKeyAttrs = indexStruct->indnkeyatts;
+	Assert(ii->ii_NumIndexKeyAttrs != 0);
+	Assert(ii->ii_NumIndexKeyAttrs <= ii->ii_NumIndexAttrs);
+
+	for (i = 0; i < numAtts; i++)
 		ii->ii_KeyAttrNumbers[i] = indexStruct->indkey.values[i];
 
 	/* fetch any expressions needed for expressional indexes */
@@ -1692,9 +1706,11 @@ BuildIndexInfo(Relation index)
 void
 BuildSpeculativeIndexInfo(Relation index, IndexInfo *ii)
 {
-	int			ncols = index->rd_rel->relnatts;
+	int			indnkeyatts;
 	int			i;
 
+	indnkeyatts = IndexRelationGetNumberOfKeyAttributes(index);
+
 	/*
 	 * fetch info for checking unique indexes
 	 */
@@ -1703,16 +1719,16 @@ BuildSpeculativeIndexInfo(Relation index, IndexInfo *ii)
 	if (index->rd_rel->relam != BTREE_AM_OID)
 		elog(ERROR, "unexpected non-btree speculative unique index");
 
-	ii->ii_UniqueOps = (Oid *) palloc(sizeof(Oid) * ncols);
-	ii->ii_UniqueProcs = (Oid *) palloc(sizeof(Oid) * ncols);
-	ii->ii_UniqueStrats = (uint16 *) palloc(sizeof(uint16) * ncols);
+	ii->ii_UniqueOps = (Oid *) palloc(sizeof(Oid) * indnkeyatts);
+	ii->ii_UniqueProcs = (Oid *) palloc(sizeof(Oid) * indnkeyatts);
+	ii->ii_UniqueStrats = (uint16 *) palloc(sizeof(uint16) * indnkeyatts);
 
 	/*
 	 * We have to look up the operator's strategy number.  This provides a
 	 * cross-check that the operator does match the index.
 	 */
 	/* We need the func OIDs and strategy numbers too */
-	for (i = 0; i < ncols; i++)
+	for (i = 0; i < indnkeyatts; i++)
 	{
 		ii->ii_UniqueStrats[i] = BTEqualStrategyNumber;
 		ii->ii_UniqueOps[i] =
diff --git a/src/backend/catalog/indexing.c b/src/backend/catalog/indexing.c
index b9fe102..6096fa5 100644
--- a/src/backend/catalog/indexing.c
+++ b/src/backend/catalog/indexing.c
@@ -119,6 +119,7 @@ CatalogIndexInsert(CatalogIndexState indstate, HeapTuple heapTuple)
 		Assert(indexInfo->ii_Predicate == NIL);
 		Assert(indexInfo->ii_ExclusionOps == NULL);
 		Assert(relationDescs[i]->rd_index->indimmediate);
+		Assert(indexInfo->ii_NumIndexKeyAttrs != 0);
 
 		/*
 		 * FormIndexDatum fills in its values and isnull parameters with the
diff --git a/src/backend/catalog/toasting.c b/src/backend/catalog/toasting.c
index f40a005..e09c825 100644
--- a/src/backend/catalog/toasting.c
+++ b/src/backend/catalog/toasting.c
@@ -314,6 +314,7 @@ create_toast_table(Relation rel, Oid toastOid, Oid toastIndexOid,
 
 	indexInfo = makeNode(IndexInfo);
 	indexInfo->ii_NumIndexAttrs = 2;
+	indexInfo->ii_NumIndexKeyAttrs = 2;
 	indexInfo->ii_KeyAttrNumbers[0] = 1;
 	indexInfo->ii_KeyAttrNumbers[1] = 2;
 	indexInfo->ii_Expressions = NIL;
diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c
index 13b04e6..b8312f2 100644
--- a/src/backend/commands/indexcmds.c
+++ b/src/backend/commands/indexcmds.c
@@ -213,7 +213,7 @@ CheckIndexCompatible(Oid oldId,
 	}
 
 	/* Any change in operator class or collation breaks compatibility. */
-	old_natts = indexForm->indnatts;
+	old_natts = indexForm->indnkeyatts;
 	Assert(old_natts == numberOfAttributes);
 
 	d = SysCacheGetAttr(INDEXRELID, tuple, Anum_pg_index_indcollation, &isnull);
@@ -327,6 +327,7 @@ DefineIndex(Oid relationId,
 	int16	   *coloptions;
 	IndexInfo  *indexInfo;
 	int			numberOfAttributes;
+	int			numberOfKeyAttributes;
 	TransactionId limitXmin;
 	VirtualTransactionId *old_snapshots;
 	ObjectAddress address;
@@ -337,14 +338,27 @@ DefineIndex(Oid relationId,
 	Snapshot	snapshot;
 	int			i;
 
+	if(list_intersection(stmt->indexParams, stmt->indexIncludingParams) != NIL)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+				 errmsg("included columns must not intersect with key columns")));
+	/*
+	 * count key attributes in index
+	 */
+	numberOfKeyAttributes = list_length(stmt->indexParams);
+
 	/*
-	 * count attributes in index
+	 * We append any INCLUDING columns onto the indexParams list so that
+	 * we have one list with all columns. Later we can determine which of these
+	 * are key columns, and which are just part of the INCLUDING list, by
+	 * checking the list position. A list item in a position less than
+	 * ii_NumIndexKeyAttrs is part of the key columns, and anything equal to
+	 * and over is part of the INCLUDING columns.
 	 */
+	stmt->indexParams = list_concat(stmt->indexParams,
+									stmt->indexIncludingParams);
 	numberOfAttributes = list_length(stmt->indexParams);
-	if (numberOfAttributes <= 0)
-		ereport(ERROR,
-				(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
-				 errmsg("must specify at least one column")));
+
 	if (numberOfAttributes > INDEX_MAX_KEYS)
 		ereport(ERROR,
 				(errcode(ERRCODE_TOO_MANY_COLUMNS),
@@ -507,6 +521,11 @@ DefineIndex(Oid relationId,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 			   errmsg("access method \"%s\" does not support unique indexes",
 					  accessMethodName)));
+	if (list_length(stmt->indexIncludingParams) > 0 && !amRoutine->amcaninclude)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				errmsg("access method \"%s\" does not support included columns",
+						accessMethodName)));
 	if (numberOfAttributes > 1 && !amRoutine->amcanmulticol)
 		ereport(ERROR,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
@@ -544,6 +563,7 @@ DefineIndex(Oid relationId,
 	 */
 	indexInfo = makeNode(IndexInfo);
 	indexInfo->ii_NumIndexAttrs = numberOfAttributes;
+	indexInfo->ii_NumIndexKeyAttrs = numberOfKeyAttributes;
 	indexInfo->ii_Expressions = NIL;	/* for now */
 	indexInfo->ii_ExpressionsState = NIL;
 	indexInfo->ii_Predicate = make_ands_implicit((Expr *) stmt->whereClause);
@@ -559,7 +579,7 @@ DefineIndex(Oid relationId,
 
 	typeObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
 	collationObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
-	classObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
+	classObjectId = (Oid *) palloc(numberOfKeyAttributes * sizeof(Oid));
 	coloptions = (int16 *) palloc(numberOfAttributes * sizeof(int16));
 	ComputeIndexAttrs(indexInfo,
 					  typeObjectId, collationObjectId, classObjectId,
@@ -966,16 +986,15 @@ ComputeIndexAttrs(IndexInfo *indexInfo,
 	ListCell   *nextExclOp;
 	ListCell   *lc;
 	int			attn;
+	int 		nkeycols = indexInfo->ii_NumIndexKeyAttrs;
 
 	/* Allocate space for exclusion operator info, if needed */
 	if (exclusionOpNames)
 	{
-		int			ncols = list_length(attList);
-
-		Assert(list_length(exclusionOpNames) == ncols);
-		indexInfo->ii_ExclusionOps = (Oid *) palloc(sizeof(Oid) * ncols);
-		indexInfo->ii_ExclusionProcs = (Oid *) palloc(sizeof(Oid) * ncols);
-		indexInfo->ii_ExclusionStrats = (uint16 *) palloc(sizeof(uint16) * ncols);
+		Assert(list_length(exclusionOpNames) == nkeycols);
+		indexInfo->ii_ExclusionOps = (Oid *) palloc(sizeof(Oid) * nkeycols);
+		indexInfo->ii_ExclusionProcs = (Oid *) palloc(sizeof(Oid) * nkeycols);
+		indexInfo->ii_ExclusionStrats = (uint16 *) palloc(sizeof(uint16) * nkeycols);
 		nextExclOp = list_head(exclusionOpNames);
 	}
 	else
@@ -1028,6 +1047,11 @@ ComputeIndexAttrs(IndexInfo *indexInfo,
 			Node	   *expr = attribute->expr;
 
 			Assert(expr != NULL);
+
+			if (attn >= nkeycols)
+				ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+						errmsg("Expressions in INCLUDING clause are not supported")));
 			atttype = exprType(expr);
 			attcollation = exprCollation(expr);
 
@@ -1106,6 +1130,16 @@ ComputeIndexAttrs(IndexInfo *indexInfo,
 		collationOidP[attn] = attcollation;
 
 		/*
+		 * Skip opclass and ordering options for included columns.
+		 */
+		if (attn >= nkeycols)
+		{
+			colOptionP[attn] = 0;
+			attn++;
+			continue;
+		}
+
+		/*
 		 * Identify the opclass to use.
 		 */
 		classOidP[attn] = GetIndexOpClass(attribute->opclass,
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index 869c586..403b037 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -567,7 +567,7 @@ refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
 										  RelationGetRelationName(tempRel));
 	diffname = make_temptable_name_n(tempname, 2);
 
-	relnatts = matviewRel->rd_rel->relnatts;
+	relnatts = RelationGetNumberOfAttributes(matviewRel);
 	usedForQual = (bool *) palloc0(sizeof(bool) * relnatts);
 
 	/* Open SPI context. */
@@ -653,11 +653,11 @@ refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
 			RelationGetIndexExpressions(indexRel) == NIL &&
 			RelationGetIndexPredicate(indexRel) == NIL)
 		{
-			int			numatts = indexStruct->indnatts;
+			int			indnkeyatts = indexStruct->indnkeyatts;
 			int			i;
 
 			/* Add quals for all columns from this index. */
-			for (i = 0; i < numatts; i++)
+			for (i = 0; i < indnkeyatts; i++)
 			{
 				int			attnum = indexStruct->indkey.values[i];
 				Oid			type;
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 96dc923..e6b5d07 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -5234,7 +5234,7 @@ ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode)
 			 * Loop over each attribute in the primary key and see if it
 			 * matches the to-be-altered attribute
 			 */
-			for (i = 0; i < indexStruct->indnatts; i++)
+			for (i = 0; i < indexStruct->indnkeyatts; i++)
 			{
 				if (indexStruct->indkey.values[i] == attnum)
 					ereport(ERROR,
@@ -7083,7 +7083,7 @@ transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
 	 * assume a primary key cannot have expressional elements)
 	 */
 	*attnamelist = NIL;
-	for (i = 0; i < indexStruct->indnatts; i++)
+	for (i = 0; i < indexStruct->indnkeyatts; i++)
 	{
 		int			pkattno = indexStruct->indkey.values[i];
 
@@ -7161,7 +7161,7 @@ transformFkeyCheckAttrs(Relation pkrel,
 		 * partial index; forget it if there are any expressions, too. Invalid
 		 * indexes are out as well.
 		 */
-		if (indexStruct->indnatts == numattrs &&
+		if (indexStruct->indnkeyatts == numattrs &&
 			indexStruct->indisunique &&
 			IndexIsValid(indexStruct) &&
 			heap_attisnull(indexTuple, Anum_pg_index_indpred) &&
@@ -11045,7 +11045,7 @@ ATExecReplicaIdentity(Relation rel, ReplicaIdentityStmt *stmt, LOCKMODE lockmode
 						RelationGetRelationName(indexRel))));
 
 	/* Check index for nullable columns. */
-	for (key = 0; key < indexRel->rd_index->indnatts; key++)
+	for (key = 0; key < IndexRelationGetNumberOfKeyAttributes(indexRel); key++)
 	{
 		int16		attno = indexRel->rd_index->indkey.values[key];
 		Form_pg_attribute attr;
diff --git a/src/backend/executor/execIndexing.c b/src/backend/executor/execIndexing.c
index 838cee7..597e96b 100644
--- a/src/backend/executor/execIndexing.c
+++ b/src/backend/executor/execIndexing.c
@@ -646,7 +646,7 @@ check_exclusion_or_unique_constraint(Relation heap, Relation index,
 	Oid		   *constr_procs;
 	uint16	   *constr_strats;
 	Oid		   *index_collations = index->rd_indcollation;
-	int			index_natts = index->rd_index->indnatts;
+	int			indnkeyatts = IndexRelationGetNumberOfKeyAttributes(index);
 	IndexScanDesc index_scan;
 	HeapTuple	tup;
 	ScanKeyData scankeys[INDEX_MAX_KEYS];
@@ -673,7 +673,7 @@ check_exclusion_or_unique_constraint(Relation heap, Relation index,
 	 * If any of the input values are NULL, the constraint check is assumed to
 	 * pass (i.e., we assume the operators are strict).
 	 */
-	for (i = 0; i < index_natts; i++)
+	for (i = 0; i < indnkeyatts; i++)
 	{
 		if (isnull[i])
 			return true;
@@ -685,7 +685,7 @@ check_exclusion_or_unique_constraint(Relation heap, Relation index,
 	 */
 	InitDirtySnapshot(DirtySnapshot);
 
-	for (i = 0; i < index_natts; i++)
+	for (i = 0; i < indnkeyatts; i++)
 	{
 		ScanKeyEntryInitialize(&scankeys[i],
 							   0,
@@ -717,8 +717,8 @@ check_exclusion_or_unique_constraint(Relation heap, Relation index,
 retry:
 	conflict = false;
 	found_self = false;
-	index_scan = index_beginscan(heap, index, &DirtySnapshot, index_natts, 0);
-	index_rescan(index_scan, scankeys, index_natts, NULL, 0);
+	index_scan = index_beginscan(heap, index, &DirtySnapshot, indnkeyatts, 0);
+	index_rescan(index_scan, scankeys, indnkeyatts, NULL, 0);
 
 	while ((tup = index_getnext(index_scan,
 								ForwardScanDirection)) != NULL)
@@ -877,10 +877,10 @@ index_recheck_constraint(Relation index, Oid *constr_procs,
 						 Datum *existing_values, bool *existing_isnull,
 						 Datum *new_values)
 {
-	int			index_natts = index->rd_index->indnatts;
+	int			indnkeyatts = IndexRelationGetNumberOfKeyAttributes(index);
 	int			i;
 
-	for (i = 0; i < index_natts; i++)
+	for (i = 0; i < indnkeyatts; i++)
 	{
 		/* Assume the exclusion operators are strict */
 		if (existing_isnull[i])
diff --git a/src/backend/executor/nodeIndexscan.c b/src/backend/executor/nodeIndexscan.c
index bf16cb1..69e82cb 100644
--- a/src/backend/executor/nodeIndexscan.c
+++ b/src/backend/executor/nodeIndexscan.c
@@ -1144,7 +1144,9 @@ ExecIndexBuildScanKeys(PlanState *planstate, Relation index,
 		Expr	   *leftop;		/* expr on lhs of operator */
 		Expr	   *rightop;	/* expr on rhs ... */
 		AttrNumber	varattno;	/* att number used in scan */
+		int			indnkeyatts;
 
+		indnkeyatts = IndexRelationGetNumberOfKeyAttributes(index);
 		if (IsA(clause, OpExpr))
 		{
 			/* indexkey op const or indexkey op expression */
@@ -1169,7 +1171,7 @@ ExecIndexBuildScanKeys(PlanState *planstate, Relation index,
 				elog(ERROR, "indexqual doesn't have key on left side");
 
 			varattno = ((Var *) leftop)->varattno;
-			if (varattno < 1 || varattno > index->rd_index->indnatts)
+			if (varattno < 1 || varattno > indnkeyatts)
 				elog(ERROR, "bogus index qualification");
 
 			/*
@@ -1292,7 +1294,7 @@ ExecIndexBuildScanKeys(PlanState *planstate, Relation index,
 				opnos_cell = lnext(opnos_cell);
 
 				if (index->rd_rel->relam != BTREE_AM_OID ||
-					varattno < 1 || varattno > index->rd_index->indnatts)
+					varattno < 1 || varattno > indnkeyatts)
 					elog(ERROR, "bogus RowCompare index qualification");
 				opfamily = index->rd_opfamily[varattno - 1];
 
@@ -1413,7 +1415,7 @@ ExecIndexBuildScanKeys(PlanState *planstate, Relation index,
 				elog(ERROR, "indexqual doesn't have key on left side");
 
 			varattno = ((Var *) leftop)->varattno;
-			if (varattno < 1 || varattno > index->rd_index->indnatts)
+			if (varattno < 1 || varattno > indnkeyatts)
 				elog(ERROR, "bogus index qualification");
 
 			/*
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index a9e9cc3..92612ec 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -2626,6 +2626,7 @@ _copyConstraint(const Constraint *from)
 	COPY_NODE_FIELD(raw_expr);
 	COPY_STRING_FIELD(cooked_expr);
 	COPY_NODE_FIELD(keys);
+	COPY_NODE_FIELD(including);
 	COPY_NODE_FIELD(exclusions);
 	COPY_NODE_FIELD(options);
 	COPY_STRING_FIELD(indexname);
@@ -3115,6 +3116,7 @@ _copyIndexStmt(const IndexStmt *from)
 	COPY_STRING_FIELD(accessMethod);
 	COPY_STRING_FIELD(tableSpace);
 	COPY_NODE_FIELD(indexParams);
+	COPY_NODE_FIELD(indexIncludingParams);
 	COPY_NODE_FIELD(options);
 	COPY_NODE_FIELD(whereClause);
 	COPY_NODE_FIELD(excludeOpNames);
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index b9c3959..78be931 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -1249,6 +1249,7 @@ _equalIndexStmt(const IndexStmt *a, const IndexStmt *b)
 	COMPARE_STRING_FIELD(accessMethod);
 	COMPARE_STRING_FIELD(tableSpace);
 	COMPARE_NODE_FIELD(indexParams);
+	COMPARE_NODE_FIELD(indexIncludingParams);
 	COMPARE_NODE_FIELD(options);
 	COMPARE_NODE_FIELD(whereClause);
 	COMPARE_NODE_FIELD(excludeOpNames);
@@ -2361,6 +2362,7 @@ _equalConstraint(const Constraint *a, const Constraint *b)
 	COMPARE_NODE_FIELD(raw_expr);
 	COMPARE_STRING_FIELD(cooked_expr);
 	COMPARE_NODE_FIELD(keys);
+	COMPARE_NODE_FIELD(including);
 	COMPARE_NODE_FIELD(exclusions);
 	COMPARE_NODE_FIELD(options);
 	COMPARE_STRING_FIELD(indexname);
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index 28d983c..a895289 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -2213,6 +2213,7 @@ _outIndexStmt(StringInfo str, const IndexStmt *node)
 	WRITE_STRING_FIELD(accessMethod);
 	WRITE_STRING_FIELD(tableSpace);
 	WRITE_NODE_FIELD(indexParams);
+	WRITE_NODE_FIELD(indexIncludingParams);
 	WRITE_NODE_FIELD(options);
 	WRITE_NODE_FIELD(whereClause);
 	WRITE_NODE_FIELD(excludeOpNames);
@@ -2949,6 +2950,7 @@ _outConstraint(StringInfo str, const Constraint *node)
 		case CONSTR_PRIMARY:
 			appendStringInfoString(str, "PRIMARY_KEY");
 			WRITE_NODE_FIELD(keys);
+			WRITE_NODE_FIELD(including);
 			WRITE_NODE_FIELD(options);
 			WRITE_STRING_FIELD(indexname);
 			WRITE_STRING_FIELD(indexspace);
@@ -2958,6 +2960,7 @@ _outConstraint(StringInfo str, const Constraint *node)
 		case CONSTR_UNIQUE:
 			appendStringInfoString(str, "UNIQUE");
 			WRITE_NODE_FIELD(keys);
+			WRITE_NODE_FIELD(including);
 			WRITE_NODE_FIELD(options);
 			WRITE_STRING_FIELD(indexname);
 			WRITE_STRING_FIELD(indexspace);
diff --git a/src/backend/optimizer/path/pathkeys.c b/src/backend/optimizer/path/pathkeys.c
index eed39b9..3aadc9a 100644
--- a/src/backend/optimizer/path/pathkeys.c
+++ b/src/backend/optimizer/path/pathkeys.c
@@ -450,6 +450,13 @@ build_index_pathkeys(PlannerInfo *root,
 		bool		nulls_first;
 		PathKey    *cpathkey;
 
+		/*
+		 * INCLUDING columns are stored in the index unordered,
+		 * so they cannot be used for an ordered index scan.
+		 */
+		if(i >= index->nkeycolumns)
+			break;
+
 		/* We assume we don't need to make a copy of the tlist item */
 		indexkey = indextle->expr;
 
diff --git a/src/backend/optimizer/util/plancat.c b/src/backend/optimizer/util/plancat.c
index 0ea9fcf..9b43811 100644
--- a/src/backend/optimizer/util/plancat.c
+++ b/src/backend/optimizer/util/plancat.c
@@ -166,7 +166,7 @@ get_relation_info(PlannerInfo *root, Oid relationObjectId, bool inhparent,
 			Form_pg_index index;
 			IndexAmRoutine *amroutine;
 			IndexOptInfo *info;
-			int			ncolumns;
+			int			ncolumns, nkeycolumns;
 			int			i;
 
 			/*
@@ -209,19 +209,25 @@ get_relation_info(PlannerInfo *root, Oid relationObjectId, bool inhparent,
 				RelationGetForm(indexRelation)->reltablespace;
 			info->rel = rel;
 			info->ncolumns = ncolumns = index->indnatts;
+			info->nkeycolumns = nkeycolumns = index->indnkeyatts;
+
 			info->indexkeys = (int *) palloc(sizeof(int) * ncolumns);
 			info->indexcollations = (Oid *) palloc(sizeof(Oid) * ncolumns);
-			info->opfamily = (Oid *) palloc(sizeof(Oid) * ncolumns);
-			info->opcintype = (Oid *) palloc(sizeof(Oid) * ncolumns);
+			info->opfamily = (Oid *) palloc(sizeof(Oid) * nkeycolumns);
+			info->opcintype = (Oid *) palloc(sizeof(Oid) * nkeycolumns);
 			info->canreturn = (bool *) palloc(sizeof(bool) * ncolumns);
 
 			for (i = 0; i < ncolumns; i++)
 			{
 				info->indexkeys[i] = index->indkey.values[i];
 				info->indexcollations[i] = indexRelation->rd_indcollation[i];
+				info->canreturn[i] = index_can_return(indexRelation, i + 1);
+			}
+
+			for (i = 0; i < nkeycolumns; i++)
+			{
 				info->opfamily[i] = indexRelation->rd_opfamily[i];
 				info->opcintype[i] = indexRelation->rd_opcintype[i];
-				info->canreturn[i] = index_can_return(indexRelation, i + 1);
 			}
 
 			info->relam = indexRelation->rd_rel->relam;
@@ -249,10 +255,10 @@ get_relation_info(PlannerInfo *root, Oid relationObjectId, bool inhparent,
 				Assert(amroutine->amcanorder);
 
 				info->sortopfamily = info->opfamily;
-				info->reverse_sort = (bool *) palloc(sizeof(bool) * ncolumns);
-				info->nulls_first = (bool *) palloc(sizeof(bool) * ncolumns);
+				info->reverse_sort = (bool *) palloc(sizeof(bool) * nkeycolumns);
+				info->nulls_first = (bool *) palloc(sizeof(bool) * nkeycolumns);
 
-				for (i = 0; i < ncolumns; i++)
+				for (i = 0; i < nkeycolumns; i++)
 				{
 					int16		opt = indexRelation->rd_indoption[i];
 
@@ -276,11 +282,11 @@ get_relation_info(PlannerInfo *root, Oid relationObjectId, bool inhparent,
 				 * of current or foreseeable amcanorder index types, it's not
 				 * worth expending more effort on now.
 				 */
-				info->sortopfamily = (Oid *) palloc(sizeof(Oid) * ncolumns);
-				info->reverse_sort = (bool *) palloc(sizeof(bool) * ncolumns);
-				info->nulls_first = (bool *) palloc(sizeof(bool) * ncolumns);
+				info->sortopfamily = (Oid *) palloc(sizeof(Oid) * nkeycolumns);
+				info->reverse_sort = (bool *) palloc(sizeof(bool) * nkeycolumns);
+				info->nulls_first = (bool *) palloc(sizeof(bool) * nkeycolumns);
 
-				for (i = 0; i < ncolumns; i++)
+				for (i = 0; i < nkeycolumns; i++)
 				{
 					int16		opt = indexRelation->rd_indoption[i];
 					Oid			ltopr;
@@ -594,7 +600,7 @@ infer_arbiter_indexes(PlannerInfo *root)
 			goto next;
 
 		/* Build BMS representation of cataloged index attributes */
-		for (natt = 0; natt < idxForm->indnatts; natt++)
+		for (natt = 0; natt < idxForm->indnkeyatts; natt++)
 		{
 			int			attno = idxRel->rd_index->indkey.values[natt];
 
@@ -1513,7 +1519,7 @@ has_unique_index(RelOptInfo *rel, AttrNumber attno)
 		 * just the specified attr is unique.
 		 */
 		if (index->unique &&
-			index->ncolumns == 1 &&
+			index->nkeycolumns == 1 &&
 			index->indexkeys[0] == attno &&
 			(index->indpred == NIL || index->predOK))
 			return true;
diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c
index 7d2fedf..faf8309 100644
--- a/src/backend/parser/analyze.c
+++ b/src/backend/parser/analyze.c
@@ -920,7 +920,7 @@ transformOnConflictClause(ParseState *pstate,
 		 * relation.
 		 */
 		Assert(pstate->p_next_resno == 1);
-		for (attno = 0; attno < targetrel->rd_rel->relnatts; attno++)
+		for (attno = 0; attno < RelationGetNumberOfAttributes(targetrel); attno++)
 		{
 			Form_pg_attribute attr = targetrel->rd_att->attrs[attno];
 			char	   *name;
@@ -2122,8 +2122,8 @@ transformUpdateTargetList(ParseState *pstate, List *origTlist)
 								EXPR_KIND_UPDATE_SOURCE);
 
 	/* Prepare to assign non-conflicting resnos to resjunk attributes */
-	if (pstate->p_next_resno <= pstate->p_target_relation->rd_rel->relnatts)
-		pstate->p_next_resno = pstate->p_target_relation->rd_rel->relnatts + 1;
+	if (pstate->p_next_resno <= RelationGetNumberOfAttributes(pstate->p_target_relation))
+		pstate->p_next_resno = RelationGetNumberOfAttributes(pstate->p_target_relation) + 1;
 
 	/* Prepare non-junk columns for assignment to target table */
 	target_rte = pstate->p_target_rangetblentry;
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index b307b48..e608b83 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -354,6 +354,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 				oper_argtypes RuleActionList RuleActionMulti
 				opt_column_list columnList opt_name_list
 				sort_clause opt_sort_clause sortby_list index_params
+				optincluding opt_including index_including_params
 				name_list role_list from_clause from_list opt_array_bounds
 				qualified_name_list any_name any_name_list type_name_list
 				any_operator expr_list attrs
@@ -370,6 +371,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 				create_generic_options alter_generic_options
 				relation_expr_list dostmt_opt_list
 				transform_element_list transform_type_list
+				optcincluding opt_c_including
 
 %type <list>	group_by_list
 %type <node>	group_by_item empty_grouping_set rollup_clause cube_clause
@@ -3213,17 +3215,18 @@ ConstraintElem:
 					n->initially_valid = !n->skip_validation;
 					$$ = (Node *)n;
 				}
-			| UNIQUE '(' columnList ')' opt_definition OptConsTableSpace
+			| UNIQUE '(' columnList ')' opt_c_including opt_definition OptConsTableSpace
 				ConstraintAttributeSpec
 				{
 					Constraint *n = makeNode(Constraint);
 					n->contype = CONSTR_UNIQUE;
 					n->location = @1;
 					n->keys = $3;
-					n->options = $5;
+					n->including = $5;
+					n->options = $6;
 					n->indexname = NULL;
-					n->indexspace = $6;
-					processCASbits($7, @7, "UNIQUE",
+					n->indexspace = $7;
+					processCASbits($8, @8, "UNIQUE",
 								   &n->deferrable, &n->initdeferred, NULL,
 								   NULL, yyscanner);
 					$$ = (Node *)n;
@@ -3234,6 +3237,7 @@ ConstraintElem:
 					n->contype = CONSTR_UNIQUE;
 					n->location = @1;
 					n->keys = NIL;
+					n->including = NIL;
 					n->options = NIL;
 					n->indexname = $2;
 					n->indexspace = NULL;
@@ -3242,17 +3246,18 @@ ConstraintElem:
 								   NULL, yyscanner);
 					$$ = (Node *)n;
 				}
-			| PRIMARY KEY '(' columnList ')' opt_definition OptConsTableSpace
+			| PRIMARY KEY '(' columnList ')' opt_c_including opt_definition OptConsTableSpace
 				ConstraintAttributeSpec
 				{
 					Constraint *n = makeNode(Constraint);
 					n->contype = CONSTR_PRIMARY;
 					n->location = @1;
 					n->keys = $4;
-					n->options = $6;
+					n->including = $6;
+					n->options = $7;
 					n->indexname = NULL;
-					n->indexspace = $7;
-					processCASbits($8, @8, "PRIMARY KEY",
+					n->indexspace = $8;
+					processCASbits($9, @9, "PRIMARY KEY",
 								   &n->deferrable, &n->initdeferred, NULL,
 								   NULL, yyscanner);
 					$$ = (Node *)n;
@@ -3263,6 +3268,7 @@ ConstraintElem:
 					n->contype = CONSTR_PRIMARY;
 					n->location = @1;
 					n->keys = NIL;
+					n->including = NIL;
 					n->options = NIL;
 					n->indexname = $3;
 					n->indexspace = NULL;
@@ -3330,6 +3336,13 @@ columnElem: ColId
 				}
 		;
 
+opt_c_including:	INCLUDING optcincluding			{ $$ = $2; }
+			 |		/* EMPTY */						{ $$ = NIL; }
+		;
+
+optcincluding : '(' columnList ')'		{ $$ = $2; }
+		;
+
 key_match:  MATCH FULL
 			{
 				$$ = FKCONSTR_MATCH_FULL;
@@ -6604,7 +6617,7 @@ defacl_privilege_target:
 
 IndexStmt:	CREATE opt_unique INDEX opt_concurrently opt_index_name
 			ON qualified_name access_method_clause '(' index_params ')'
-			opt_reloptions OptTableSpace where_clause
+			opt_including opt_reloptions OptTableSpace where_clause
 				{
 					IndexStmt *n = makeNode(IndexStmt);
 					n->unique = $2;
@@ -6613,9 +6626,10 @@ IndexStmt:	CREATE opt_unique INDEX opt_concurrently opt_index_name
 					n->relation = $7;
 					n->accessMethod = $8;
 					n->indexParams = $10;
-					n->options = $12;
-					n->tableSpace = $13;
-					n->whereClause = $14;
+					n->indexIncludingParams = $12;
+					n->options = $13;
+					n->tableSpace = $14;
+					n->whereClause = $15;
 					n->excludeOpNames = NIL;
 					n->idxcomment = NULL;
 					n->indexOid = InvalidOid;
@@ -6630,7 +6644,7 @@ IndexStmt:	CREATE opt_unique INDEX opt_concurrently opt_index_name
 				}
 			| CREATE opt_unique INDEX opt_concurrently IF_P NOT EXISTS index_name
 			ON qualified_name access_method_clause '(' index_params ')'
-			opt_reloptions OptTableSpace where_clause
+			opt_including opt_reloptions OptTableSpace where_clause
 				{
 					IndexStmt *n = makeNode(IndexStmt);
 					n->unique = $2;
@@ -6639,9 +6653,10 @@ IndexStmt:	CREATE opt_unique INDEX opt_concurrently opt_index_name
 					n->relation = $10;
 					n->accessMethod = $11;
 					n->indexParams = $13;
-					n->options = $15;
-					n->tableSpace = $16;
-					n->whereClause = $17;
+					n->indexIncludingParams = $15;
+					n->options = $16;
+					n->tableSpace = $17;
+					n->whereClause = $18;
 					n->excludeOpNames = NIL;
 					n->idxcomment = NULL;
 					n->indexOid = InvalidOid;
@@ -6720,6 +6735,16 @@ index_elem:	ColId opt_collate opt_class opt_asc_desc opt_nulls_order
 				}
 		;
 
+optincluding : '(' index_including_params ')'		{ $$ = $2; }
+		;
+opt_including:		INCLUDING optincluding			{ $$ = $2; }
+			 |		/* EMPTY */						{ $$ = NIL; }
+		;
+
+index_including_params:	index_elem						{ $$ = list_make1($1); }
+			| index_including_params ',' index_elem		{ $$ = lappend($1, $3); }
+		;
+
 opt_collate: COLLATE any_name						{ $$ = $2; }
 			| /*EMPTY*/								{ $$ = NIL; }
 		;
diff --git a/src/backend/parser/parse_relation.c b/src/backend/parser/parse_relation.c
index 81332b5..0f5d796 100644
--- a/src/backend/parser/parse_relation.c
+++ b/src/backend/parser/parse_relation.c
@@ -2875,7 +2875,7 @@ attnameAttNum(Relation rd, const char *attname, bool sysColOK)
 {
 	int			i;
 
-	for (i = 0; i < rd->rd_rel->relnatts; i++)
+	for (i = 0; i < RelationGetNumberOfAttributes(rd); i++)
 	{
 		Form_pg_attribute att = rd->rd_att->attrs[i];
 
diff --git a/src/backend/parser/parse_target.c b/src/backend/parser/parse_target.c
index ec08e1e..e5cb83d 100644
--- a/src/backend/parser/parse_target.c
+++ b/src/backend/parser/parse_target.c
@@ -898,7 +898,7 @@ checkInsertTargets(ParseState *pstate, List *cols, List **attrnos)
 		 * Generate default column list for INSERT.
 		 */
 		Form_pg_attribute *attr = pstate->p_target_relation->rd_att->attrs;
-		int			numcol = pstate->p_target_relation->rd_rel->relnatts;
+		int			numcol = RelationGetNumberOfAttributes(pstate->p_target_relation);
 		int			i;
 
 		for (i = 0; i < numcol; i++)
diff --git a/src/backend/parser/parse_utilcmd.c b/src/backend/parser/parse_utilcmd.c
index dc431c7..575f233 100644
--- a/src/backend/parser/parse_utilcmd.c
+++ b/src/backend/parser/parse_utilcmd.c
@@ -1242,14 +1242,14 @@ generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx,
 
 	/* Build the list of IndexElem */
 	index->indexParams = NIL;
+	index->indexIncludingParams = NIL;
 
 	indexpr_item = list_head(indexprs);
-	for (keyno = 0; keyno < idxrec->indnatts; keyno++)
+	for (keyno = 0; keyno < idxrec->indnkeyatts; keyno++)
 	{
 		IndexElem  *iparam;
 		AttrNumber	attnum = idxrec->indkey.values[keyno];
 		int16		opt = source_idx->rd_indoption[keyno];
-
 		iparam = makeNode(IndexElem);
 
 		if (AttributeNumberIsValid(attnum))
@@ -1331,6 +1331,36 @@ generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx,
 		index->indexParams = lappend(index->indexParams, iparam);
 	}
 
+	/* Handle included columns separately */
+	for (keyno = idxrec->indnkeyatts; keyno < idxrec->indnatts; keyno++)
+	{
+		IndexElem  *iparam;
+		AttrNumber	attnum = idxrec->indkey.values[keyno];
+
+		iparam = makeNode(IndexElem);
+
+		if (AttributeNumberIsValid(attnum))
+		{
+			/* Simple index column */
+			char	   *attname;
+
+			attname = get_relid_attribute_name(indrelid, attnum);
+			keycoltype = get_atttype(indrelid, attnum);
+
+			iparam->name = attname;
+			iparam->expr = NULL;
+		}
+		else
+			elog(ERROR, "Expressions are not supported in included columns.");
+
+		/* Copy the original index column name */
+		iparam->indexcolname = pstrdup(NameStr(attrs[keyno]->attname));
+
+		/* Add the collation name, if non-default */
+		iparam->collation = get_collation(indcollation->values[keyno], keycoltype);
+
+		index->indexIncludingParams = lappend(index->indexIncludingParams, iparam);
+	}
 	/* Copy reloptions if any */
 	datum = SysCacheGetAttr(RELOID, ht_idxrel,
 							Anum_pg_class_reloptions, &isnull);
@@ -1523,6 +1553,7 @@ transformIndexConstraints(CreateStmtContext *cxt)
 			IndexStmt  *priorindex = lfirst(k);
 
 			if (equal(index->indexParams, priorindex->indexParams) &&
+				equal(index->indexIncludingParams, priorindex->indexIncludingParams) &&
 				equal(index->whereClause, priorindex->whereClause) &&
 				equal(index->excludeOpNames, priorindex->excludeOpNames) &&
 				strcmp(index->accessMethod, priorindex->accessMethod) == 0 &&
@@ -1594,6 +1625,7 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt)
 	index->tableSpace = constraint->indexspace;
 	index->whereClause = constraint->where_clause;
 	index->indexParams = NIL;
+	index->indexIncludingParams = NIL;
 	index->excludeOpNames = NIL;
 	index->idxcomment = NULL;
 	index->indexOid = InvalidOid;
@@ -1743,24 +1775,30 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt)
 											   heap_rel->rd_rel->relhasoids);
 			attname = pstrdup(NameStr(attform->attname));
 
-			/*
-			 * Insist on default opclass and sort options.  While the index
-			 * would still work as a constraint with non-default settings, it
-			 * might not provide exactly the same uniqueness semantics as
-			 * you'd get from a normally-created constraint; and there's also
-			 * the dump/reload problem mentioned above.
-			 */
-			defopclass = GetDefaultOpClass(attform->atttypid,
-										   index_rel->rd_rel->relam);
-			if (indclass->values[i] != defopclass ||
-				index_rel->rd_indoption[i] != 0)
-				ereport(ERROR,
-						(errcode(ERRCODE_WRONG_OBJECT_TYPE),
-						 errmsg("index \"%s\" does not have default sorting behavior", index_name),
-						 errdetail("Cannot create a primary key or unique constraint using such an index."),
-					 parser_errposition(cxt->pstate, constraint->location)));
+			if (i < index_form->indnkeyatts)
+			{
+				/*
+				* Insist on default opclass and sort options.  While the index
+				* would still work as a constraint with non-default settings, it
+				* might not provide exactly the same uniqueness semantics as
+				* you'd get from a normally-created constraint; and there's also
+				* the dump/reload problem mentioned above.
+				*/
+				defopclass = GetDefaultOpClass(attform->atttypid,
+											index_rel->rd_rel->relam);
+				if (indclass->values[i] != defopclass ||
+					index_rel->rd_indoption[i] != 0)
+					ereport(ERROR,
+							(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+							errmsg("index \"%s\" does not have default sorting behavior", index_name),
+							errdetail("Cannot create a primary key or unique constraint using such an index."),
+						parser_errposition(cxt->pstate, constraint->location)));
+
+				constraint->keys = lappend(constraint->keys, makeString(attname));
+			}
+			else
+				constraint->including = lappend(constraint->including, makeString(attname));
 
-			constraint->keys = lappend(constraint->keys, makeString(attname));
 		}
 
 		/* Close the index relation but keep the lock */
@@ -1773,6 +1811,7 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt)
 	 * If it's an EXCLUDE constraint, the grammar returns a list of pairs of
 	 * IndexElems and operator names.  We have to break that apart into
 	 * separate lists.
+	 * NOTE: exclusion constraints do not support included (non-key) attributes.
 	 */
 	if (constraint->contype == CONSTR_EXCLUSION)
 	{
@@ -1927,6 +1966,54 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt)
 		index->indexParams = lappend(index->indexParams, iparam);
 	}
 
+	/* Here is some ugly code duplication. But we do need it. */
+	foreach(lc, constraint->including)
+	{
+		char	   *key = strVal(lfirst(lc));
+		bool		found = false;
+		ColumnDef  *column = NULL;
+		ListCell   *columns;
+		IndexElem  *iparam;
+
+		foreach(columns, cxt->columns)
+		{
+			column = (ColumnDef *) lfirst(columns);
+			Assert(IsA(column, ColumnDef));
+			if (strcmp(column->colname, key) == 0)
+			{
+				found = true;
+				break;
+			}
+		}
+		if (found)
+		{
+			/* found column in the new table; force it to be NOT NULL */
+			if (constraint->contype == CONSTR_PRIMARY)
+				column->is_not_null = TRUE;
+		}
+
+		/*
+		 * In the ALTER TABLE case, don't complain about index keys not
+		 * created in the command; they may well exist already. DefineIndex
+		 * will complain about them if not, and will also take care of marking
+		 * them NOT NULL.
+		 */
+		if (!found && !cxt->isalter)
+			ereport(ERROR,
+					(errcode(ERRCODE_UNDEFINED_COLUMN),
+					 errmsg("column \"%s\" named in key does not exist", key),
+					 parser_errposition(cxt->pstate, constraint->location)));
+
+		/* OK, add it to the index definition */
+		iparam = makeNode(IndexElem);
+		iparam->name = pstrdup(key);
+		iparam->expr = NULL;
+		iparam->indexcolname = NULL;
+		iparam->collation = NIL;
+		iparam->opclass = NIL;
+		index->indexIncludingParams = lappend(index->indexIncludingParams, iparam);
+	}
+
 	return index;
 }
 
diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c
index 490a090..60bfce8 100644
--- a/src/backend/utils/adt/ruleutils.c
+++ b/src/backend/utils/adt/ruleutils.c
@@ -1140,6 +1140,21 @@ pg_get_indexdef_worker(Oid indexrelid, int colno,
 		Oid			keycoltype;
 		Oid			keycolcollation;
 
+		/*
+		 * attrsOnly flag is used for building unique-constraint and
+		 * exclusion-constraint error messages. Included attrs are
+		 * meaningless there, so do not include them into the message.
+		 */
+		if (attrsOnly && keyno >= idxrec->indnkeyatts)
+			break;
+
+		/* Report the INCLUDED attributes, if any. */
+		if ((!attrsOnly) && keyno == idxrec->indnkeyatts)
+		{
+				appendStringInfoString(&buf, ") INCLUDING (");
+				sep = "";
+		}
+
 		if (!colno)
 			appendStringInfoString(&buf, sep);
 		sep = ", ";
@@ -1153,6 +1168,7 @@ pg_get_indexdef_worker(Oid indexrelid, int colno,
 			attname = get_relid_attribute_name(indrelid, attnum);
 			if (!colno || colno == keyno + 1)
 				appendStringInfoString(&buf, quote_identifier(attname));
+
 			get_atttypetypmodcoll(indrelid, attnum,
 								  &keycoltype, &keycoltypmod,
 								  &keycolcollation);
@@ -1192,6 +1208,9 @@ pg_get_indexdef_worker(Oid indexrelid, int colno,
 				appendStringInfo(&buf, " COLLATE %s",
 								 generate_collation_name((indcoll)));
 
+			if(keyno >= idxrec->indnkeyatts)
+				continue;
+
 			/* Add the operator class name, if not default */
 			get_opclass_name(indclass->values[keyno], keycoltype, &buf);
 
diff --git a/src/backend/utils/adt/selfuncs.c b/src/backend/utils/adt/selfuncs.c
index 46c95b0..9679b3a 100644
--- a/src/backend/utils/adt/selfuncs.c
+++ b/src/backend/utils/adt/selfuncs.c
@@ -4474,7 +4474,7 @@ examine_variable(PlannerInfo *root, Node *node, int varRelid,
 						 * should match has_unique_index().
 						 */
 						if (index->unique &&
-							index->ncolumns == 1 &&
+							index->nkeycolumns == 1 &&
 							(index->indpred == NIL || index->predOK))
 							vardata->isunique = true;
 
@@ -6559,7 +6559,7 @@ btcostestimate(PlannerInfo *root, IndexPath *path, double loop_count,
 	 * NullTest invalidates that theory, even though it sets eqQualHere.
 	 */
 	if (index->unique &&
-		indexcol == index->ncolumns - 1 &&
+		indexcol == index->nkeycolumns - 1 &&
 		eqQualHere &&
 		!found_saop &&
 		!found_is_null_op)
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index 130c06d..c65386f 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -521,7 +521,7 @@ RelationBuildTupleDesc(Relation relation)
 	/*
 	 * add attribute data to relation->rd_att
 	 */
-	need = relation->rd_rel->relnatts;
+	need = RelationGetNumberOfAttributes(relation);
 
 	while (HeapTupleIsValid(pg_attribute_tuple = systable_getnext(pg_attribute_scan)))
 	{
@@ -530,7 +530,7 @@ RelationBuildTupleDesc(Relation relation)
 		attp = (Form_pg_attribute) GETSTRUCT(pg_attribute_tuple);
 
 		if (attp->attnum <= 0 ||
-			attp->attnum > relation->rd_rel->relnatts)
+			attp->attnum > RelationGetNumberOfAttributes(relation))
 			elog(ERROR, "invalid attribute number %d for %s",
 				 attp->attnum, RelationGetRelationName(relation));
 
@@ -547,7 +547,7 @@ RelationBuildTupleDesc(Relation relation)
 			if (attrdef == NULL)
 				attrdef = (AttrDefault *)
 					MemoryContextAllocZero(CacheMemoryContext,
-										   relation->rd_rel->relnatts *
+										   RelationGetNumberOfAttributes(relation) *
 										   sizeof(AttrDefault));
 			attrdef[ndef].adnum = attp->attnum;
 			attrdef[ndef].adbin = NULL;
@@ -577,7 +577,7 @@ RelationBuildTupleDesc(Relation relation)
 	{
 		int			i;
 
-		for (i = 0; i < relation->rd_rel->relnatts; i++)
+		for (i = 0; i < RelationGetNumberOfAttributes(relation); i++)
 			Assert(relation->rd_att->attrs[i]->attcacheoff == -1);
 	}
 #endif
@@ -587,7 +587,7 @@ RelationBuildTupleDesc(Relation relation)
 	 * attribute: it must be zero.  This eliminates the need for special cases
 	 * for attnum=1 that used to exist in fastgetattr() and index_getattr().
 	 */
-	if (relation->rd_rel->relnatts > 0)
+	if (RelationGetNumberOfAttributes(relation) > 0)
 		relation->rd_att->attrs[0]->attcacheoff = 0;
 
 	/*
@@ -599,7 +599,7 @@ RelationBuildTupleDesc(Relation relation)
 
 		if (ndef > 0)			/* DEFAULTs */
 		{
-			if (ndef < relation->rd_rel->relnatts)
+			if (ndef < RelationGetNumberOfAttributes(relation))
 				constr->defval = (AttrDefault *)
 					repalloc(attrdef, ndef * sizeof(AttrDefault));
 			else
@@ -1205,7 +1205,8 @@ RelationInitIndexAccessInfo(Relation relation)
 	int2vector *indoption;
 	MemoryContext indexcxt;
 	MemoryContext oldcontext;
-	int			natts;
+	int			indnatts;
+	int			indnkeyatts;
 	uint16		amsupport;
 
 	/*
@@ -1235,10 +1236,11 @@ RelationInitIndexAccessInfo(Relation relation)
 	relation->rd_amhandler = aform->amhandler;
 	ReleaseSysCache(tuple);
 
-	natts = relation->rd_rel->relnatts;
-	if (natts != relation->rd_index->indnatts)
+	indnatts = RelationGetNumberOfAttributes(relation);
+	if (indnatts != IndexRelationGetNumberOfAttributes(relation))
 		elog(ERROR, "relnatts disagrees with indnatts for index %u",
 			 RelationGetRelid(relation));
+	indnkeyatts = IndexRelationGetNumberOfKeyAttributes(relation);
 
 	/*
 	 * Make the private context to hold index access info.  The reason we need
@@ -1264,14 +1266,14 @@ RelationInitIndexAccessInfo(Relation relation)
 	 * Allocate arrays to hold data
 	 */
 	relation->rd_opfamily = (Oid *)
-		MemoryContextAllocZero(indexcxt, natts * sizeof(Oid));
+		MemoryContextAllocZero(indexcxt, indnkeyatts * sizeof(Oid));
 	relation->rd_opcintype = (Oid *)
-		MemoryContextAllocZero(indexcxt, natts * sizeof(Oid));
+		MemoryContextAllocZero(indexcxt, indnkeyatts * sizeof(Oid));
 
 	amsupport = relation->rd_amroutine->amsupport;
 	if (amsupport > 0)
 	{
-		int			nsupport = natts * amsupport;
+		int			nsupport = indnatts * amsupport;
 
 		relation->rd_support = (RegProcedure *)
 			MemoryContextAllocZero(indexcxt, nsupport * sizeof(RegProcedure));
@@ -1285,10 +1287,10 @@ RelationInitIndexAccessInfo(Relation relation)
 	}
 
 	relation->rd_indcollation = (Oid *)
-		MemoryContextAllocZero(indexcxt, natts * sizeof(Oid));
+		MemoryContextAllocZero(indexcxt, indnatts * sizeof(Oid));
 
 	relation->rd_indoption = (int16 *)
-		MemoryContextAllocZero(indexcxt, natts * sizeof(int16));
+		MemoryContextAllocZero(indexcxt, indnatts * sizeof(int16));
 
 	/*
 	 * indcollation cannot be referenced directly through the C struct,
@@ -1301,7 +1303,7 @@ RelationInitIndexAccessInfo(Relation relation)
 							   &isnull);
 	Assert(!isnull);
 	indcoll = (oidvector *) DatumGetPointer(indcollDatum);
-	memcpy(relation->rd_indcollation, indcoll->values, natts * sizeof(Oid));
+	memcpy(relation->rd_indcollation, indcoll->values, indnatts * sizeof(Oid));
 
 	/*
 	 * indclass cannot be referenced directly through the C struct, because it
@@ -1322,7 +1324,7 @@ RelationInitIndexAccessInfo(Relation relation)
 	 */
 	IndexSupportInitialize(indclass, relation->rd_support,
 						   relation->rd_opfamily, relation->rd_opcintype,
-						   amsupport, natts);
+						   amsupport, indnkeyatts);
 
 	/*
 	 * Similarly extract indoption and copy it to the cache entry
@@ -1333,7 +1335,7 @@ RelationInitIndexAccessInfo(Relation relation)
 								 &isnull);
 	Assert(!isnull);
 	indoption = (int2vector *) DatumGetPointer(indoptionDatum);
-	memcpy(relation->rd_indoption, indoption->values, natts * sizeof(int16));
+	memcpy(relation->rd_indoption, indoption->values, indnatts * sizeof(int16));
 
 	/*
 	 * expressions, predicate, exclusion caches will be filled later
@@ -4324,7 +4326,6 @@ RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
 			{
 				indexattrs = bms_add_member(indexattrs,
 							   attrnum - FirstLowInvalidHeapAttributeNumber);
-
 				if (isKey)
 					uindexattrs = bms_add_member(uindexattrs,
 							   attrnum - FirstLowInvalidHeapAttributeNumber);
@@ -4397,7 +4398,7 @@ RelationGetExclusionInfo(Relation indexRelation,
 						 Oid **procs,
 						 uint16 **strategies)
 {
-	int			ncols = indexRelation->rd_rel->relnatts;
+	int			indnkeyatts;
 	Oid		   *ops;
 	Oid		   *funcs;
 	uint16	   *strats;
@@ -4409,17 +4410,19 @@ RelationGetExclusionInfo(Relation indexRelation,
 	MemoryContext oldcxt;
 	int			i;
 
+	indnkeyatts = IndexRelationGetNumberOfKeyAttributes(indexRelation);
+
 	/* Allocate result space in caller context */
-	*operators = ops = (Oid *) palloc(sizeof(Oid) * ncols);
-	*procs = funcs = (Oid *) palloc(sizeof(Oid) * ncols);
-	*strategies = strats = (uint16 *) palloc(sizeof(uint16) * ncols);
+	*operators = ops = (Oid *) palloc(sizeof(Oid) * indnkeyatts);
+	*procs = funcs = (Oid *) palloc(sizeof(Oid) * indnkeyatts);
+	*strategies = strats = (uint16 *) palloc(sizeof(uint16) * indnkeyatts);
 
 	/* Quick exit if we have the data cached already */
 	if (indexRelation->rd_exclstrats != NULL)
 	{
-		memcpy(ops, indexRelation->rd_exclops, sizeof(Oid) * ncols);
-		memcpy(funcs, indexRelation->rd_exclprocs, sizeof(Oid) * ncols);
-		memcpy(strats, indexRelation->rd_exclstrats, sizeof(uint16) * ncols);
+		memcpy(ops, indexRelation->rd_exclops, sizeof(Oid) * indnkeyatts);
+		memcpy(funcs, indexRelation->rd_exclprocs, sizeof(Oid) * indnkeyatts);
+		memcpy(strats, indexRelation->rd_exclstrats, sizeof(uint16) * indnkeyatts);
 		return;
 	}
 
@@ -4468,12 +4471,12 @@ RelationGetExclusionInfo(Relation indexRelation,
 		arr = DatumGetArrayTypeP(val);	/* ensure not toasted */
 		nelem = ARR_DIMS(arr)[0];
 		if (ARR_NDIM(arr) != 1 ||
-			nelem != ncols ||
+			nelem != indnkeyatts ||
 			ARR_HASNULL(arr) ||
 			ARR_ELEMTYPE(arr) != OIDOID)
 			elog(ERROR, "conexclop is not a 1-D Oid array");
 
-		memcpy(ops, ARR_DATA_PTR(arr), sizeof(Oid) * ncols);
+		memcpy(ops, ARR_DATA_PTR(arr), sizeof(Oid) * indnkeyatts);
 	}
 
 	systable_endscan(conscan);
@@ -4484,7 +4487,7 @@ RelationGetExclusionInfo(Relation indexRelation,
 			 RelationGetRelationName(indexRelation));
 
 	/* We need the func OIDs and strategy numbers too */
-	for (i = 0; i < ncols; i++)
+	for (i = 0; i < indnkeyatts; i++)
 	{
 		funcs[i] = get_opcode(ops[i]);
 		strats[i] = get_op_opfamily_strategy(ops[i],
@@ -4497,12 +4500,12 @@ RelationGetExclusionInfo(Relation indexRelation,
 
 	/* Save a copy of the results in the relcache entry. */
 	oldcxt = MemoryContextSwitchTo(indexRelation->rd_indexcxt);
-	indexRelation->rd_exclops = (Oid *) palloc(sizeof(Oid) * ncols);
-	indexRelation->rd_exclprocs = (Oid *) palloc(sizeof(Oid) * ncols);
-	indexRelation->rd_exclstrats = (uint16 *) palloc(sizeof(uint16) * ncols);
-	memcpy(indexRelation->rd_exclops, ops, sizeof(Oid) * ncols);
-	memcpy(indexRelation->rd_exclprocs, funcs, sizeof(Oid) * ncols);
-	memcpy(indexRelation->rd_exclstrats, strats, sizeof(uint16) * ncols);
+	indexRelation->rd_exclops = (Oid *) palloc(sizeof(Oid) * indnkeyatts);
+	indexRelation->rd_exclprocs = (Oid *) palloc(sizeof(Oid) * indnkeyatts);
+	indexRelation->rd_exclstrats = (uint16 *) palloc(sizeof(uint16) * indnkeyatts);
+	memcpy(indexRelation->rd_exclops, ops, sizeof(Oid) * indnkeyatts);
+	memcpy(indexRelation->rd_exclprocs, funcs, sizeof(Oid) * indnkeyatts);
+	memcpy(indexRelation->rd_exclstrats, strats, sizeof(uint16) * indnkeyatts);
 	MemoryContextSwitchTo(oldcxt);
 }
 
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index a30e170..bc7a17b 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -813,7 +813,7 @@ tuplesort_begin_index_btree(Relation heapRel,
 	state->enforceUnique = enforceUnique;
 
 	indexScanKey = _bt_mkscankey_nodata(indexRel);
-	state->nKeys = RelationGetNumberOfAttributes(indexRel);
+	state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel);
 
 	/* Prepare SortSupport data for each column */
 	state->sortKeys = (SortSupport) palloc0(state->nKeys *
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 64c2673..f5d6c5c 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -5406,7 +5406,8 @@ getIndexes(Archive *fout, TableInfo tblinfo[], int numTables)
 				i_oid,
 				i_indexname,
 				i_indexdef,
-				i_indnkeys,
+				i_indnnkeyatts,
+				i_indnatts,
 				i_indkey,
 				i_indisclustered,
 				i_indisreplident,
@@ -5462,12 +5463,14 @@ getIndexes(Archive *fout, TableInfo tblinfo[], int numTables)
 			/*
 			 * the test on indisready is necessary in 9.2, and harmless in
 			 * earlier/later versions
+			 * TODO update version. i.indnkeyatts only exists since 9.6
 			 */
 			appendPQExpBuffer(query,
 							  "SELECT t.tableoid, t.oid, "
 							  "t.relname AS indexname, "
 					 "pg_catalog.pg_get_indexdef(i.indexrelid) AS indexdef, "
-							  "t.relnatts AS indnkeys, "
+							  "i.indnkeyatts AS indnkeyatts, "
+							  "i.indnatts AS indnatts, "
 							  "i.indkey, i.indisclustered, "
 							  "i.indisreplident, t.relpages, "
 							  "c.contype, c.conname, "
@@ -5668,7 +5671,8 @@ getIndexes(Archive *fout, TableInfo tblinfo[], int numTables)
 		i_oid = PQfnumber(res, "oid");
 		i_indexname = PQfnumber(res, "indexname");
 		i_indexdef = PQfnumber(res, "indexdef");
-		i_indnkeys = PQfnumber(res, "indnkeys");
+		i_indnnkeyatts = PQfnumber(res, "indnkeyatts");
+		i_indnatts = PQfnumber(res, "indnatts");
 		i_indkey = PQfnumber(res, "indkey");
 		i_indisclustered = PQfnumber(res, "indisclustered");
 		i_indisreplident = PQfnumber(res, "indisreplident");
@@ -5698,7 +5702,8 @@ getIndexes(Archive *fout, TableInfo tblinfo[], int numTables)
 			indxinfo[j].dobj.namespace = tbinfo->dobj.namespace;
 			indxinfo[j].indextable = tbinfo;
 			indxinfo[j].indexdef = pg_strdup(PQgetvalue(res, j, i_indexdef));
-			indxinfo[j].indnkeys = atoi(PQgetvalue(res, j, i_indnkeys));
+			indxinfo[j].indnkeyattrs = atoi(PQgetvalue(res, j, i_indnnkeyatts));
+			indxinfo[j].indnattrs = atoi(PQgetvalue(res, j, i_indnatts));
 			indxinfo[j].tablespace = pg_strdup(PQgetvalue(res, j, i_tablespace));
 			indxinfo[j].indreloptions = pg_strdup(PQgetvalue(res, j, i_indreloptions));
 
@@ -14844,7 +14849,7 @@ dumpConstraint(Archive *fout, ConstraintInfo *coninfo)
 		{
 			appendPQExpBuffer(q, "%s (",
 						 coninfo->contype == 'p' ? "PRIMARY KEY" : "UNIQUE");
-			for (k = 0; k < indxinfo->indnkeys; k++)
+			for (k = 0; k < indxinfo->indnkeyattrs; k++)
 			{
 				int			indkey = (int) indxinfo->indkeys[k];
 				const char *attname;
@@ -14858,6 +14863,23 @@ dumpConstraint(Archive *fout, ConstraintInfo *coninfo)
 								  fmtId(attname));
 			}
 
+			if (indxinfo->indnkeyattrs < indxinfo->indnattrs)
+				appendPQExpBuffer(q, ") INCLUDING (");
+
+			for (k = indxinfo->indnkeyattrs; k < indxinfo->indnattrs; k++)
+			{
+				int			indkey = (int) indxinfo->indkeys[k];
+				const char *attname;
+
+				if (indkey == InvalidAttrNumber)
+					break;
+				attname = getAttrName(indkey, tbinfo);
+
+				appendPQExpBuffer(q, "%s%s",
+								  (k == indxinfo->indnkeyattrs) ? "" : ", ",
+								  fmtId(attname));
+			}
+
 			appendPQExpBufferChar(q, ')');
 
 			if (nonemptyReloptions(indxinfo->indreloptions))
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 9a1d8f8..0cf167c 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -285,8 +285,10 @@ typedef struct _indxInfo
 	char	   *indexdef;
 	char	   *tablespace;		/* tablespace in which index is stored */
 	char	   *indreloptions;	/* options specified by WITH (...) */
-	int			indnkeys;
-	Oid		   *indkeys;
+	int			indnkeyattrs;	/* number of index key attributes*/
+	int			indnattrs;		/* total number of index attributes*/
+	Oid		   *indkeys;		/* In spite of the name 'indkeys' this field
+								 * contains both key and nonkey attributes*/
 	bool		indisclustered;
 	bool		indisreplident;
 	/* if there is an associated constraint object, its dumpId: */
diff --git a/src/include/access/amapi.h b/src/include/access/amapi.h
index 35f1061..17e652c 100644
--- a/src/include/access/amapi.h
+++ b/src/include/access/amapi.h
@@ -142,6 +142,8 @@ typedef struct IndexAmRoutine
 	bool		amclusterable;
 	/* does AM handle predicate locks? */
 	bool		ampredlocks;
+	/* does AM support columns added with the INCLUDING clause? */
+	bool		amcaninclude;
 	/* type of data stored in index, or InvalidOid if variable */
 	Oid			amkeytype;
 
diff --git a/src/include/access/itup.h b/src/include/access/itup.h
index 8350fa0..42f7ad0 100644
--- a/src/include/access/itup.h
+++ b/src/include/access/itup.h
@@ -18,6 +18,7 @@
 #include "access/tupmacs.h"
 #include "storage/bufpage.h"
 #include "storage/itemptr.h"
+#include "utils/rel.h"
 
 /*
  * Index tuple header structure
@@ -147,5 +148,6 @@ extern Datum nocache_index_getattr(IndexTuple tup, int attnum,
 extern void index_deform_tuple(IndexTuple tup, TupleDesc tupleDescriptor,
 				   Datum *values, bool *isnull);
 extern IndexTuple CopyIndexTuple(IndexTuple source);
+extern IndexTuple index_reform_tuple(Relation idxrel, IndexTuple olditup, int indnatts, int indnkeyatts);
 
 #endif   /* ITUP_H */
diff --git a/src/include/catalog/pg_index.h b/src/include/catalog/pg_index.h
index ee97c5d..fcbd18a 100644
--- a/src/include/catalog/pg_index.h
+++ b/src/include/catalog/pg_index.h
@@ -32,7 +32,8 @@ CATALOG(pg_index,2610) BKI_WITHOUT_OIDS BKI_SCHEMA_MACRO
 {
 	Oid			indexrelid;		/* OID of the index */
 	Oid			indrelid;		/* OID of the relation it indexes */
-	int16		indnatts;		/* number of columns in index */
+	int16		indnatts;		/* total number of columns in index */
+	int16		indnkeyatts;	/* number of key columns in index */
 	bool		indisunique;	/* is this a unique index? */
 	bool		indisprimary;	/* is this index for primary key? */
 	bool		indisexclusion; /* is this index for exclusion constraint? */
@@ -70,26 +71,27 @@ typedef FormData_pg_index *Form_pg_index;
  *		compiler constants for pg_index
  * ----------------
  */
-#define Natts_pg_index					19
+#define Natts_pg_index					20
 #define Anum_pg_index_indexrelid		1
 #define Anum_pg_index_indrelid			2
 #define Anum_pg_index_indnatts			3
-#define Anum_pg_index_indisunique		4
-#define Anum_pg_index_indisprimary		5
-#define Anum_pg_index_indisexclusion	6
-#define Anum_pg_index_indimmediate		7
-#define Anum_pg_index_indisclustered	8
-#define Anum_pg_index_indisvalid		9
-#define Anum_pg_index_indcheckxmin		10
-#define Anum_pg_index_indisready		11
-#define Anum_pg_index_indislive			12
-#define Anum_pg_index_indisreplident	13
-#define Anum_pg_index_indkey			14
-#define Anum_pg_index_indcollation		15
-#define Anum_pg_index_indclass			16
-#define Anum_pg_index_indoption			17
-#define Anum_pg_index_indexprs			18
-#define Anum_pg_index_indpred			19
+#define Anum_pg_index_indnkeyatts		4
+#define Anum_pg_index_indisunique		5
+#define Anum_pg_index_indisprimary		6
+#define Anum_pg_index_indisexclusion	7
+#define Anum_pg_index_indimmediate		8
+#define Anum_pg_index_indisclustered	9
+#define Anum_pg_index_indisvalid		10
+#define Anum_pg_index_indcheckxmin		11
+#define Anum_pg_index_indisready		12
+#define Anum_pg_index_indislive			13
+#define Anum_pg_index_indisreplident	14
+#define Anum_pg_index_indkey			15
+#define Anum_pg_index_indcollation		16
+#define Anum_pg_index_indclass			17
+#define Anum_pg_index_indoption			18
+#define Anum_pg_index_indexprs			19
+#define Anum_pg_index_indpred			20
 
 /*
  * Index AMs that support ordered scans must support these two indoption
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 064a050..15ef27d 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -33,9 +33,11 @@
  *		entries for a particular index.  Used for both index_build and
  *		retail creation of index entries.
  *
- *		NumIndexAttrs		number of columns in this index
+ *		NumIndexAttrs		total number of columns in this index
+ *		NumIndexKeyAttrs	number of key columns in index
  *		KeyAttrNumbers		underlying-rel attribute numbers used as keys
- *							(zeroes indicate expressions)
+ *							(zeroes indicate expressions). It also contains
+ * 							info about included columns.
  *		Expressions			expr trees for expression entries, or NIL if none
  *		ExpressionsState	exec state for expressions, or NIL if none
  *		Predicate			partial-index predicate, or NIL if none
@@ -58,7 +60,8 @@
 typedef struct IndexInfo
 {
 	NodeTag		type;
-	int			ii_NumIndexAttrs;
+	int			ii_NumIndexAttrs; /* total number of columns in index */
+	int			ii_NumIndexKeyAttrs; /* number of key columns in index */
 	AttrNumber	ii_KeyAttrNumbers[INDEX_MAX_KEYS];
 	List	   *ii_Expressions; /* list of Expr */
 	List	   *ii_ExpressionsState;	/* list of ExprState */
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 2fd0629..a5d33e3 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -1834,7 +1834,8 @@ typedef struct Constraint
 	char	   *cooked_expr;	/* expr, as nodeToString representation */
 
 	/* Fields used for unique constraints (UNIQUE and PRIMARY KEY): */
-	List	   *keys;			/* String nodes naming referenced column(s) */
+	List	   *keys;			/* String nodes naming referenced key column(s) */
+	List	   *including;		/* String nodes naming referenced nonkey column(s) */
 
 	/* Fields used for EXCLUSION constraints: */
 	List	   *exclusions;		/* list of (IndexElem, operator name) pairs */
@@ -2426,6 +2427,8 @@ typedef struct IndexStmt
 	char	   *accessMethod;	/* name of access method (eg. btree) */
 	char	   *tableSpace;		/* tablespace, or NULL for default */
 	List	   *indexParams;	/* columns to index: a list of IndexElem */
+	List	   *indexIncludingParams;	/* additional columns to index:
+										 * a list of IndexElem */
 	List	   *options;		/* WITH clause options: a list of DefElem */
 	Node	   *whereClause;	/* qualification (partial-index predicate) */
 	List	   *excludeOpNames; /* exclusion operator names, or NIL if none */
diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h
index 96198ae..7556190 100644
--- a/src/include/nodes/relation.h
+++ b/src/include/nodes/relation.h
@@ -512,11 +512,12 @@ typedef struct RelOptInfo
  * IndexOptInfo
  *		Per-index information for planning/optimization
  *
- *		indexkeys[], indexcollations[], opfamily[], and opcintype[]
- *		each have ncolumns entries.
+ *		indexkeys[], indexcollations[] each have ncolumns entries.
+ *		opfamily[] and opcintype[] each have nkeycolumns entries. They do
+ *		not contain any information about included attributes.
  *
- *		sortopfamily[], reverse_sort[], and nulls_first[] likewise have
- *		ncolumns entries, if the index is ordered; but if it is unordered,
+ *		sortopfamily[], reverse_sort[], and nulls_first[] have
+ *		nkeycolumns entries, if the index is ordered; but if it is unordered,
  *		those pointers are NULL.
  *
  *		Zeroes in the indexkeys[] array indicate index columns that are
@@ -549,7 +550,9 @@ typedef struct IndexOptInfo
 
 	/* index descriptor information */
 	int			ncolumns;		/* number of columns in index */
-	int		   *indexkeys;		/* column numbers of index's keys, or 0 */
+	int			nkeycolumns;	/* number of key columns in index */
+	int		   *indexkeys;		/* column numbers of index's attributes
+								 * (both key and included columns), or 0 */
 	Oid		   *indexcollations;	/* OIDs of collations of index columns */
 	Oid		   *opfamily;		/* OIDs of operator families for columns */
 	Oid		   *opcintype;		/* OIDs of opclass declared input data types */
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index f2bebf2..9b2a3ee 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -329,11 +329,25 @@ typedef struct ViewOptions
 
 /*
  * RelationGetNumberOfAttributes
- *		Returns the number of attributes in a relation.
+ *		Returns the total number of attributes in a relation.
  */
 #define RelationGetNumberOfAttributes(relation) ((relation)->rd_rel->relnatts)
 
 /*
+ * IndexRelationGetNumberOfAttributes
+ *		Returns the number of attributes in an index.
+ */
+#define IndexRelationGetNumberOfAttributes(relation) \
+		((relation)->rd_index->indnatts)
+
+/*
+ * IndexRelationGetNumberOfKeyAttributes
+ *		Returns the number of key attributes in an index.
+ */
+#define IndexRelationGetNumberOfKeyAttributes(relation) \
+		((relation)->rd_index->indnkeyatts)
+
+/*
  * RelationGetDescr
  *		Returns tuple descriptor for a relation.
  */
diff --git a/src/test/regress/expected/create_index.out b/src/test/regress/expected/create_index.out
index b72e65d..02488df 100644
--- a/src/test/regress/expected/create_index.out
+++ b/src/test/regress/expected/create_index.out
@@ -2376,6 +2376,25 @@ DETAIL:  Key ((f1 || f2))=(ABCDEF) already exists.
 -- but this shouldn't:
 INSERT INTO func_index_heap VALUES('QWERTY');
 --
+-- Test unique index with included columns
+--
+CREATE TABLE covering_index_heap (f1 int, f2 int, f3 text);
+CREATE UNIQUE INDEX covering_index_index on covering_index_heap (f1,f2) INCLUDING(f3);
+INSERT INTO covering_index_heap VALUES(1,1,'AAA');
+INSERT INTO covering_index_heap VALUES(1,2,'AAA');
+-- this should fail because of unique index on f1,f2:
+INSERT INTO covering_index_heap VALUES(1,2,'BBB');
+ERROR:  duplicate key value violates unique constraint "covering_index_index"
+DETAIL:  Key (f1, f2)=(1, 2) already exists.
+-- and this shouldn't:
+INSERT INTO covering_index_heap VALUES(1,4,'AAA');
+-- Try to build index on table that already contains data
+CREATE UNIQUE INDEX covering_pkey on covering_index_heap (f1,f2) INCLUDING(f3);
+-- Try to use existing covering index as primary key
+ALTER TABLE covering_index_heap ADD CONSTRAINT covering_pkey PRIMARY KEY USING INDEX
+covering_pkey;
+DROP TABLE covering_index_heap;
+--
 -- Also try building functional, expressional, and partial indexes on
 -- tables that already contain data.
 --
diff --git a/src/test/regress/sql/create_index.sql b/src/test/regress/sql/create_index.sql
index ff86953..3737157 100644
--- a/src/test/regress/sql/create_index.sql
+++ b/src/test/regress/sql/create_index.sql
@@ -722,6 +722,26 @@ INSERT INTO func_index_heap VALUES('ABCD', 'EF');
 INSERT INTO func_index_heap VALUES('QWERTY');
 
 --
+-- Test unique index with included columns
+--
+CREATE TABLE covering_index_heap (f1 int, f2 int, f3 text);
+CREATE UNIQUE INDEX covering_index_index on covering_index_heap (f1,f2) INCLUDING(f3);
+
+INSERT INTO covering_index_heap VALUES(1,1,'AAA');
+INSERT INTO covering_index_heap VALUES(1,2,'AAA');
+-- this should fail because of unique index on f1,f2:
+INSERT INTO covering_index_heap VALUES(1,2,'BBB');
+-- and this shouldn't:
+INSERT INTO covering_index_heap VALUES(1,4,'AAA');
+-- Try to build index on table that already contains data
+CREATE UNIQUE INDEX covering_pkey on covering_index_heap (f1,f2) INCLUDING(f3);
+-- Try to use existing covering index as primary key
+ALTER TABLE covering_index_heap ADD CONSTRAINT covering_pkey PRIMARY KEY USING INDEX
+covering_pkey;
+DROP TABLE covering_index_heap;
+
+
+--
 -- Also try building functional, expressional, and partial indexes on
 -- tables that already contain data.
 --
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to