08.01.2016 00:12, David Rowley:
On 7 January 2016 at 06:36, Jeff Janes <jeff.ja...@gmail.com <mailto:jeff.ja...@gmail.com>> wrote:

    On Tue, Jan 5, 2016 at 11:55 PM, David Rowley
    <david.row...@2ndquadrant.com
    <mailto:david.row...@2ndquadrant.com>> wrote:
    > create table ab (a int,b int);
    > insert into ab select x,y from generate_series(1,20) x(x),
    > generate_series(10,1,-1) y(y);
    > create index on ab (a) including (b);
    > explain select * from ab order by a,b;
    >                         QUERY PLAN
    > ----------------------------------------------------------
    >  Sort  (cost=10.64..11.14 rows=200 width=8)
    >    Sort Key: a, b
    >    ->  Seq Scan on ab  (cost=0.00..3.00 rows=200 width=8)
    > (3 rows)

    If you set enable_sort=off, then you get the index-only scan with no
    sort.  So it believes the index can be used for ordering (correctly, I
    think), just sometimes it thinks it is not faster to do it that way.

    I'm not sure why this would be a correctness problem. The covered
    column does not participate in uniqueness checks, but it still usually
    participates in index ordering.  (That is why dummy op-classes are
    needed if you want to include non-sortable-type columns as being
    covered.)


If that's the case, then it appears that I've misunderstood INCLUDING. From reading _bt_doinsert() it appeared that it'll ignore the INCLUDING columns and just find the insert position based on the key columns. Yet that's not the way that it appears to work. I was also a bit confused, as from working with another database which has very similar syntax to this, that one only includes the columns to allow index only scans, and the included columns are not indexed, therefore can't be part of index quals and the index only provides a sorted path for the indexed columns, and not the included columns.

Thank you for properly testing. Order by clause in this case definitely doesn't work as expected. The problem is fixed by patching a planner function "build_index_pathkeys()'. It disables using of index if sorting of included columns is required.
Test example works correctly now - it always performs seq scan and sort.

Saying that, I'm now a bit confused to why the following does not produce 2 indexes which are the same size:

create table t1 (a int, b text);
insert into t1 select x,md5(random()::text) from generate_series(1,1000000) x(x);
create index t1_a_inc_b_idx on t1 (a) including (b);
create index t1_a_b_idx on t1 (a,b);
select pg_relation_Size('t1_a_b_idx'),pg_relation_size('t1_a_inc_b_idx');
 pg_relation_size | pg_relation_size
------------------+------------------
         59064320 |         58744832
(1 row)

I suppose you've already found that in discussion above. Included columns are stored only in leaf index pages. The difference is the size of attributes 'b' which are situatedin inner pages of index "t1_a_b_idx".

Also, if we want INCLUDING() to mean "uniqueness is not enforced on these columns, but they're still in the index", then I don't really think allowing types without a btree opclass is a good idea. It's likely too surprised filled and might not be what the user actually wants. I'd suggest that these non-indexed columns would be better defined by further expanding the syntax, the first (perhaps not very good) thing that comes to mind is:

create unique index idx_name on table (unique_col) also index (other,idx,cols) including (leaf,onlycols);

Looking up thread, I don't think I was the first to be confused by this.

Included columns are still in the index physically - they are stored in the index relation. But they are not indexedin the true sense of the word. It's impossible to use them for index scan or ordering. At the beginning, I've got an idea that included columns are supposed to be used for combination of unique index on one columns and covering on others. In a very rare instances one could prefer a non-unique index with included columns "t1_a_inc_b_idx"to a regular multicolumn index "t1_a_b_idx". Frankly, I didn't see such use cases at all. Index size reduction is not considerable, while we lose some useful index functionality on included column. I think that it should be mentioned as a note in documentation, but I need help to phrase it clear.

But now I see the reason to create non-unique index with included columns - lack of suitable opclass on column "b". It's impossible to add it into the index as a key column, but that's not a problem with INCLUDING clause.
Look at example.

create table t1 (a int, b box);
create index t1_a_inc_b_idx on t1 (a) including (b);
create index on tbl (a,b);
ERROR: data type box has no default operator class for access method "btree" HINT: You must specify an operator class for the index or define a default operator class for the data type.
create index on tbl (a) including (b);
CREATE INDEX

This functionality is provided by the attached patch "omit_opclass_4.0", which must be applied over covering_unique_4.0.patch.

I see what you were confused about, I'd had the same question at the very beginning of the discussion of this patch. Now it seems a bit more clear to me. INCLUDING columns are not used for the searching or ordering of records, so there is no need to check whether they have an opclass. INCLUDING columns perform as expected and it agrees with other database experience. And this patch is completed.

But it isn't perfect definitely... I found test case to explain that. See below. That's why we need optional_opclass functionality, which will use opclass where possible and omit it in other cases. This idea have been already described in a message Re: [PROPOSAL] Covering + unique indexes <http://www.postgresql.org/message-id/55f84df4.5030...@postgrespro.ru>as "partially unique index". I suggest to separate optional_opclass task to ease syntax discussion and following review. And I'll implement it in the next patch a bit later.

Test case:
1) patch covering_unique_4.0 + test_covering_unique_4.0
If included columns' opclasses are used, new query plan is the same with the old one.
and have nearly the same execution time:

                                                         QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------
Index Only Scan using oldcoveringidx on oldt (cost=0.43..301.72 rows=1 width=8) (actual time=0.021..0.676 rows=6 loops=1)
   Index Cond: ((c1 < 10000) AND (c3 < 20))
   Heap Fetches: 0
 Planning time: 0.101 ms
 Execution time: 0.697 ms
(5 rows)

                                                     QUERY PLAN
--------------------------------------------------------------------------------------------------------------------
Index Only Scan using newidx on newt (cost=0.43..276.51 rows=1 width=8) (actual time=0.020..0.665 rows=6 loops=1)
   Index Cond: ((c1 < 10000) AND (c3 < 20))
   Heap Fetches: 0
 Planning time: 0.082 ms
 Execution time: 0.687 ms
(5 rows)

2) patch covering_unique_4.0 + patch omit_opclass_4.0 + test_covering_unique_4.0 Otherwise, new query can not use included column in Index Cond and uses filter instead. It slows down the query significantly.
                                                         QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------
Index Only Scan using oldcoveringidx on oldt (cost=0.43..230.39 rows=1 width=8) (actual time=0.021..0.722 rows=6 loops=1)
   Index Cond: ((c1 < 10000) AND (c3 < 20))
   Heap Fetches: 0
 Planning time: 0.091 ms
 Execution time: 0.744 ms
(5 rows)

                                                     QUERY PLAN
--------------------------------------------------------------------------------------------------------------------
Index Only Scan using newidx on newt (cost=0.43..374.68 rows=1 width=8) (actual time=0.018..2.595 rows=6 loops=1)
   Index Cond: (c1 < 10000)
   Filter: (c3 < 20)
   Rows Removed by Filter: 9993
   Heap Fetches: 0
 Planning time: 0.078 ms
 Execution time: 2.612 ms

--
Anastasia Lubennikova
Postgres Professional:http://www.postgrespro.com
The Russian Postgres Company


diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 97ef618..d17a06c 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -644,6 +644,13 @@
       <entry>Does an index of this type manage fine-grained predicate locks?</entry>
      </row>
 
+      <row>
+      <entry><structfield>amcaninclude</structfield></entry>
+      <entry><type>bool</type></entry>
+      <entry></entry>
+      <entry>Does the access method support included columns?</entry>
+     </row>
+
      <row>
       <entry><structfield>amkeytype</structfield></entry>
       <entry><type>oid</type></entry>
@@ -3714,6 +3721,14 @@
       <literal>pg_class.relnatts</literal>)</entry>
      </row>
 
+      <row>
+      <entry><structfield>indnkeyatts</structfield></entry>
+      <entry><type>int2</type></entry>
+      <entry></entry>
+      <entry>The number of key columns in the index. "Key columns" are ordinary
+      index columns in contrast with "included" columns.</entry>
+     </row>
+
      <row>
       <entry><structfield>indisunique</structfield></entry>
       <entry><type>bool</type></entry>
diff --git a/doc/src/sgml/indexam.sgml b/doc/src/sgml/indexam.sgml
index 1c09bae..a102391 100644
--- a/doc/src/sgml/indexam.sgml
+++ b/doc/src/sgml/indexam.sgml
@@ -765,9 +765,10 @@ amrestrpos (IndexScanDesc scan);
   <para>
    <productname>PostgreSQL</productname> enforces SQL uniqueness constraints
    using <firstterm>unique indexes</>, which are indexes that disallow
-   multiple entries with identical keys.  An access method that supports this
+   multiple entries with identical keys. An access method that supports this
    feature sets <structname>pg_am</>.<structfield>amcanunique</> true.
-   (At present, only b-tree supports it.)
+   Columns included with clause INCLUDING  aren't used to enforce uniqueness.
+   (At present, only b-tree supports them.)
   </para>
 
   <para>
diff --git a/doc/src/sgml/indices.sgml b/doc/src/sgml/indices.sgml
index 23bbec6..44dddb6 100644
--- a/doc/src/sgml/indices.sgml
+++ b/doc/src/sgml/indices.sgml
@@ -633,7 +633,8 @@ CREATE INDEX test3_desc_index ON test3 (id DESC NULLS LAST);
    Indexes can also be used to enforce uniqueness of a column's value,
    or the uniqueness of the combined values of more than one column.
 <synopsis>
-CREATE UNIQUE INDEX <replaceable>name</replaceable> ON <replaceable>table</replaceable> (<replaceable>column</replaceable> <optional>, ...</optional>);
+CREATE UNIQUE INDEX <replaceable>name</replaceable> ON <replaceable>table</replaceable> (<replaceable>column</replaceable> <optional>, ...</optional>)
+<optional>INCLUDING (<replaceable>column</replaceable> <optional>, ...</optional>)</optional>;
 </synopsis>
    Currently, only B-tree indexes can be declared unique.
   </para>
@@ -642,7 +643,8 @@ CREATE UNIQUE INDEX <replaceable>name</replaceable> ON <replaceable>table</repla
    When an index is declared unique, multiple table rows with equal
    indexed values are not allowed.  Null values are not considered
    equal.  A multicolumn unique index will only reject cases where all
-   indexed columns are equal in multiple rows.
+   indexed columns are equal in multiple rows. Columns included with clause
+   <literal>INCLUDING</literal> aren't used to enforce constraints (UNIQUE, PRIMARY KEY, etc).
   </para>
 
   <para>
diff --git a/doc/src/sgml/ref/create_index.sgml b/doc/src/sgml/ref/create_index.sgml
index ce36a1b..8360bb6 100644
--- a/doc/src/sgml/ref/create_index.sgml
+++ b/doc/src/sgml/ref/create_index.sgml
@@ -23,6 +23,7 @@ PostgreSQL documentation
 <synopsis>
 CREATE [ UNIQUE ] INDEX [ CONCURRENTLY ] [ [ IF NOT EXISTS ] <replaceable class="parameter">name</replaceable> ] ON <replaceable class="parameter">table_name</replaceable> [ USING <replaceable class="parameter">method</replaceable> ]
     ( { <replaceable class="parameter">column_name</replaceable> | ( <replaceable class="parameter">expression</replaceable> ) } [ COLLATE <replaceable class="parameter">collation</replaceable> ] [ <replaceable class="parameter">opclass</replaceable> ] [ ASC | DESC ] [ NULLS { FIRST | LAST } ] [, ...] )
+    [ INCLUDING ( { <replaceable class="parameter">column_name</replaceable> | ( <replaceable class="parameter">expression</replaceable> ) } [ COLLATE <replaceable class="parameter">collation</replaceable> ] [ <replaceable class="parameter">opclass</replaceable> ] [ ASC | DESC ] [ NULLS { FIRST | LAST } ] [, ...] )
     [ WITH ( <replaceable class="PARAMETER">storage_parameter</replaceable> = <replaceable class="PARAMETER">value</replaceable> [, ... ] ) ]
     [ TABLESPACE <replaceable class="parameter">tablespace_name</replaceable> ]
     [ WHERE <replaceable class="parameter">predicate</replaceable> ]
@@ -139,6 +140,32 @@ CREATE [ UNIQUE ] INDEX [ CONCURRENTLY ] [ [ IF NOT EXISTS ] <replaceable class=
      </varlistentry>
 
      <varlistentry>
+      <term><literal>INCLUDING</literal></term>
+      <listitem>
+       <para>
+        An optional <literal>INCLUDING</> clause allows a list of columns to be
+        specified which will be included in the index, in the non-key portion of
+        the index. Columns which are part of this clause cannot also exist in the
+        key columns portion of the index, and vice versa. The
+        <literal>INCLUDING</> columns exist solely to allow more queries to benefit
+        from <firstterm>index-only scans</> by including certain columns in the
+        index, the value of which would otherwise have to be obtained by reading
+        the table's heap. Having these columns in the <literal>INCLUDING</> clause
+        in some cases allows <productname>PostgreSQL</> to skip the heap read
+        completely. This also allows <literal>UNIQUE</> indexes to be defined on
+        one set of columns, which can include another set of column in the
+        <literal>INCLUDING</> clause, on which the uniqueness is not enforced upon.
+        It's the same with other constraints (PRIMARY KEY and EXCLUDE). This can
+        also can be used for non-unique indexes as any columns which are not required
+        for the searching or ordering of records can defined in the
+        <literal>INCLUDING</> clause, which can slightly reduce the size of the index,
+        due to storing included attributes only in leaf index pages.
+        Currently, only the B-tree access method supports this feature.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
       <term><replaceable class="parameter">name</replaceable></term>
       <listitem>
        <para>
@@ -596,7 +623,7 @@ Indexes:
   <title>Examples</title>
 
   <para>
-   To create a B-tree index on the column <literal>title</literal> in
+   To create a unique B-tree index on the column <literal>title</literal> in
    the table <literal>films</literal>:
 <programlisting>
 CREATE UNIQUE INDEX title_idx ON films (title);
@@ -604,6 +631,15 @@ CREATE UNIQUE INDEX title_idx ON films (title);
   </para>
 
   <para>
+   To create a unique B-tree index on the column <literal>title</literal>
+   and included columns <literal>director</literal> and <literal>rating</literal>
+   in the table <literal>films</literal>:
+<programlisting>
+CREATE UNIQUE INDEX title_idx ON films (title) INCLUDING (director, rating);
+</programlisting>
+  </para>
+
+  <para>
    To create an index on the expression <literal>lower(title)</>,
    allowing efficient case-insensitive searches:
 <programlisting>
diff --git a/src/backend/access/common/indextuple.c b/src/backend/access/common/indextuple.c
index dc588d7..3adff5e 100644
--- a/src/backend/access/common/indextuple.c
+++ b/src/backend/access/common/indextuple.c
@@ -19,6 +19,7 @@
 #include "access/heapam.h"
 #include "access/itup.h"
 #include "access/tuptoaster.h"
+#include "utils/rel.h"
 
 
 /* ----------------------------------------------------------------
@@ -441,3 +442,30 @@ CopyIndexTuple(IndexTuple source)
 	memcpy(result, source, size);
 	return result;
 }
+
+/*
+ * Reform index tuple. Truncate nonkey (INCLUDING) attributes.
+ */
+IndexTuple
+index_reform_tuple(Relation idxrel, IndexTuple olditup, int natts, int nkeyatts)
+{
+	TupleDesc   itupdesc = RelationGetDescr(idxrel);
+	Datum       values[INDEX_MAX_KEYS];
+	bool        isnull[INDEX_MAX_KEYS];
+	IndexTuple	newitup;
+
+	Assert(natts <= INDEX_MAX_KEYS);
+	Assert(nkeyatts > 0);
+	Assert(nkeyatts <= natts);
+
+	index_deform_tuple(olditup, itupdesc, values, isnull);
+
+	/* form new tuple that will contain only key attributes */
+	itupdesc->natts = nkeyatts;
+	newitup = index_form_tuple(itupdesc, values, isnull);
+	newitup->t_tid = olditup->t_tid;
+
+	itupdesc->natts = natts;
+
+	return newitup;
+}
diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c
index 77c2fdf..404e312 100644
--- a/src/backend/access/nbtree/nbtinsert.c
+++ b/src/backend/access/nbtree/nbtinsert.c
@@ -108,18 +108,22 @@ _bt_doinsert(Relation rel, IndexTuple itup,
 			 IndexUniqueCheck checkUnique, Relation heapRel)
 {
 	bool		is_unique = false;
-	int			natts = rel->rd_rel->relnatts;
+	int			nkeyatts;
 	ScanKey		itup_scankey;
 	BTStack		stack;
 	Buffer		buf;
 	OffsetNumber offset;
 
+	Assert(rel->rd_index->indnatts != 0);
+	Assert(rel->rd_index->indnkeyatts != 0);
+	nkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
+
 	/* we need an insertion scan key to do our search, so build one */
 	itup_scankey = _bt_mkscankey(rel, itup);
 
 top:
 	/* find the first page containing this key */
-	stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE);
+	stack = _bt_search(rel, nkeyatts, itup_scankey, false, &buf, BT_WRITE);
 
 	offset = InvalidOffsetNumber;
 
@@ -134,7 +138,7 @@ top:
 	 * move right in the tree.  See Lehman and Yao for an excruciatingly
 	 * precise description.
 	 */
-	buf = _bt_moveright(rel, buf, natts, itup_scankey, false,
+	buf = _bt_moveright(rel, buf, nkeyatts, itup_scankey, false,
 						true, stack, BT_WRITE);
 
 	/*
@@ -163,7 +167,7 @@ top:
 		TransactionId xwait;
 		uint32		speculativeToken;
 
-		offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);
+		offset = _bt_binsrch(rel, buf, nkeyatts, itup_scankey, false);
 		xwait = _bt_check_unique(rel, itup, heapRel, buf, offset, itup_scankey,
 								 checkUnique, &is_unique, &speculativeToken);
 
@@ -199,7 +203,7 @@ top:
 		 */
 		CheckForSerializableConflictIn(rel, NULL, buf);
 		/* do the insertion */
-		_bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup,
+		_bt_findinsertloc(rel, &buf, &offset, nkeyatts, itup_scankey, itup,
 						  stack, heapRel);
 		_bt_insertonpg(rel, buf, InvalidBuffer, stack, itup, offset, false);
 	}
@@ -242,7 +246,7 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
 				 uint32 *speculativeToken)
 {
 	TupleDesc	itupdesc = RelationGetDescr(rel);
-	int			natts = rel->rd_rel->relnatts;
+	int			nkeyatts = rel->rd_index->indnkeyatts;
 	SnapshotData SnapshotDirty;
 	OffsetNumber maxoff;
 	Page		page;
@@ -250,6 +254,9 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
 	Buffer		nbuf = InvalidBuffer;
 	bool		found = false;
 
+	Assert(rel->rd_index->indnatts != 0);
+	Assert(rel->rd_index->indnkeyatts != 0);
+
 	/* Assume unique until we find a duplicate */
 	*is_unique = true;
 
@@ -301,7 +308,7 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
 				 * in real comparison, but only for ordering/finding items on
 				 * pages. - vadim 03/24/97
 				 */
-				if (!_bt_isequal(itupdesc, page, offset, natts, itup_scankey))
+				if (!_bt_isequal(itupdesc, page, offset, nkeyatts, itup_scankey))
 					break;		/* we're past all the equal tuples */
 
 				/* okay, we gotta fetch the heap tuple ... */
@@ -457,7 +464,7 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
 			if (P_RIGHTMOST(opaque))
 				break;
 			if (!_bt_isequal(itupdesc, page, P_HIKEY,
-							 natts, itup_scankey))
+							 nkeyatts, itup_scankey))
 				break;
 			/* Advance to next non-dead page --- there must be one */
 			for (;;)
@@ -745,6 +752,14 @@ _bt_insertonpg(Relation rel,
 		elog(ERROR, "cannot insert to incompletely split page %u",
 			 BufferGetBlockNumber(buf));
 
+	/* Truncate nonkey attributes when inserting on nonleaf pages. */
+	if (rel->rd_index->indnatts != rel->rd_index->indnkeyatts
+		&& !P_ISLEAF(lpageop))
+	{
+			itup = index_reform_tuple(rel, itup,
+				rel->rd_index->indnatts, rel->rd_index->indnkeyatts);
+	}
+
 	itemsz = IndexTupleDSize(*itup);
 	itemsz = MAXALIGN(itemsz);	/* be safe, PageAddItem will do this but we
 								 * need to be consistent */
@@ -1962,6 +1977,7 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
 	right_item_sz = ItemIdGetLength(itemid);
 	item = (IndexTuple) PageGetItem(lpage, itemid);
 	right_item = CopyIndexTuple(item);
+	right_item = index_reform_tuple(rel, right_item, rel->rd_index->indnatts, rel->rd_index->indnkeyatts);
 	ItemPointerSet(&(right_item->t_tid), rbkno, P_HIKEY);
 
 	/* NO EREPORT(ERROR) from here till newroot op is logged */
diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c
index 6e65db9..131bbc2 100644
--- a/src/backend/access/nbtree/nbtpage.c
+++ b/src/backend/access/nbtree/nbtpage.c
@@ -1254,8 +1254,9 @@ _bt_pagedel(Relation rel, Buffer buf)
 				/* we need an insertion scan key for the search, so build one */
 				itup_scankey = _bt_mkscankey(rel, targetkey);
 				/* find the leftmost leaf page containing this key */
-				stack = _bt_search(rel, rel->rd_rel->relnatts, itup_scankey,
-								   false, &lbuf, BT_READ);
+				stack = _bt_search(rel,
+								   IndexRelationGetNumberOfKeyAttributes(rel),
+								   itup_scankey, false, &lbuf, BT_READ);
 				/* don't need a pin on the page */
 				_bt_relbuf(rel, lbuf);
 
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index f95f67a..f8eb66d 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -593,6 +593,22 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup)
 		state->btps_minkey = CopyIndexTuple(itup);
 	}
 
+	/* Truncate nonkey attributes when inserting on nonleaf pages */
+	if (wstate->index->rd_index->indnatts
+		!= wstate->index->rd_index->indnkeyatts)
+	{
+		BTPageOpaque pageop = (BTPageOpaque) PageGetSpecialPointer(npage);
+
+		if (!P_ISLEAF(pageop))
+		{
+			itup = index_reform_tuple(wstate->index,
+					itup, wstate->index->rd_index->indnatts,
+					wstate->index->rd_index->indnkeyatts);
+			itupsz = IndexTupleDSize(*itup);
+			itupsz = MAXALIGN(itupsz);
+		}
+	}
+
 	/*
 	 * Add the new item into the current page.
 	 */
diff --git a/src/backend/access/nbtree/nbtutils.c b/src/backend/access/nbtree/nbtutils.c
index 91331ba..2192da4 100644
--- a/src/backend/access/nbtree/nbtutils.c
+++ b/src/backend/access/nbtree/nbtutils.c
@@ -63,17 +63,23 @@ _bt_mkscankey(Relation rel, IndexTuple itup)
 {
 	ScanKey		skey;
 	TupleDesc	itupdesc;
-	int			natts;
-	int16	   *indoption;
+	int			nkeyatts;
+	int16	   *indoption = rel->rd_indoption;
 	int			i;
-
 	itupdesc = RelationGetDescr(rel);
-	natts = RelationGetNumberOfAttributes(rel);
-	indoption = rel->rd_indoption;
 
-	skey = (ScanKey) palloc(natts * sizeof(ScanKeyData));
+	Assert(rel->rd_index->indnkeyatts != 0);
+	Assert(rel->rd_index->indnkeyatts <= rel->rd_index->indnatts);
 
-	for (i = 0; i < natts; i++)
+	nkeyatts = rel->rd_index->indnkeyatts;
+
+	/*
+	 * We'll execute search using ScanKey constructed on key columns.
+	 * Non key (included) columns must be omitted.
+	 */
+	skey = (ScanKey) palloc(nkeyatts * sizeof(ScanKeyData));
+
+	for (i = 0; i < nkeyatts; i++)
 	{
 		FmgrInfo   *procinfo;
 		Datum		arg;
diff --git a/src/backend/bootstrap/bootparse.y b/src/backend/bootstrap/bootparse.y
index d8d1b06..002bcd5 100644
--- a/src/backend/bootstrap/bootparse.y
+++ b/src/backend/bootstrap/bootparse.y
@@ -293,6 +293,7 @@ Boot_DeclareIndexStmt:
 					stmt->accessMethod = $8;
 					stmt->tableSpace = NULL;
 					stmt->indexParams = $10;
+					stmt->indexIncludingParams = NIL;
 					stmt->options = NIL;
 					stmt->whereClause = NULL;
 					stmt->excludeOpNames = NIL;
@@ -336,6 +337,7 @@ Boot_DeclareUniqueIndexStmt:
 					stmt->accessMethod = $9;
 					stmt->tableSpace = NULL;
 					stmt->indexParams = $11;
+					stmt->indexIncludingParams = NIL;
 					stmt->options = NIL;
 					stmt->whereClause = NULL;
 					stmt->excludeOpNames = NIL;
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index c10be3d..d3e980e 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -213,7 +213,7 @@ index_check_primary_key(Relation heapRel,
 	 * null, otherwise attempt to ALTER TABLE .. SET NOT NULL
 	 */
 	cmds = NIL;
-	for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
+	for (i = 0; i < indexInfo->ii_NumIndexKeyAttrs; i++)
 	{
 		AttrNumber	attnum = indexInfo->ii_KeyAttrNumbers[i];
 		HeapTuple	atttuple;
@@ -608,6 +608,7 @@ UpdateIndexRelation(Oid indexoid,
 	values[Anum_pg_index_indexrelid - 1] = ObjectIdGetDatum(indexoid);
 	values[Anum_pg_index_indrelid - 1] = ObjectIdGetDatum(heapoid);
 	values[Anum_pg_index_indnatts - 1] = Int16GetDatum(indexInfo->ii_NumIndexAttrs);
+	values[Anum_pg_index_indnkeyatts - 1] = Int16GetDatum(indexInfo->ii_NumIndexKeyAttrs);
 	values[Anum_pg_index_indisunique - 1] = BoolGetDatum(indexInfo->ii_Unique);
 	values[Anum_pg_index_indisprimary - 1] = BoolGetDatum(primary);
 	values[Anum_pg_index_indisexclusion - 1] = BoolGetDatum(isexclusion);
@@ -1071,6 +1072,8 @@ index_create(Relation heapRelation,
 	else
 		Assert(indexRelation->rd_indexcxt != NULL);
 
+	indexRelation->rd_index->indnkeyatts = indexInfo->ii_NumIndexKeyAttrs;
+
 	/*
 	 * If this is bootstrap (initdb) time, then we don't actually fill in the
 	 * index yet.  We'll be creating more indexes and classes later, so we
@@ -1191,7 +1194,7 @@ index_constraint_create(Relation heapRelation,
 								   true,
 								   RelationGetRelid(heapRelation),
 								   indexInfo->ii_KeyAttrNumbers,
-								   indexInfo->ii_NumIndexAttrs,
+								   indexInfo->ii_NumIndexKeyAttrs,
 								   InvalidOid,	/* no domain */
 								   indexRelationId,		/* index OID */
 								   InvalidOid,	/* no foreign key */
@@ -1639,6 +1642,10 @@ BuildIndexInfo(Relation index)
 		elog(ERROR, "invalid indnatts %d for index %u",
 			 numKeys, RelationGetRelid(index));
 	ii->ii_NumIndexAttrs = numKeys;
+	ii->ii_NumIndexKeyAttrs = indexStruct->indnkeyatts;
+	Assert(ii->ii_NumIndexKeyAttrs != 0);
+	Assert(ii->ii_NumIndexKeyAttrs <= ii->ii_NumIndexAttrs);
+
 	for (i = 0; i < numKeys; i++)
 		ii->ii_KeyAttrNumbers[i] = indexStruct->indkey.values[i];
 
diff --git a/src/backend/catalog/indexing.c b/src/backend/catalog/indexing.c
index 0231084..89e20fa 100644
--- a/src/backend/catalog/indexing.c
+++ b/src/backend/catalog/indexing.c
@@ -119,6 +119,7 @@ CatalogIndexInsert(CatalogIndexState indstate, HeapTuple heapTuple)
 		Assert(indexInfo->ii_Predicate == NIL);
 		Assert(indexInfo->ii_ExclusionOps == NULL);
 		Assert(relationDescs[i]->rd_index->indimmediate);
+		Assert(indexInfo->ii_NumIndexKeyAttrs != 0);
 
 		/*
 		 * FormIndexDatum fills in its values and isnull parameters with the
diff --git a/src/backend/catalog/toasting.c b/src/backend/catalog/toasting.c
index 3652d7b..fc47a37 100644
--- a/src/backend/catalog/toasting.c
+++ b/src/backend/catalog/toasting.c
@@ -313,6 +313,7 @@ create_toast_table(Relation rel, Oid toastOid, Oid toastIndexOid,
 
 	indexInfo = makeNode(IndexInfo);
 	indexInfo->ii_NumIndexAttrs = 2;
+	indexInfo->ii_NumIndexKeyAttrs = 2;
 	indexInfo->ii_KeyAttrNumbers[0] = 1;
 	indexInfo->ii_KeyAttrNumbers[1] = 2;
 	indexInfo->ii_Expressions = NIL;
diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c
index b450bcf..f5ca83a 100644
--- a/src/backend/commands/indexcmds.c
+++ b/src/backend/commands/indexcmds.c
@@ -208,7 +208,7 @@ CheckIndexCompatible(Oid oldId,
 	}
 
 	/* Any change in operator class or collation breaks compatibility. */
-	old_natts = indexForm->indnatts;
+	old_natts = indexForm->indnkeyatts;
 	Assert(old_natts == numberOfAttributes);
 
 	d = SysCacheGetAttr(INDEXRELID, tuple, Anum_pg_index_indcollation, &isnull);
@@ -320,7 +320,8 @@ DefineIndex(Oid relationId,
 	Datum		reloptions;
 	int16	   *coloptions;
 	IndexInfo  *indexInfo;
-	int			numberOfAttributes;
+	int			numberOfAttributes,
+				numberOfKeyAttributes;
 	TransactionId limitXmin;
 	VirtualTransactionId *old_snapshots;
 	ObjectAddress address;
@@ -331,10 +332,31 @@ DefineIndex(Oid relationId,
 	Snapshot	snapshot;
 	int			i;
 
+	if(list_intersection(stmt->indexParams, stmt->indexIncludingParams) != NIL)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+				 errmsg("included columns must not intersect with key columns")));
+
 	/*
 	 * count attributes in index
 	 */
+	numberOfKeyAttributes = list_length(stmt->indexParams);
+	if (numberOfKeyAttributes <= 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+				 errmsg("must specify at least one key column")));
+
+	/*
+	 * We append any INCLUDING columns onto the indexParams list so that
+	 * we have one list with all columns. Later we can determine which of these
+	 * are key columns, and which are just part of the INCLUDING list by check the list
+	 * position. A list item in a position less than ii_NumIndexKeyAttrs is part of
+	 * the key columns, and anything equal to and over is part of the
+	 * INCLUDING columns.
+	 */
+	stmt->indexParams = list_concat(stmt->indexParams, stmt->indexIncludingParams);
 	numberOfAttributes = list_length(stmt->indexParams);
+
 	if (numberOfAttributes <= 0)
 		ereport(ERROR,
 				(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
@@ -466,6 +488,7 @@ DefineIndex(Oid relationId,
 	 * look up the access method, verify it can handle the requested features
 	 */
 	accessMethodName = stmt->accessMethod;
+
 	tuple = SearchSysCache1(AMNAME, PointerGetDatum(accessMethodName));
 	if (!HeapTupleIsValid(tuple))
 	{
@@ -511,6 +534,12 @@ DefineIndex(Oid relationId,
 		errmsg("access method \"%s\" does not support exclusion constraints",
 			   accessMethodName)));
 
+	if (list_length(stmt->indexIncludingParams) > 0 && !accessMethodForm->amcaninclude)
+		ereport(ERROR,
+		(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+		errmsg("access method \"%s\" does not support included columns",
+				accessMethodName)));
+
 	amcanorder = accessMethodForm->amcanorder;
 	amoptions = accessMethodForm->amoptions;
 
@@ -536,6 +565,7 @@ DefineIndex(Oid relationId,
 	 */
 	indexInfo = makeNode(IndexInfo);
 	indexInfo->ii_NumIndexAttrs = numberOfAttributes;
+	indexInfo->ii_NumIndexKeyAttrs = numberOfKeyAttributes;
 	indexInfo->ii_Expressions = NIL;	/* for now */
 	indexInfo->ii_ExpressionsState = NIL;
 	indexInfo->ii_Predicate = make_ands_implicit((Expr *) stmt->whereClause);
diff --git a/src/backend/executor/execIndexing.c b/src/backend/executor/execIndexing.c
index 8f1b058..7bf33d7 100644
--- a/src/backend/executor/execIndexing.c
+++ b/src/backend/executor/execIndexing.c
@@ -646,7 +646,7 @@ check_exclusion_or_unique_constraint(Relation heap, Relation index,
 	Oid		   *constr_procs;
 	uint16	   *constr_strats;
 	Oid		   *index_collations = index->rd_indcollation;
-	int			index_natts = index->rd_index->indnatts;
+	int			index_nkeyatts = index->rd_index->indnkeyatts;
 	IndexScanDesc index_scan;
 	HeapTuple	tup;
 	ScanKeyData scankeys[INDEX_MAX_KEYS];
@@ -673,7 +673,7 @@ check_exclusion_or_unique_constraint(Relation heap, Relation index,
 	 * If any of the input values are NULL, the constraint check is assumed to
 	 * pass (i.e., we assume the operators are strict).
 	 */
-	for (i = 0; i < index_natts; i++)
+	for (i = 0; i < index_nkeyatts; i++)
 	{
 		if (isnull[i])
 			return true;
@@ -685,7 +685,7 @@ check_exclusion_or_unique_constraint(Relation heap, Relation index,
 	 */
 	InitDirtySnapshot(DirtySnapshot);
 
-	for (i = 0; i < index_natts; i++)
+	for (i = 0; i < index_nkeyatts; i++)
 	{
 		ScanKeyEntryInitialize(&scankeys[i],
 							   0,
@@ -717,8 +717,8 @@ check_exclusion_or_unique_constraint(Relation heap, Relation index,
 retry:
 	conflict = false;
 	found_self = false;
-	index_scan = index_beginscan(heap, index, &DirtySnapshot, index_natts, 0);
-	index_rescan(index_scan, scankeys, index_natts, NULL, 0);
+	index_scan = index_beginscan(heap, index, &DirtySnapshot, index_nkeyatts, 0);
+	index_rescan(index_scan, scankeys, index_nkeyatts, NULL, 0);
 
 	while ((tup = index_getnext(index_scan,
 								ForwardScanDirection)) != NULL)
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 4cf14b6..d06ed1c 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -3110,6 +3110,7 @@ _copyIndexStmt(const IndexStmt *from)
 	COPY_STRING_FIELD(accessMethod);
 	COPY_STRING_FIELD(tableSpace);
 	COPY_NODE_FIELD(indexParams);
+	COPY_NODE_FIELD(indexIncludingParams);
 	COPY_NODE_FIELD(options);
 	COPY_NODE_FIELD(whereClause);
 	COPY_NODE_FIELD(excludeOpNames);
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index a13d831..7298793 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -1229,6 +1229,7 @@ _equalIndexStmt(const IndexStmt *a, const IndexStmt *b)
 	COMPARE_STRING_FIELD(accessMethod);
 	COMPARE_STRING_FIELD(tableSpace);
 	COMPARE_NODE_FIELD(indexParams);
+	COMPARE_NODE_FIELD(indexIncludingParams);
 	COMPARE_NODE_FIELD(options);
 	COMPARE_NODE_FIELD(whereClause);
 	COMPARE_NODE_FIELD(excludeOpNames);
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index 7169d46..d6127e2 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -2170,6 +2170,7 @@ _outIndexStmt(StringInfo str, const IndexStmt *node)
 	WRITE_STRING_FIELD(accessMethod);
 	WRITE_STRING_FIELD(tableSpace);
 	WRITE_NODE_FIELD(indexParams);
+	WRITE_NODE_FIELD(indexIncludingParams);
 	WRITE_NODE_FIELD(options);
 	WRITE_NODE_FIELD(whereClause);
 	WRITE_NODE_FIELD(excludeOpNames);
diff --git a/src/backend/optimizer/path/pathkeys.c b/src/backend/optimizer/path/pathkeys.c
index b81cc49..466f3e7 100644
--- a/src/backend/optimizer/path/pathkeys.c
+++ b/src/backend/optimizer/path/pathkeys.c
@@ -450,6 +450,13 @@ build_index_pathkeys(PlannerInfo *root,
 		bool		nulls_first;
 		PathKey    *cpathkey;
 
+		/*
+		 * INCLUDING columns are stored in index unordered,
+		 * so they don't support ordered index scan.
+		 */
+		if(i >= index->nkeycolumns)
+			break;
+
 		/* We assume we don't need to make a copy of the tlist item */
 		indexkey = indextle->expr;
 
diff --git a/src/backend/optimizer/util/plancat.c b/src/backend/optimizer/util/plancat.c
index 9442e5f..603d11d 100644
--- a/src/backend/optimizer/util/plancat.c
+++ b/src/backend/optimizer/util/plancat.c
@@ -164,7 +164,7 @@ get_relation_info(PlannerInfo *root, Oid relationObjectId, bool inhparent,
 			Relation	indexRelation;
 			Form_pg_index index;
 			IndexOptInfo *info;
-			int			ncolumns;
+			int			ncolumns, nkeycolumns;
 			int			i;
 
 			/*
@@ -207,6 +207,23 @@ get_relation_info(PlannerInfo *root, Oid relationObjectId, bool inhparent,
 				RelationGetForm(indexRelation)->reltablespace;
 			info->rel = rel;
 			info->ncolumns = ncolumns = index->indnatts;
+			info->nkeycolumns = nkeycolumns = index->indnkeyatts;
+
+			/* TODO
+			 * All these arrays below still have length = ncolumns.
+			 * Fix, when optional opclass functionality will be added.
+			 *
+			 * Generally, any column could be returned by IndexOnlyScan.
+			 * Even if it doesn't have opclass for that type of index.
+			 *
+			 * For example,
+			 * we have an index "create index on tbl(c1) including c2".
+			 * If there's no suitable oplass on c2
+			 * query "select c2 from tbl where c2 < 10" can't use index-only scan
+			 * and query "select c2 from tbl where c1 < 10" can.
+			 * But now it doesn't because of requirement that
+			 * each indexed column must have an opclass.
+			 */
 			info->indexkeys = (int *) palloc(sizeof(int) * ncolumns);
 			info->indexcollations = (Oid *) palloc(sizeof(Oid) * ncolumns);
 			info->opfamily = (Oid *) palloc(sizeof(Oid) * ncolumns);
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 223ef17..ce19e66 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -354,6 +354,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 				oper_argtypes RuleActionList RuleActionMulti
 				opt_column_list columnList opt_name_list
 				sort_clause opt_sort_clause sortby_list index_params
+				optincluding opt_including index_including_params
 				name_list role_list from_clause from_list opt_array_bounds
 				qualified_name_list any_name any_name_list type_name_list
 				any_operator expr_list attrs
@@ -6604,7 +6605,7 @@ defacl_privilege_target:
 
 IndexStmt:	CREATE opt_unique INDEX opt_concurrently opt_index_name
 			ON qualified_name access_method_clause '(' index_params ')'
-			opt_reloptions OptTableSpace where_clause
+			opt_including opt_reloptions OptTableSpace where_clause
 				{
 					IndexStmt *n = makeNode(IndexStmt);
 					n->unique = $2;
@@ -6613,9 +6614,10 @@ IndexStmt:	CREATE opt_unique INDEX opt_concurrently opt_index_name
 					n->relation = $7;
 					n->accessMethod = $8;
 					n->indexParams = $10;
-					n->options = $12;
-					n->tableSpace = $13;
-					n->whereClause = $14;
+					n->indexIncludingParams = $12;
+					n->options = $13;
+					n->tableSpace = $14;
+					n->whereClause = $15;
 					n->excludeOpNames = NIL;
 					n->idxcomment = NULL;
 					n->indexOid = InvalidOid;
@@ -6720,6 +6722,16 @@ index_elem:	ColId opt_collate opt_class opt_asc_desc opt_nulls_order
 				}
 		;
 
+optincluding : '(' index_including_params ')'		{ $$ = $2; }
+		;
+opt_including:		INCLUDING optincluding			{ $$ = $2; }
+			 |		/* EMPTY */						{ $$ = NIL; }
+		;
+
+index_including_params:	index_elem						{ $$ = list_make1($1); }
+			| index_including_params ',' index_elem		{ $$ = lappend($1, $3); }
+		;
+
 opt_collate: COLLATE any_name						{ $$ = $2; }
 			| /*EMPTY*/								{ $$ = NIL; }
 		;
diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c
index 280808a..8a7b0f1 100644
--- a/src/backend/utils/adt/ruleutils.c
+++ b/src/backend/utils/adt/ruleutils.c
@@ -1134,10 +1134,23 @@ pg_get_indexdef_worker(Oid indexrelid, int colno,
 		Oid			keycoltype;
 		Oid			keycolcollation;
 
+		/* Report the INCLUDED attributes, if any. */
+		if(keyno == idxrec->indnkeyatts)
+		{
+			if(!attrsOnly)
+			{
+				appendStringInfoString(&buf, ") INCLUDING (");
+				sep = "";
+			}
+		}
+		else if (keyno > idxrec->indnkeyatts)
+			sep = ", ";
+
 		if (!colno)
 			appendStringInfoString(&buf, sep);
 		sep = ", ";
 
+
 		if (attnum != 0)
 		{
 			/* Simple index column */
@@ -1145,8 +1158,13 @@ pg_get_indexdef_worker(Oid indexrelid, int colno,
 			int32		keycoltypmod;
 
 			attname = get_relid_attribute_name(indrelid, attnum);
+
 			if (!colno || colno == keyno + 1)
+			{
 				appendStringInfoString(&buf, quote_identifier(attname));
+				if (attrsOnly && keyno >= idxrec->indnkeyatts)
+					appendStringInfoString(&buf, " (included)");
+			}
 			get_atttypetypmodcoll(indrelid, attnum,
 								  &keycoltype, &keycoltypmod,
 								  &keycolcollation);
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index 6b0c0b7..8851f40 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -1938,6 +1938,7 @@ RelationReloadIndexInfo(Relation relation)
 		 * it's not worth it to track exactly which ones they are.  None of
 		 * the array fields are allowed to change, though.
 		 */
+		relation->rd_index->indnkeyatts = index->indnkeyatts;
 		relation->rd_index->indisunique = index->indisunique;
 		relation->rd_index->indisprimary = index->indisprimary;
 		relation->rd_index->indisexclusion = index->indisexclusion;
diff --git a/src/include/access/itup.h b/src/include/access/itup.h
index c997545..00fdaee 100644
--- a/src/include/access/itup.h
+++ b/src/include/access/itup.h
@@ -18,6 +18,7 @@
 #include "access/tupmacs.h"
 #include "storage/bufpage.h"
 #include "storage/itemptr.h"
+#include "utils/rel.h"
 
 /*
  * Index tuple header structure
@@ -147,5 +148,7 @@ extern Datum nocache_index_getattr(IndexTuple tup, int attnum,
 extern void index_deform_tuple(IndexTuple tup, TupleDesc tupleDescriptor,
 				   Datum *values, bool *isnull);
 extern IndexTuple CopyIndexTuple(IndexTuple source);
+extern IndexTuple index_reform_tuple(Relation idxrel, IndexTuple tup,
+									 int natts, int nkeyatts);
 
 #endif   /* ITUP_H */
diff --git a/src/include/catalog/pg_am.h b/src/include/catalog/pg_am.h
index 8a28b8e..2bce0fc 100644
--- a/src/include/catalog/pg_am.h
+++ b/src/include/catalog/pg_am.h
@@ -51,6 +51,7 @@ CATALOG(pg_am,2601)
 	bool		amstorage;		/* can storage type differ from column type? */
 	bool		amclusterable;	/* does AM support cluster command? */
 	bool		ampredlocks;	/* does AM handle predicate locks? */
+	bool		amcaninclude;	/* does AM support INCLUDING columns? */
 	Oid			amkeytype;		/* type of data in index, or InvalidOid */
 	regproc		aminsert;		/* "insert this tuple" function */
 	regproc		ambeginscan;	/* "prepare for index scan" function */
@@ -80,7 +81,7 @@ typedef FormData_pg_am *Form_pg_am;
  *		compiler constants for pg_am
  * ----------------
  */
-#define Natts_pg_am						30
+#define Natts_pg_am						31
 #define Anum_pg_am_amname				1
 #define Anum_pg_am_amstrategies			2
 #define Anum_pg_am_amsupport			3
@@ -95,44 +96,45 @@ typedef FormData_pg_am *Form_pg_am;
 #define Anum_pg_am_amstorage			12
 #define Anum_pg_am_amclusterable		13
 #define Anum_pg_am_ampredlocks			14
-#define Anum_pg_am_amkeytype			15
-#define Anum_pg_am_aminsert				16
-#define Anum_pg_am_ambeginscan			17
-#define Anum_pg_am_amgettuple			18
-#define Anum_pg_am_amgetbitmap			19
-#define Anum_pg_am_amrescan				20
-#define Anum_pg_am_amendscan			21
-#define Anum_pg_am_ammarkpos			22
-#define Anum_pg_am_amrestrpos			23
-#define Anum_pg_am_ambuild				24
-#define Anum_pg_am_ambuildempty			25
-#define Anum_pg_am_ambulkdelete			26
-#define Anum_pg_am_amvacuumcleanup		27
-#define Anum_pg_am_amcanreturn			28
-#define Anum_pg_am_amcostestimate		29
-#define Anum_pg_am_amoptions			30
+#define Anum_pg_am_amcaninclude		15
+#define Anum_pg_am_amkeytype			16
+#define Anum_pg_am_aminsert				17
+#define Anum_pg_am_ambeginscan			18
+#define Anum_pg_am_amgettuple			19
+#define Anum_pg_am_amgetbitmap			20
+#define Anum_pg_am_amrescan				21
+#define Anum_pg_am_amendscan			22
+#define Anum_pg_am_ammarkpos			23
+#define Anum_pg_am_amrestrpos			24
+#define Anum_pg_am_ambuild				25
+#define Anum_pg_am_ambuildempty			26
+#define Anum_pg_am_ambulkdelete			27
+#define Anum_pg_am_amvacuumcleanup		28
+#define Anum_pg_am_amcanreturn			29
+#define Anum_pg_am_amcostestimate		30
+#define Anum_pg_am_amoptions			31
 
 /* ----------------
  *		initial contents of pg_am
  * ----------------
  */
 
-DATA(insert OID = 403 (  btree		5 2 t f t t t t t t f t t 0 btinsert btbeginscan btgettuple btgetbitmap btrescan btendscan btmarkpos btrestrpos btbuild btbuildempty btbulkdelete btvacuumcleanup btcanreturn btcostestimate btoptions ));
+DATA(insert OID = 403 (  btree		5 2 t f t t t t t t f t t t 0 btinsert btbeginscan btgettuple btgetbitmap btrescan btendscan btmarkpos btrestrpos btbuild btbuildempty btbulkdelete btvacuumcleanup btcanreturn btcostestimate btoptions ));
 DESCR("b-tree index access method");
 #define BTREE_AM_OID 403
-DATA(insert OID = 405 (  hash		1 1 f f t f f f f f f f f 23 hashinsert hashbeginscan hashgettuple hashgetbitmap hashrescan hashendscan hashmarkpos hashrestrpos hashbuild hashbuildempty hashbulkdelete hashvacuumcleanup - hashcostestimate hashoptions ));
+DATA(insert OID = 405 (  hash		1 1 f f t f f f f f f f f f 23 hashinsert hashbeginscan hashgettuple hashgetbitmap hashrescan hashendscan hashmarkpos hashrestrpos hashbuild hashbuildempty hashbulkdelete hashvacuumcleanup - hashcostestimate hashoptions ));
 DESCR("hash index access method");
 #define HASH_AM_OID 405
-DATA(insert OID = 783 (  gist		0 9 f t f f t t f t t t f 0 gistinsert gistbeginscan gistgettuple gistgetbitmap gistrescan gistendscan gistmarkpos gistrestrpos gistbuild gistbuildempty gistbulkdelete gistvacuumcleanup gistcanreturn gistcostestimate gistoptions ));
+DATA(insert OID = 783 (  gist		0 9 f t f f t t f t t t f f 0 gistinsert gistbeginscan gistgettuple gistgetbitmap gistrescan gistendscan gistmarkpos gistrestrpos gistbuild gistbuildempty gistbulkdelete gistvacuumcleanup gistcanreturn gistcostestimate gistoptions ));
 DESCR("GiST index access method");
 #define GIST_AM_OID 783
-DATA(insert OID = 2742 (  gin		0 6 f f f f t t f f t f f 0 gininsert ginbeginscan - gingetbitmap ginrescan ginendscan ginmarkpos ginrestrpos ginbuild ginbuildempty ginbulkdelete ginvacuumcleanup - gincostestimate ginoptions ));
+DATA(insert OID = 2742 (  gin		0 6 f f f f t t f f t f f f 0 gininsert ginbeginscan - gingetbitmap ginrescan ginendscan ginmarkpos ginrestrpos ginbuild ginbuildempty ginbulkdelete ginvacuumcleanup - gincostestimate ginoptions ));
 DESCR("GIN index access method");
 #define GIN_AM_OID 2742
-DATA(insert OID = 4000 (  spgist	0 5 f f f f f t f t f f f 0 spginsert spgbeginscan spggettuple spggetbitmap spgrescan spgendscan spgmarkpos spgrestrpos spgbuild spgbuildempty spgbulkdelete spgvacuumcleanup spgcanreturn spgcostestimate spgoptions ));
+DATA(insert OID = 4000 (  spgist	0 5 f f f f f t f t f f f f 0 spginsert spgbeginscan spggettuple spggetbitmap spgrescan spgendscan spgmarkpos spgrestrpos spgbuild spgbuildempty spgbulkdelete spgvacuumcleanup spgcanreturn spgcostestimate spgoptions ));
 DESCR("SP-GiST index access method");
 #define SPGIST_AM_OID 4000
-DATA(insert OID = 3580 (  brin	   0 15 f f f f t t f t t f f 0 brininsert brinbeginscan - bringetbitmap brinrescan brinendscan brinmarkpos brinrestrpos brinbuild brinbuildempty brinbulkdelete brinvacuumcleanup - brincostestimate brinoptions ));
+DATA(insert OID = 3580 (  brin	   0 15 f f f f t t f t t f f f 0 brininsert brinbeginscan - bringetbitmap brinrescan brinendscan brinmarkpos brinrestrpos brinbuild brinbuildempty brinbulkdelete brinvacuumcleanup - brincostestimate brinoptions ));
 DESCR("block range index (BRIN) access method");
 #define BRIN_AM_OID 3580
 
diff --git a/src/include/catalog/pg_index.h b/src/include/catalog/pg_index.h
index 45c96e3..1ef0767 100644
--- a/src/include/catalog/pg_index.h
+++ b/src/include/catalog/pg_index.h
@@ -32,7 +32,8 @@ CATALOG(pg_index,2610) BKI_WITHOUT_OIDS BKI_SCHEMA_MACRO
 {
 	Oid			indexrelid;		/* OID of the index */
 	Oid			indrelid;		/* OID of the relation it indexes */
-	int16		indnatts;		/* number of columns in index */
+	int16		indnatts;		/* total number of columns in index */
+	int16		indnkeyatts;	/* number of key columns in index */
 	bool		indisunique;	/* is this a unique index? */
 	bool		indisprimary;	/* is this index for primary key? */
 	bool		indisexclusion; /* is this index for exclusion constraint? */
@@ -70,26 +71,27 @@ typedef FormData_pg_index *Form_pg_index;
  *		compiler constants for pg_index
  * ----------------
  */
-#define Natts_pg_index					19
+#define Natts_pg_index					20
 #define Anum_pg_index_indexrelid		1
 #define Anum_pg_index_indrelid			2
 #define Anum_pg_index_indnatts			3
-#define Anum_pg_index_indisunique		4
-#define Anum_pg_index_indisprimary		5
-#define Anum_pg_index_indisexclusion	6
-#define Anum_pg_index_indimmediate		7
-#define Anum_pg_index_indisclustered	8
-#define Anum_pg_index_indisvalid		9
-#define Anum_pg_index_indcheckxmin		10
-#define Anum_pg_index_indisready		11
-#define Anum_pg_index_indislive			12
-#define Anum_pg_index_indisreplident	13
-#define Anum_pg_index_indkey			14
-#define Anum_pg_index_indcollation		15
-#define Anum_pg_index_indclass			16
-#define Anum_pg_index_indoption			17
-#define Anum_pg_index_indexprs			18
-#define Anum_pg_index_indpred			19
+#define Anum_pg_index_indnkeyatts		4
+#define Anum_pg_index_indisunique		5
+#define Anum_pg_index_indisprimary		6
+#define Anum_pg_index_indisexclusion	7
+#define Anum_pg_index_indimmediate		8
+#define Anum_pg_index_indisclustered	9
+#define Anum_pg_index_indisvalid		10
+#define Anum_pg_index_indcheckxmin		11
+#define Anum_pg_index_indisready		12
+#define Anum_pg_index_indislive			13
+#define Anum_pg_index_indisreplident	14
+#define Anum_pg_index_indkey			15
+#define Anum_pg_index_indcollation		16
+#define Anum_pg_index_indclass			17
+#define Anum_pg_index_indoption			18
+#define Anum_pg_index_indexprs			19
+#define Anum_pg_index_indpred			20
 
 /*
  * Index AMs that support ordered scans must support these two indoption
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 5ccf470..56d95b5 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -58,7 +58,8 @@
 typedef struct IndexInfo
 {
 	NodeTag		type;
-	int			ii_NumIndexAttrs;
+	int			ii_NumIndexAttrs; /* total number of columns in index */
+	int			ii_NumIndexKeyAttrs; /* number of key columns in index */
 	AttrNumber	ii_KeyAttrNumbers[INDEX_MAX_KEYS];
 	List	   *ii_Expressions; /* list of Expr */
 	List	   *ii_ExpressionsState;	/* list of ExprState */
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index abd4dd1..15dddbd 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -2426,6 +2426,8 @@ typedef struct IndexStmt
 	char	   *accessMethod;	/* name of access method (eg. btree) */
 	char	   *tableSpace;		/* tablespace, or NULL for default */
 	List	   *indexParams;	/* columns to index: a list of IndexElem */
+	List	   *indexIncludingParams;	/* additional columns to index:
+										 * a list of IndexElem */
 	List	   *options;		/* WITH clause options: a list of DefElem */
 	Node	   *whereClause;	/* qualification (partial-index predicate) */
 	List	   *excludeOpNames; /* exclusion operator names, or NIL if none */
diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h
index 5393005..248a53c 100644
--- a/src/include/nodes/relation.h
+++ b/src/include/nodes/relation.h
@@ -543,6 +543,7 @@ typedef struct IndexOptInfo
 
 	/* index descriptor information */
 	int			ncolumns;		/* number of columns in index */
+	int			nkeycolumns;	/* number of key columns in index */
 	int		   *indexkeys;		/* column numbers of index's keys, or 0 */
 	Oid		   *indexcollations;	/* OIDs of collations of index columns */
 	Oid		   *opfamily;		/* OIDs of operator families for columns */
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index 8a55a09..38adac8 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -346,11 +346,18 @@ typedef struct ViewOptions
 
 /*
  * RelationGetNumberOfAttributes
- *		Returns the number of attributes in a relation.
+ *		Returns the total number of attributes in a relation.
  */
 #define RelationGetNumberOfAttributes(relation) ((relation)->rd_rel->relnatts)
 
 /*
+ * IndexRelationGetNumberOfKeyAttributes
+ *		Returns the number of key attributes in a relation.
+ */
+#define IndexRelationGetNumberOfKeyAttributes(relation) \
+		((relation)->rd_index->indnkeyatts)
+
+/*
  * RelationGetDescr
  *		Returns tuple descriptor for a relation.
  */
diff --git a/src/test/regress/expected/create_index.out b/src/test/regress/expected/create_index.out
index b72e65d..8ea9e67 100644
--- a/src/test/regress/expected/create_index.out
+++ b/src/test/regress/expected/create_index.out
@@ -2376,6 +2376,22 @@ DETAIL:  Key ((f1 || f2))=(ABCDEF) already exists.
 -- but this shouldn't:
 INSERT INTO func_index_heap VALUES('QWERTY');
 --
+-- Test unique index with included columns
+--
+CREATE TABLE covering_index_heap (f1 int, f2 int, f3 text);
+CREATE UNIQUE INDEX covering_index_index on covering_index_heap (f1,f2) INCLUDING(f3);
+INSERT INTO covering_index_heap VALUES(1,1,'AAA');
+INSERT INTO covering_index_heap VALUES(1,2,'AAA');
+-- this should fail because of unique index on f1,f2:
+INSERT INTO covering_index_heap VALUES(1,2,'BBB');
+ERROR:  duplicate key value violates unique constraint "covering_index_index"
+DETAIL:  Key (f1, f2, f3 (included))=(1, 2, BBB) already exists.
+-- and this shouldn't:
+INSERT INTO covering_index_heap VALUES(1,4,'AAA');
+-- Try to build index on table that already contains data
+CREATE UNIQUE INDEX covering_index_index2 on covering_index_heap (f1,f2) INCLUDING(f3);
+DROP TABLE covering_index_heap;
+--
 -- Also try building functional, expressional, and partial indexes on
 -- tables that already contain data.
 --
diff --git a/src/test/regress/sql/create_index.sql b/src/test/regress/sql/create_index.sql
index ff86953..42d4881 100644
--- a/src/test/regress/sql/create_index.sql
+++ b/src/test/regress/sql/create_index.sql
@@ -722,6 +722,22 @@ INSERT INTO func_index_heap VALUES('ABCD', 'EF');
 INSERT INTO func_index_heap VALUES('QWERTY');
 
 --
+-- Test unique index with included columns
+--
+CREATE TABLE covering_index_heap (f1 int, f2 int, f3 text);
+CREATE UNIQUE INDEX covering_index_index on covering_index_heap (f1,f2) INCLUDING(f3);
+
+INSERT INTO covering_index_heap VALUES(1,1,'AAA');
+INSERT INTO covering_index_heap VALUES(1,2,'AAA');
+-- this should fail because of unique index on f1,f2:
+INSERT INTO covering_index_heap VALUES(1,2,'BBB');
+-- and this shouldn't:
+INSERT INTO covering_index_heap VALUES(1,4,'AAA');
+-- Try to build index on table that already contains data
+CREATE UNIQUE INDEX covering_index_index2 on covering_index_heap (f1,f2) INCLUDING(f3);
+DROP TABLE covering_index_heap;
+
+--
 -- Also try building functional, expressional, and partial indexes on
 -- tables that already contain data.
 --
diff --git a/src/backend/access/index/genam.c b/src/backend/access/index/genam.c
index aa5b28c..270ab00 100644
--- a/src/backend/access/index/genam.c
+++ b/src/backend/access/index/genam.c
@@ -175,6 +175,7 @@ BuildIndexValueDescription(Relation indexRelation,
 	Form_pg_index idxrec;
 	HeapTuple	ht_idx;
 	int			natts = indexRelation->rd_rel->relnatts;
+	int			nkeyatts;
 	int			i;
 	int			keyno;
 	Oid			indexrelid = RelationGetRelid(indexRelation);
@@ -244,6 +245,7 @@ BuildIndexValueDescription(Relation indexRelation,
 	appendStringInfo(&buf, "(%s)=(",
 					 pg_get_indexdef_columns(indexrelid, true));
 
+	nkeyatts = IndexRelationGetNumberOfKeyAttributes(indexRelation);
 	for (i = 0; i < natts; i++)
 	{
 		char	   *val;
@@ -254,20 +256,28 @@ BuildIndexValueDescription(Relation indexRelation,
 		{
 			Oid			foutoid;
 			bool		typisvarlena;
-
-			/*
-			 * The provided data is not necessarily of the type stored in the
-			 * index; rather it is of the index opclass's input type. So look
-			 * at rd_opcintype not the index tupdesc.
-			 *
-			 * Note: this is a bit shaky for opclasses that have pseudotype
-			 * input types such as ANYARRAY or RECORD.  Currently, the
-			 * typoutput functions associated with the pseudotypes will work
-			 * okay, but we might have to try harder in future.
-			 */
-			getTypeOutputInfo(indexRelation->rd_opcintype[i],
-							  &foutoid, &typisvarlena);
-			val = OidOutputFunctionCall(foutoid, values[i]);
+			TupleDesc	tupdesc = RelationGetDescr(indexRelation);
+			if (i < nkeyatts)
+			{
+				/*
+				* The provided data is not necessarily of the type stored in the
+				* index; rather it is of the index opclass's input type. So look
+				* at rd_opcintype not the index tupdesc.
+				*
+				* Note: this is a bit shaky for opclasses that have pseudotype
+				* input types such as ANYARRAY or RECORD.  Currently, the
+				* typoutput functions associated with the pseudotypes will work
+				* okay, but we might have to try harder in future.
+				*/
+				getTypeOutputInfo(indexRelation->rd_opcintype[i],
+								&foutoid, &typisvarlena);
+				val = OidOutputFunctionCall(foutoid, values[i]);
+			}
+			else
+			{
+				getTypeOutputInfo(tupdesc->attrs[i]->atttypid, &foutoid, &typisvarlena);
+				val = OidOutputFunctionCall(foutoid, values[i]);
+			}
 		}
 
 		if (i > 0)
diff --git a/src/backend/access/nbtree/nbtutils.c b/src/backend/access/nbtree/nbtutils.c
index 2192da4..e9997aa 100644
--- a/src/backend/access/nbtree/nbtutils.c
+++ b/src/backend/access/nbtree/nbtutils.c
@@ -121,16 +121,16 @@ ScanKey
 _bt_mkscankey_nodata(Relation rel)
 {
 	ScanKey		skey;
-	int			natts;
+	int			nkeyatts;
 	int16	   *indoption;
 	int			i;
 
-	natts = RelationGetNumberOfAttributes(rel);
+	nkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
 	indoption = rel->rd_indoption;
 
-	skey = (ScanKey) palloc(natts * sizeof(ScanKeyData));
+	skey = (ScanKey) palloc(nkeyatts * sizeof(ScanKeyData));
 
-	for (i = 0; i < natts; i++)
+	for (i = 0; i < nkeyatts; i++)
 	{
 		FmgrInfo   *procinfo;
 		int			flags;
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index d3e980e..eaefe3b 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -427,38 +427,45 @@ ConstructTupleDescriptor(Relation heapRelation,
 		namestrcpy(&to->attname, (const char *) lfirst(colnames_item));
 		colnames_item = lnext(colnames_item);
 
-		/*
-		 * Check the opclass and index AM to see if either provides a keytype
-		 * (overriding the attribute type).  Opclass takes precedence.
-		 */
-		tuple = SearchSysCache1(CLAOID, ObjectIdGetDatum(classObjectId[i]));
-		if (!HeapTupleIsValid(tuple))
-			elog(ERROR, "cache lookup failed for opclass %u",
-				 classObjectId[i]);
-		opclassTup = (Form_pg_opclass) GETSTRUCT(tuple);
-		if (OidIsValid(opclassTup->opckeytype))
-			keyType = opclassTup->opckeytype;
-		else
-			keyType = amform->amkeytype;
-		ReleaseSysCache(tuple);
-
-		if (OidIsValid(keyType) && keyType != to->atttypid)
+		if (i < indexInfo->ii_NumIndexKeyAttrs)
 		{
-			/* index value and heap value have different types */
-			tuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(keyType));
+			/*
+			 * Check the opclass and index AM to see if either provides a keytype
+			 * (overriding the attribute type).  Opclass takes precedence.
+			 */
+			tuple = SearchSysCache1(CLAOID, ObjectIdGetDatum(classObjectId[i]));
 			if (!HeapTupleIsValid(tuple))
-				elog(ERROR, "cache lookup failed for type %u", keyType);
-			typeTup = (Form_pg_type) GETSTRUCT(tuple);
-
-			to->atttypid = keyType;
-			to->atttypmod = -1;
-			to->attlen = typeTup->typlen;
-			to->attbyval = typeTup->typbyval;
-			to->attalign = typeTup->typalign;
-			to->attstorage = typeTup->typstorage;
-
+				elog(ERROR, "cache lookup failed for opclass %u",
+					classObjectId[i]);
+			opclassTup = (Form_pg_opclass) GETSTRUCT(tuple);
+			if (OidIsValid(opclassTup->opckeytype))
+				keyType = opclassTup->opckeytype;
+			else
+				keyType = amform->amkeytype;
 			ReleaseSysCache(tuple);
+
+			if (OidIsValid(keyType) && keyType != to->atttypid)
+			{
+				/* index value and heap value have different types */
+				tuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(keyType));
+				if (!HeapTupleIsValid(tuple))
+					elog(ERROR, "cache lookup failed for type %u", keyType);
+				typeTup = (Form_pg_type) GETSTRUCT(tuple);
+
+				to->atttypid = keyType;
+				to->atttypmod = -1;
+				to->attlen = typeTup->typlen;
+				to->attbyval = typeTup->typbyval;
+				to->attalign = typeTup->typalign;
+				to->attstorage = typeTup->typstorage;
+
+				ReleaseSysCache(tuple);
+			}
 		}
+// 		else
+// 		{
+// 			elog(NOTICE, "ConstructTupleDescriptor. Included opclass attr num %d. Don't check opclass", i);
+// 		}
 	}
 
 	ReleaseSysCache(amtuple);
@@ -1000,7 +1007,7 @@ index_create(Relation heapRelation,
 
 		/* Store dependency on collations */
 		/* The default collation is pinned, so don't bother recording it */
-		for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
+		for (i = 0; i < indexInfo->ii_NumIndexKeyAttrs; i++)
 		{
 			if (OidIsValid(collationObjectId[i]) &&
 				collationObjectId[i] != DEFAULT_COLLATION_OID)
@@ -1014,7 +1021,7 @@ index_create(Relation heapRelation,
 		}
 
 		/* Store dependency on operator classes */
-		for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
+		for (i = 0; i < indexInfo->ii_NumIndexKeyAttrs; i++)
 		{
 			referenced.classId = OperatorClassRelationId;
 			referenced.objectId = classObjectId[i];
@@ -1702,9 +1709,11 @@ BuildIndexInfo(Relation index)
 void
 BuildSpeculativeIndexInfo(Relation index, IndexInfo *ii)
 {
-	int			ncols = index->rd_rel->relnatts;
+	int			nkeycols;
 	int			i;
 
+	nkeycols = IndexRelationGetNumberOfKeyAttributes(index);
+
 	/*
 	 * fetch info for checking unique indexes
 	 */
@@ -1713,16 +1722,16 @@ BuildSpeculativeIndexInfo(Relation index, IndexInfo *ii)
 	if (index->rd_rel->relam != BTREE_AM_OID)
 		elog(ERROR, "unexpected non-btree speculative unique index");
 
-	ii->ii_UniqueOps = (Oid *) palloc(sizeof(Oid) * ncols);
-	ii->ii_UniqueProcs = (Oid *) palloc(sizeof(Oid) * ncols);
-	ii->ii_UniqueStrats = (uint16 *) palloc(sizeof(uint16) * ncols);
+	ii->ii_UniqueOps = (Oid *) palloc(sizeof(Oid) * nkeycols);
+	ii->ii_UniqueProcs = (Oid *) palloc(sizeof(Oid) * nkeycols);
+	ii->ii_UniqueStrats = (uint16 *) palloc(sizeof(uint16) * nkeycols);
 
 	/*
 	 * We have to look up the operator's strategy number.  This provides a
 	 * cross-check that the operator does match the index.
 	 */
 	/* We need the func OIDs and strategy numbers too */
-	for (i = 0; i < ncols; i++)
+	for (i = 0; i < nkeycols; i++)
 	{
 		ii->ii_UniqueStrats[i] = BTEqualStrategyNumber;
 		ii->ii_UniqueOps[i] =
diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c
index f5ca83a..7ecca3f 100644
--- a/src/backend/commands/indexcmds.c
+++ b/src/backend/commands/indexcmds.c
@@ -988,16 +988,15 @@ ComputeIndexAttrs(IndexInfo *indexInfo,
 	ListCell   *nextExclOp;
 	ListCell   *lc;
 	int			attn;
+	int nkeycols = indexInfo->ii_NumIndexKeyAttrs;
 
 	/* Allocate space for exclusion operator info, if needed */
 	if (exclusionOpNames)
 	{
-		int			ncols = list_length(attList);
-
-		Assert(list_length(exclusionOpNames) == ncols);
-		indexInfo->ii_ExclusionOps = (Oid *) palloc(sizeof(Oid) * ncols);
-		indexInfo->ii_ExclusionProcs = (Oid *) palloc(sizeof(Oid) * ncols);
-		indexInfo->ii_ExclusionStrats = (uint16 *) palloc(sizeof(uint16) * ncols);
+		Assert(list_length(exclusionOpNames) == nkeycols);
+		indexInfo->ii_ExclusionOps = (Oid *) palloc(sizeof(Oid) * nkeycols);
+		indexInfo->ii_ExclusionProcs = (Oid *) palloc(sizeof(Oid) * nkeycols);
+		indexInfo->ii_ExclusionStrats = (uint16 *) palloc(sizeof(uint16) * nkeycols);
 		nextExclOp = list_head(exclusionOpNames);
 	}
 	else
@@ -1130,110 +1129,116 @@ ComputeIndexAttrs(IndexInfo *indexInfo,
 		/*
 		 * Identify the opclass to use.
 		 */
-		classOidP[attn] = GetIndexOpClass(attribute->opclass,
-										  atttype,
-										  accessMethodName,
-										  accessMethodId);
-
-		/*
-		 * Identify the exclusion operator, if any.
-		 */
-		if (nextExclOp)
+		if (attn < nkeycols)
 		{
-			List	   *opname = (List *) lfirst(nextExclOp);
-			Oid			opid;
-			Oid			opfamily;
-			int			strat;
+			classOidP[attn] = GetIndexOpClass(attribute->opclass,
+											atttype,
+											accessMethodName,
+											accessMethodId);
 
 			/*
-			 * Find the operator --- it must accept the column datatype
-			 * without runtime coercion (but binary compatibility is OK)
+			 * Identify the exclusion operator, if any.
 			 */
-			opid = compatible_oper_opid(opname, atttype, atttype, false);
+			if (nextExclOp)
+			{
+				List	   *opname = (List *) lfirst(nextExclOp);
+				Oid			opid;
+				Oid			opfamily;
+				int			strat;
 
-			/*
-			 * Only allow commutative operators to be used in exclusion
-			 * constraints. If X conflicts with Y, but Y does not conflict
-			 * with X, bad things will happen.
-			 */
-			if (get_commutator(opid) != opid)
-				ereport(ERROR,
-						(errcode(ERRCODE_WRONG_OBJECT_TYPE),
-						 errmsg("operator %s is not commutative",
-								format_operator(opid)),
-						 errdetail("Only commutative operators can be used in exclusion constraints.")));
+				/*
+				* Find the operator --- it must accept the column datatype
+				* without runtime coercion (but binary compatibility is OK)
+				*/
+				opid = compatible_oper_opid(opname, atttype, atttype, false);
 
-			/*
-			 * Operator must be a member of the right opfamily, too
-			 */
-			opfamily = get_opclass_family(classOidP[attn]);
-			strat = get_op_opfamily_strategy(opid, opfamily);
-			if (strat == 0)
-			{
-				HeapTuple	opftuple;
-				Form_pg_opfamily opfform;
+				/*
+				* Only allow commutative operators to be used in exclusion
+				* constraints. If X conflicts with Y, but Y does not conflict
+				* with X, bad things will happen.
+				*/
+				if (get_commutator(opid) != opid)
+					ereport(ERROR,
+							(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+							errmsg("operator %s is not commutative",
+									format_operator(opid)),
+							errdetail("Only commutative operators can be used in exclusion constraints.")));
 
 				/*
-				 * attribute->opclass might not explicitly name the opfamily,
-				 * so fetch the name of the selected opfamily for use in the
-				 * error message.
-				 */
-				opftuple = SearchSysCache1(OPFAMILYOID,
-										   ObjectIdGetDatum(opfamily));
-				if (!HeapTupleIsValid(opftuple))
-					elog(ERROR, "cache lookup failed for opfamily %u",
-						 opfamily);
-				opfform = (Form_pg_opfamily) GETSTRUCT(opftuple);
+				* Operator must be a member of the right opfamily, too
+				*/
+				opfamily = get_opclass_family(classOidP[attn]);
+				strat = get_op_opfamily_strategy(opid, opfamily);
+				if (strat == 0)
+				{
+					HeapTuple	opftuple;
+					Form_pg_opfamily opfform;
+
+					/*
+					* attribute->opclass might not explicitly name the opfamily,
+					* so fetch the name of the selected opfamily for use in the
+					* error message.
+					*/
+					opftuple = SearchSysCache1(OPFAMILYOID,
+											ObjectIdGetDatum(opfamily));
+					if (!HeapTupleIsValid(opftuple))
+						elog(ERROR, "cache lookup failed for opfamily %u",
+							opfamily);
+					opfform = (Form_pg_opfamily) GETSTRUCT(opftuple);
 
-				ereport(ERROR,
-						(errcode(ERRCODE_WRONG_OBJECT_TYPE),
-						 errmsg("operator %s is not a member of operator family \"%s\"",
-								format_operator(opid),
-								NameStr(opfform->opfname)),
-						 errdetail("The exclusion operator must be related to the index operator class for the constraint.")));
-			}
+					ereport(ERROR,
+							(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+							errmsg("operator %s is not a member of operator family \"%s\"",
+									format_operator(opid),
+									NameStr(opfform->opfname)),
+							errdetail("The exclusion operator must be related to the index operator class for the constraint.")));
+				}
 
-			indexInfo->ii_ExclusionOps[attn] = opid;
-			indexInfo->ii_ExclusionProcs[attn] = get_opcode(opid);
-			indexInfo->ii_ExclusionStrats[attn] = strat;
-			nextExclOp = lnext(nextExclOp);
-		}
+				indexInfo->ii_ExclusionOps[attn] = opid;
+				indexInfo->ii_ExclusionProcs[attn] = get_opcode(opid);
+				indexInfo->ii_ExclusionStrats[attn] = strat;
+				nextExclOp = lnext(nextExclOp);
+			}
 
-		/*
-		 * Set up the per-column options (indoption field).  For now, this is
-		 * zero for any un-ordered index, while ordered indexes have DESC and
-		 * NULLS FIRST/LAST options.
-		 */
-		colOptionP[attn] = 0;
-		if (amcanorder)
-		{
-			/* default ordering is ASC */
-			if (attribute->ordering == SORTBY_DESC)
-				colOptionP[attn] |= INDOPTION_DESC;
-			/* default null ordering is LAST for ASC, FIRST for DESC */
-			if (attribute->nulls_ordering == SORTBY_NULLS_DEFAULT)
+			/*
+			* Set up the per-column options (indoption field).  For now, this is
+			* zero for any un-ordered index, while ordered indexes have DESC and
+			* NULLS FIRST/LAST options.
+			*/
+			colOptionP[attn] = 0;
+			if (amcanorder)
 			{
+				/* default ordering is ASC */
 				if (attribute->ordering == SORTBY_DESC)
+					colOptionP[attn] |= INDOPTION_DESC;
+				/* default null ordering is LAST for ASC, FIRST for DESC */
+				if (attribute->nulls_ordering == SORTBY_NULLS_DEFAULT)
+				{
+					if (attribute->ordering == SORTBY_DESC)
+						colOptionP[attn] |= INDOPTION_NULLS_FIRST;
+				}
+				else if (attribute->nulls_ordering == SORTBY_NULLS_FIRST)
 					colOptionP[attn] |= INDOPTION_NULLS_FIRST;
 			}
-			else if (attribute->nulls_ordering == SORTBY_NULLS_FIRST)
-				colOptionP[attn] |= INDOPTION_NULLS_FIRST;
-		}
-		else
-		{
-			/* index AM does not support ordering */
-			if (attribute->ordering != SORTBY_DEFAULT)
-				ereport(ERROR,
-						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-						 errmsg("access method \"%s\" does not support ASC/DESC options",
-								accessMethodName)));
-			if (attribute->nulls_ordering != SORTBY_NULLS_DEFAULT)
-				ereport(ERROR,
-						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-						 errmsg("access method \"%s\" does not support NULLS FIRST/LAST options",
-								accessMethodName)));
+			else
+			{
+				/* index AM does not support ordering */
+				if (attribute->ordering != SORTBY_DEFAULT)
+					ereport(ERROR,
+							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+							errmsg("access method \"%s\" does not support ASC/DESC options",
+									accessMethodName)));
+				if (attribute->nulls_ordering != SORTBY_NULLS_DEFAULT)
+					ereport(ERROR,
+							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+							errmsg("access method \"%s\" does not support NULLS FIRST/LAST options",
+									accessMethodName)));
+			}
 		}
-
+// 		else
+// 		{
+// 			elog(NOTICE, "ComputeIndexAttrs. Included attn %d, nkeycols %d ncols %d  Don't look for opclass", attn, nkeycols, indexInfo->ii_NumIndexAttrs);
+// 		}
 		attn++;
 	}
 }
diff --git a/src/backend/optimizer/util/plancat.c b/src/backend/optimizer/util/plancat.c
index 603d11d..b7c2429 100644
--- a/src/backend/optimizer/util/plancat.c
+++ b/src/backend/optimizer/util/plancat.c
@@ -225,18 +225,22 @@ get_relation_info(PlannerInfo *root, Oid relationObjectId, bool inhparent,
 			 * each indexed column must have an opclass.
 			 */
 			info->indexkeys = (int *) palloc(sizeof(int) * ncolumns);
-			info->indexcollations = (Oid *) palloc(sizeof(Oid) * ncolumns);
-			info->opfamily = (Oid *) palloc(sizeof(Oid) * ncolumns);
-			info->opcintype = (Oid *) palloc(sizeof(Oid) * ncolumns);
+			info->indexcollations = (Oid *) palloc(sizeof(Oid) * nkeycolumns);
+			info->opfamily = (Oid *) palloc(sizeof(Oid) * nkeycolumns);
+			info->opcintype = (Oid *) palloc(sizeof(Oid) * nkeycolumns);
 			info->canreturn = (bool *) palloc(sizeof(bool) * ncolumns);
 
 			for (i = 0; i < ncolumns; i++)
 			{
 				info->indexkeys[i] = index->indkey.values[i];
+				info->canreturn[i] = index_can_return(indexRelation, i + 1);
+			}
+
+			for (i = 0; i < nkeycolumns; i++)
+			{
 				info->indexcollations[i] = indexRelation->rd_indcollation[i];
 				info->opfamily[i] = indexRelation->rd_opfamily[i];
 				info->opcintype[i] = indexRelation->rd_opcintype[i];
-				info->canreturn[i] = index_can_return(indexRelation, i + 1);
 			}
 
 			info->relam = indexRelation->rd_rel->relam;
@@ -260,10 +264,10 @@ get_relation_info(PlannerInfo *root, Oid relationObjectId, bool inhparent,
 				Assert(indexRelation->rd_am->amcanorder);
 
 				info->sortopfamily = info->opfamily;
-				info->reverse_sort = (bool *) palloc(sizeof(bool) * ncolumns);
-				info->nulls_first = (bool *) palloc(sizeof(bool) * ncolumns);
+				info->reverse_sort = (bool *) palloc(sizeof(bool) * nkeycolumns);
+				info->nulls_first = (bool *) palloc(sizeof(bool) * nkeycolumns);
 
-				for (i = 0; i < ncolumns; i++)
+				for (i = 0; i < nkeycolumns; i++)
 				{
 					int16		opt = indexRelation->rd_indoption[i];
 
@@ -287,11 +291,11 @@ get_relation_info(PlannerInfo *root, Oid relationObjectId, bool inhparent,
 				 * of current or foreseeable amcanorder index types, it's not
 				 * worth expending more effort on now.
 				 */
-				info->sortopfamily = (Oid *) palloc(sizeof(Oid) * ncolumns);
-				info->reverse_sort = (bool *) palloc(sizeof(bool) * ncolumns);
-				info->nulls_first = (bool *) palloc(sizeof(bool) * ncolumns);
+				info->sortopfamily = (Oid *) palloc(sizeof(Oid) * nkeycolumns);
+				info->reverse_sort = (bool *) palloc(sizeof(bool) * nkeycolumns);
+				info->nulls_first = (bool *) palloc(sizeof(bool) * nkeycolumns);
 
-				for (i = 0; i < ncolumns; i++)
+				for (i = 0; i < nkeycolumns; i++)
 				{
 					int16		opt = indexRelation->rd_indoption[i];
 					Oid			ltopr;
diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c
index 8a7b0f1..05221d9 100644
--- a/src/backend/utils/adt/ruleutils.c
+++ b/src/backend/utils/adt/ruleutils.c
@@ -1194,7 +1194,8 @@ pg_get_indexdef_worker(Oid indexrelid, int colno,
 			keycolcollation = exprCollation(indexkey);
 		}
 
-		if (!attrsOnly && (!colno || colno == keyno + 1))
+		if (!attrsOnly && (!colno || colno == keyno + 1)
+			&& keyno < idxrec->indnkeyatts)
 		{
 			Oid			indcoll;
 
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index 8851f40..003fa0c 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -1176,7 +1176,7 @@ RelationInitIndexAccessInfo(Relation relation)
 	int2vector *indoption;
 	MemoryContext indexcxt;
 	MemoryContext oldcontext;
-	int			natts;
+	int			natts, nkeyatts;
 	uint16		amsupport;
 
 	/*
@@ -1212,6 +1212,7 @@ RelationInitIndexAccessInfo(Relation relation)
 		elog(ERROR, "relnatts disagrees with indnatts for index %u",
 			 RelationGetRelid(relation));
 	amsupport = aform->amsupport;
+	nkeyatts = IndexRelationGetNumberOfKeyAttributes(relation);
 
 	/*
 	 * Make the private context to hold index access info.  The reason we need
@@ -1235,9 +1236,9 @@ RelationInitIndexAccessInfo(Relation relation)
 		MemoryContextAllocZero(indexcxt, sizeof(RelationAmInfo));
 
 	relation->rd_opfamily = (Oid *)
-		MemoryContextAllocZero(indexcxt, natts * sizeof(Oid));
+		MemoryContextAllocZero(indexcxt, nkeyatts * sizeof(Oid));
 	relation->rd_opcintype = (Oid *)
-		MemoryContextAllocZero(indexcxt, natts * sizeof(Oid));
+		MemoryContextAllocZero(indexcxt, nkeyatts * sizeof(Oid));
 
 	if (amsupport > 0)
 	{
@@ -1292,7 +1293,7 @@ RelationInitIndexAccessInfo(Relation relation)
 	 */
 	IndexSupportInitialize(indclass, relation->rd_support,
 						   relation->rd_opfamily, relation->rd_opcintype,
-						   amsupport, natts);
+						   amsupport, nkeyatts);
 
 	/*
 	 * Similarly extract indoption and copy it to the cache entry
@@ -4364,7 +4365,7 @@ RelationGetExclusionInfo(Relation indexRelation,
 						 Oid **procs,
 						 uint16 **strategies)
 {
-	int			ncols = indexRelation->rd_rel->relnatts;
+	int			nkeycols;
 	Oid		   *ops;
 	Oid		   *funcs;
 	uint16	   *strats;
@@ -4376,17 +4377,19 @@ RelationGetExclusionInfo(Relation indexRelation,
 	MemoryContext oldcxt;
 	int			i;
 
+	nkeycols = IndexRelationGetNumberOfKeyAttributes(indexRelation);
+
 	/* Allocate result space in caller context */
-	*operators = ops = (Oid *) palloc(sizeof(Oid) * ncols);
-	*procs = funcs = (Oid *) palloc(sizeof(Oid) * ncols);
-	*strategies = strats = (uint16 *) palloc(sizeof(uint16) * ncols);
+	*operators = ops = (Oid *) palloc(sizeof(Oid) * nkeycols);
+	*procs = funcs = (Oid *) palloc(sizeof(Oid) * nkeycols);
+	*strategies = strats = (uint16 *) palloc(sizeof(uint16) * nkeycols);
 
 	/* Quick exit if we have the data cached already */
 	if (indexRelation->rd_exclstrats != NULL)
 	{
-		memcpy(ops, indexRelation->rd_exclops, sizeof(Oid) * ncols);
-		memcpy(funcs, indexRelation->rd_exclprocs, sizeof(Oid) * ncols);
-		memcpy(strats, indexRelation->rd_exclstrats, sizeof(uint16) * ncols);
+		memcpy(ops, indexRelation->rd_exclops, sizeof(Oid) * nkeycols);
+		memcpy(funcs, indexRelation->rd_exclprocs, sizeof(Oid) * nkeycols);
+		memcpy(strats, indexRelation->rd_exclstrats, sizeof(uint16) * nkeycols);
 		return;
 	}
 
@@ -4435,12 +4438,12 @@ RelationGetExclusionInfo(Relation indexRelation,
 		arr = DatumGetArrayTypeP(val);	/* ensure not toasted */
 		nelem = ARR_DIMS(arr)[0];
 		if (ARR_NDIM(arr) != 1 ||
-			nelem != ncols ||
+			nelem != nkeycols ||
 			ARR_HASNULL(arr) ||
 			ARR_ELEMTYPE(arr) != OIDOID)
 			elog(ERROR, "conexclop is not a 1-D Oid array");
 
-		memcpy(ops, ARR_DATA_PTR(arr), sizeof(Oid) * ncols);
+		memcpy(ops, ARR_DATA_PTR(arr), sizeof(Oid) * nkeycols);
 	}
 
 	systable_endscan(conscan);
@@ -4451,7 +4454,7 @@ RelationGetExclusionInfo(Relation indexRelation,
 			 RelationGetRelationName(indexRelation));
 
 	/* We need the func OIDs and strategy numbers too */
-	for (i = 0; i < ncols; i++)
+	for (i = 0; i < nkeycols; i++)
 	{
 		funcs[i] = get_opcode(ops[i]);
 		strats[i] = get_op_opfamily_strategy(ops[i],
@@ -4464,12 +4467,12 @@ RelationGetExclusionInfo(Relation indexRelation,
 
 	/* Save a copy of the results in the relcache entry. */
 	oldcxt = MemoryContextSwitchTo(indexRelation->rd_indexcxt);
-	indexRelation->rd_exclops = (Oid *) palloc(sizeof(Oid) * ncols);
-	indexRelation->rd_exclprocs = (Oid *) palloc(sizeof(Oid) * ncols);
-	indexRelation->rd_exclstrats = (uint16 *) palloc(sizeof(uint16) * ncols);
-	memcpy(indexRelation->rd_exclops, ops, sizeof(Oid) * ncols);
-	memcpy(indexRelation->rd_exclprocs, funcs, sizeof(Oid) * ncols);
-	memcpy(indexRelation->rd_exclstrats, strats, sizeof(uint16) * ncols);
+	indexRelation->rd_exclops = (Oid *) palloc(sizeof(Oid) * nkeycols);
+	indexRelation->rd_exclprocs = (Oid *) palloc(sizeof(Oid) * nkeycols);
+	indexRelation->rd_exclstrats = (uint16 *) palloc(sizeof(uint16) * nkeycols);
+	memcpy(indexRelation->rd_exclops, ops, sizeof(Oid) * nkeycols);
+	memcpy(indexRelation->rd_exclprocs, funcs, sizeof(Oid) * nkeycols);
+	memcpy(indexRelation->rd_exclstrats, strats, sizeof(uint16) * nkeycols);
 	MemoryContextSwitchTo(oldcxt);
 }
 
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index cf1cdcb..78ab531 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -812,7 +812,7 @@ tuplesort_begin_index_btree(Relation heapRel,
 	state->enforceUnique = enforceUnique;
 
 	indexScanKey = _bt_mkscankey_nodata(indexRel);
-	state->nKeys = RelationGetNumberOfAttributes(indexRel);
+	state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel);
 
 	/* Prepare SortSupport data for each column */
 	state->sortKeys = (SortSupport) palloc0(state->nKeys *

Attachment: test_covering_4.0.sql
Description: application/sql

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to