03.10.2016 05:22, Michael Paquier:
On Tue, Sep 27, 2016 at 12:17 AM, Anastasia Lubennikova
<a.lubennik...@postgrespro.ru> wrote:
Ok, I'll write it in a few days.
Marked as returned with feedback per last emails exchanged.
The only complaint about this patch was the lack of a README,
which is fixed now (see the attachment). So I added it to the new CF,
marked as ready for committer.
--
Anastasia Lubennikova
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c
index d4f9090..99735ce 100644
--- a/contrib/dblink/dblink.c
+++ b/contrib/dblink/dblink.c
@@ -100,7 +100,7 @@ static remoteConn *getConnectionByName(const char *name);
static HTAB *createConnHash(void);
static void createNewConnection(const char *name, remoteConn *rconn);
static void deleteConnection(const char *name);
-static char **get_pkey_attnames(Relation rel, int16 *numatts);
+static char **get_pkey_attnames(Relation rel, int16 *indnkeyatts);
static char **get_text_array_contents(ArrayType *array, int *numitems);
static char *get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
static char *get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals);
@@ -1483,7 +1483,7 @@ PG_FUNCTION_INFO_V1(dblink_get_pkey);
Datum
dblink_get_pkey(PG_FUNCTION_ARGS)
{
- int16 numatts;
+ int16 indnkeyatts;
char **results;
FuncCallContext *funcctx;
int32 call_cntr;
@@ -1509,7 +1509,7 @@ dblink_get_pkey(PG_FUNCTION_ARGS)
rel = get_rel_from_relname(PG_GETARG_TEXT_P(0), AccessShareLock, ACL_SELECT);
/* get the array of attnums */
- results = get_pkey_attnames(rel, &numatts);
+ results = get_pkey_attnames(rel, &indnkeyatts);
relation_close(rel, AccessShareLock);
@@ -1529,9 +1529,9 @@ dblink_get_pkey(PG_FUNCTION_ARGS)
attinmeta = TupleDescGetAttInMetadata(tupdesc);
funcctx->attinmeta = attinmeta;
- if ((results != NULL) && (numatts > 0))
+ if ((results != NULL) && (indnkeyatts > 0))
{
- funcctx->max_calls = numatts;
+ funcctx->max_calls = indnkeyatts;
/* got results, keep track of them */
funcctx->user_fctx = results;
@@ -2021,10 +2021,10 @@ dblink_fdw_validator(PG_FUNCTION_ARGS)
* get_pkey_attnames
*
* Get the primary key attnames for the given relation.
- * Return NULL, and set numatts = 0, if no primary key exists.
+ * Return NULL, and set indnkeyatts = 0, if no primary key exists.
*/
static char **
-get_pkey_attnames(Relation rel, int16 *numatts)
+get_pkey_attnames(Relation rel, int16 *indnkeyatts)
{
Relation indexRelation;
ScanKeyData skey;
@@ -2034,8 +2034,8 @@ get_pkey_attnames(Relation rel, int16 *numatts)
char **result = NULL;
TupleDesc tupdesc;
- /* initialize numatts to 0 in case no primary key exists */
- *numatts = 0;
+ /* initialize indnkeyatts to 0 in case no primary key exists */
+ *indnkeyatts = 0;
tupdesc = rel->rd_att;
@@ -2056,12 +2056,12 @@ get_pkey_attnames(Relation rel, int16 *numatts)
/* we're only interested if it is the primary key */
if (index->indisprimary)
{
- *numatts = index->indnatts;
- if (*numatts > 0)
+ *indnkeyatts = index->indnkeyatts;
+ if (*indnkeyatts > 0)
{
- result = (char **) palloc(*numatts * sizeof(char *));
+ result = (char **) palloc(*indnkeyatts * sizeof(char *));
- for (i = 0; i < *numatts; i++)
+ for (i = 0; i < *indnkeyatts; i++)
result[i] = SPI_fname(tupdesc, index->indkey.values[i]);
}
break;
diff --git a/contrib/tcn/tcn.c b/contrib/tcn/tcn.c
index 7352b29..c024cf3 100644
--- a/contrib/tcn/tcn.c
+++ b/contrib/tcn/tcn.c
@@ -138,9 +138,9 @@ triggered_change_notification(PG_FUNCTION_ARGS)
/* we're only interested if it is the primary key and valid */
if (index->indisprimary && IndexIsValid(index))
{
- int numatts = index->indnatts;
+ int indnkeyatts = index->indnkeyatts;
- if (numatts > 0)
+ if (indnkeyatts > 0)
{
int i;
@@ -150,7 +150,7 @@ triggered_change_notification(PG_FUNCTION_ARGS)
appendStringInfoCharMacro(payload, ',');
appendStringInfoCharMacro(payload, operation);
- for (i = 0; i < numatts; i++)
+ for (i = 0; i < indnkeyatts; i++)
{
int colno = index->indkey.values[i];
diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 322d8d6..0983c93 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -3564,6 +3564,14 @@
<literal>pg_class.relnatts</literal>)</entry>
</row>
+ <row>
+ <entry><structfield>indnkeyatts</structfield></entry>
+ <entry><type>int2</type></entry>
+ <entry></entry>
+ <entry>The number of key columns in the index. "Key columns" are ordinary
+ index columns in contrast with "included" columns.</entry>
+ </row>
+
<row>
<entry><structfield>indisunique</structfield></entry>
<entry><type>bool</type></entry>
diff --git a/doc/src/sgml/indexam.sgml b/doc/src/sgml/indexam.sgml
index 40f201b..92bef4a 100644
--- a/doc/src/sgml/indexam.sgml
+++ b/doc/src/sgml/indexam.sgml
@@ -110,6 +110,8 @@ typedef struct IndexAmRoutine
bool amclusterable;
/* does AM handle predicate locks? */
bool ampredlocks;
+ /* does AM support columns included with clause INCLUDING? */
+ bool amcaninclude;
/* type of data stored in index, or InvalidOid if variable */
Oid amkeytype;
@@ -903,7 +905,8 @@ amrestrpos (IndexScanDesc scan);
using <firstterm>unique indexes</>, which are indexes that disallow
multiple entries with identical keys. An access method that supports this
feature sets <structfield>amcanunique</> true.
- (At present, only b-tree supports it.)
+ (At present, only b-tree supports it.) Columns which are present in the
+ <literal>INCLUDING</> clause are not used to enforce uniqueness.
</para>
<para>
diff --git a/doc/src/sgml/indices.sgml b/doc/src/sgml/indices.sgml
index 46f8e55..f0c6382 100644
--- a/doc/src/sgml/indices.sgml
+++ b/doc/src/sgml/indices.sgml
@@ -651,7 +651,8 @@ CREATE INDEX test3_desc_index ON test3 (id DESC NULLS LAST);
Indexes can also be used to enforce uniqueness of a column's value,
or the uniqueness of the combined values of more than one column.
<synopsis>
-CREATE UNIQUE INDEX <replaceable>name</replaceable> ON <replaceable>table</replaceable> (<replaceable>column</replaceable> <optional>, ...</optional>);
+CREATE UNIQUE INDEX <replaceable>name</replaceable> ON <replaceable>table</replaceable> (<replaceable>column</replaceable> <optional>, ...</optional>)
+<optional>INCLUDING (<replaceable>column</replaceable> <optional>, ...</optional>)</optional>;
</synopsis>
Currently, only B-tree indexes can be declared unique.
</para>
@@ -660,7 +661,9 @@ CREATE UNIQUE INDEX <replaceable>name</replaceable> ON <replaceable>table</repla
When an index is declared unique, multiple table rows with equal
indexed values are not allowed. Null values are not considered
equal. A multicolumn unique index will only reject cases where all
- indexed columns are equal in multiple rows.
+ indexed columns are equal in multiple rows. Columns included with clause
+ <literal>INCLUDING</literal> aren't used to enforce constraints (UNIQUE,
+ PRIMARY KEY, etc).
</para>
<para>
diff --git a/doc/src/sgml/ref/create_index.sgml b/doc/src/sgml/ref/create_index.sgml
index e9f47c4..4be8cfc 100644
--- a/doc/src/sgml/ref/create_index.sgml
+++ b/doc/src/sgml/ref/create_index.sgml
@@ -23,6 +23,7 @@ PostgreSQL documentation
<synopsis>
CREATE [ UNIQUE ] INDEX [ CONCURRENTLY ] [ [ IF NOT EXISTS ] <replaceable class="parameter">name</replaceable> ] ON <replaceable class="parameter">table_name</replaceable> [ USING <replaceable class="parameter">method</replaceable> ]
( { <replaceable class="parameter">column_name</replaceable> | ( <replaceable class="parameter">expression</replaceable> ) } [ COLLATE <replaceable class="parameter">collation</replaceable> ] [ <replaceable class="parameter">opclass</replaceable> ] [ ASC | DESC ] [ NULLS { FIRST | LAST } ] [, ...] )
+ [ INCLUDING ( <replaceable class="parameter">column_name</replaceable> [, ...] ) ]
[ WITH ( <replaceable class="PARAMETER">storage_parameter</replaceable> = <replaceable class="PARAMETER">value</replaceable> [, ... ] ) ]
[ TABLESPACE <replaceable class="parameter">tablespace_name</replaceable> ]
[ WHERE <replaceable class="parameter">predicate</replaceable> ]
@@ -139,6 +140,34 @@ CREATE [ UNIQUE ] INDEX [ CONCURRENTLY ] [ [ IF NOT EXISTS ] <replaceable class=
</varlistentry>
<varlistentry>
+ <term><literal>INCLUDING</literal></term>
+ <listitem>
+ <para>
+ An optional <literal>INCLUDING</> clause allows a list of columns to be
+ specified which will be included in the index, in the non-key portion of
+ the index. Columns which are part of this clause cannot also exist in the
+ key columns portion of the index, and vice versa. The
+ <literal>INCLUDING</> columns exist solely to allow more queries to benefit
+ from <firstterm>index-only scans</> by including certain columns in the
+ index, the value of which would otherwise have to be obtained by reading
+ the table's heap. Having these columns in the <literal>INCLUDING</> clause
+ in some cases allows <productname>PostgreSQL</> to skip the heap read
+ completely. This also allows a <literal>UNIQUE</> index to be defined on
+ one set of columns while carrying another set of columns in the
+ <literal>INCLUDING</> clause, on which uniqueness is not enforced.
+ The same applies to other constraints (PRIMARY KEY and EXCLUDE). The clause
+ can also be used for non-unique indexes: any columns that are not required
+ for searching or ordering records can be placed in the
+ <literal>INCLUDING</> clause, which can slightly reduce the size of the index,
+ because included attributes are stored only in leaf index pages.
+ Currently, only the B-tree access method supports this feature.
+ Expressions are not supported as included columns, since they cannot be used
+ in index-only scans.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
<term><replaceable class="parameter">name</replaceable></term>
<listitem>
<para>
@@ -590,7 +619,7 @@ Indexes:
<title>Examples</title>
<para>
- To create a B-tree index on the column <literal>title</literal> in
+ To create a unique B-tree index on the column <literal>title</literal> in
the table <literal>films</literal>:
<programlisting>
CREATE UNIQUE INDEX title_idx ON films (title);
@@ -598,6 +627,15 @@ CREATE UNIQUE INDEX title_idx ON films (title);
</para>
<para>
+ To create a unique B-tree index on the column <literal>title</literal>
+ and included columns <literal>director</literal> and <literal>rating</literal>
+ in the table <literal>films</literal>:
+<programlisting>
+CREATE UNIQUE INDEX title_idx ON films (title) INCLUDING (director, rating);
+</programlisting>
+ </para>
+
+ <para>
To create an index on the expression <literal>lower(title)</>,
allowing efficient case-insensitive searches:
<programlisting>
diff --git a/doc/src/sgml/ref/create_table.sgml b/doc/src/sgml/ref/create_table.sgml
index bf2ad64..c48412e 100644
--- a/doc/src/sgml/ref/create_table.sgml
+++ b/doc/src/sgml/ref/create_table.sgml
@@ -59,8 +59,8 @@ CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE [ IF NOT EXI
[ CONSTRAINT <replaceable class="PARAMETER">constraint_name</replaceable> ]
{ CHECK ( <replaceable class="PARAMETER">expression</replaceable> ) [ NO INHERIT ] |
- UNIQUE ( <replaceable class="PARAMETER">column_name</replaceable> [, ... ] ) <replaceable class="PARAMETER">index_parameters</replaceable> |
- PRIMARY KEY ( <replaceable class="PARAMETER">column_name</replaceable> [, ... ] ) <replaceable class="PARAMETER">index_parameters</replaceable> |
+ UNIQUE ( <replaceable class="PARAMETER">column_name</replaceable> [, ... ] ) <replaceable class="PARAMETER">index_parameters</replaceable> <optional>INCLUDING (<replaceable class="PARAMETER">column_name</replaceable> [, ...])</optional> |
+ PRIMARY KEY ( <replaceable class="PARAMETER">column_name</replaceable> [, ... ] ) <replaceable class="PARAMETER">index_parameters</replaceable> <optional>INCLUDING (<replaceable class="PARAMETER">column_name</replaceable> [, ...])</optional> |
EXCLUDE [ USING <replaceable class="parameter">index_method</replaceable> ] ( <replaceable class="parameter">exclude_element</replaceable> WITH <replaceable class="parameter">operator</replaceable> [, ... ] ) <replaceable class="parameter">index_parameters</replaceable> [ WHERE ( <replaceable class="parameter">predicate</replaceable> ) ] |
FOREIGN KEY ( <replaceable class="PARAMETER">column_name</replaceable> [, ... ] ) REFERENCES <replaceable class="PARAMETER">reftable</replaceable> [ ( <replaceable class="PARAMETER">refcolumn</replaceable> [, ... ] ) ]
[ MATCH FULL | MATCH PARTIAL | MATCH SIMPLE ] [ ON DELETE <replaceable class="parameter">action</replaceable> ] [ ON UPDATE <replaceable class="parameter">action</replaceable> ] }
@@ -485,8 +485,8 @@ CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE [ IF NOT EXI
<varlistentry>
<term><literal>UNIQUE</> (column constraint)</term>
- <term><literal>UNIQUE ( <replaceable class="PARAMETER">column_name</replaceable> [, ... ] )</> (table constraint)</term>
-
+ <term><literal>UNIQUE ( <replaceable class="PARAMETER">column_name</replaceable> [, ... ] )
+ <optional>INCLUDING ( <replaceable class="PARAMETER">column_name</replaceable> [, ...])</optional></> (table constraint)</term>
<listitem>
<para>
The <literal>UNIQUE</literal> constraint specifies that a
@@ -507,12 +507,26 @@ CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE [ IF NOT EXI
primary key constraint defined for the table. (Otherwise it
would just be the same constraint listed twice.)
</para>
+
+ <para>
+ Adding a unique constraint will automatically create a unique btree
+ index on the column or group of columns used in the constraint.
+ The optional <literal>INCLUDING</literal> clause adds to the index
+ one or more columns on which uniqueness is not enforced.
+ Note that although the constraint is not enforced on the included columns, it still
+ depends on them. Consequently, some operations on these columns (e.g. <literal>DROP COLUMN</literal>)
+ can cause cascaded deletion of the constraint and the index.
+ See the paragraph about <literal>INCLUDING</literal> in
+ <xref linkend="SQL-CREATEINDEX"> for more information.
+ </para>
+
</listitem>
</varlistentry>
<varlistentry>
<term><literal>PRIMARY KEY</> (column constraint)</term>
- <term><literal>PRIMARY KEY ( <replaceable class="PARAMETER">column_name</replaceable> [, ... ] )</> (table constraint)</term>
+ <term><literal>PRIMARY KEY ( <replaceable class="PARAMETER">column_name</replaceable> [, ... ] )
+ <optional>INCLUDING ( <replaceable class="PARAMETER">column_name</replaceable> [, ...])</optional></> (table constraint)</term>
<listitem>
<para>
The <literal>PRIMARY KEY</> constraint specifies that a column or
@@ -535,6 +549,18 @@ CREATE [ [ GLOBAL | LOCAL ] { TEMPORARY | TEMP } | UNLOGGED ] TABLE [ IF NOT EXI
about the design of the schema, since a primary key implies that other
tables can rely on this set of columns as a unique identifier for rows.
</para>
+
+ <para>
+ Adding a <literal>PRIMARY KEY</literal> constraint will automatically create a unique btree
+ index on the column or group of columns used in the constraint.
+ The optional <literal>INCLUDING</literal> clause adds to the index
+ one or more columns on which the constraint is not enforced.
+ Note that although the constraint is not enforced on the included columns, it still
+ depends on them. Consequently, some operations on these columns (e.g. <literal>DROP COLUMN</literal>)
+ can cause cascaded deletion of the constraint and the index.
+ See the paragraph about <literal>INCLUDING</literal> in
+ <xref linkend="SQL-CREATEINDEX"> for more information.
+ </para>
</listitem>
</varlistentry>
diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index 1b45a4c..d2602af 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -92,6 +92,7 @@ brinhandler(PG_FUNCTION_ARGS)
amroutine->amstorage = true;
amroutine->amclusterable = false;
amroutine->ampredlocks = false;
+ amroutine->amcaninclude = false;
amroutine->amkeytype = InvalidOid;
amroutine->ambuild = brinbuild;
diff --git a/src/backend/access/common/indextuple.c b/src/backend/access/common/indextuple.c
index 274a6c2..8884c1e 100644
--- a/src/backend/access/common/indextuple.c
+++ b/src/backend/access/common/indextuple.c
@@ -19,6 +19,7 @@
#include "access/heapam.h"
#include "access/itup.h"
#include "access/tuptoaster.h"
+#include "utils/rel.h"
/* ----------------------------------------------------------------
@@ -441,3 +442,33 @@ CopyIndexTuple(IndexTuple source)
memcpy(result, source, size);
return result;
}
+
+/*
+ * Return a palloc'd copy of olditup holding only the key attributes of idxrel
+ * (INCLUDING attributes are dropped); olditup's t_tid is carried over.
+ * Must only be called for indexes that actually have included attributes.
+ */
+IndexTuple
+index_truncate_tuple(Relation idxrel, IndexTuple olditup)
+{
+	TupleDesc	itupdesc = CreateTupleDescCopy(RelationGetDescr(idxrel));
+	Datum		values[INDEX_MAX_KEYS];
+	bool		isnull[INDEX_MAX_KEYS];
+	IndexTuple	newitup;
+	int			indnatts = IndexRelationGetNumberOfAttributes(idxrel);
+	int			indnkeyatts = IndexRelationGetNumberOfKeyAttributes(idxrel);
+
+	Assert(indnatts <= INDEX_MAX_KEYS);
+	Assert(indnkeyatts > 0);
+	Assert(indnkeyatts < indnatts);
+
+	index_deform_tuple(olditup, itupdesc, values, isnull);
+	/* shrink our private copy only: an ERROR below must not damage rd_att */
+	itupdesc->natts = indnkeyatts;
+	newitup = index_form_tuple(itupdesc, values, isnull);
+	newitup->t_tid = olditup->t_tid;
+	FreeTupleDesc(itupdesc);
+
+	Assert(IndexTupleSize(newitup) <= IndexTupleSize(olditup));
+	return newitup;
+}
diff --git a/src/backend/access/gin/ginutil.c b/src/backend/access/gin/ginutil.c
index d914648..db9b816 100644
--- a/src/backend/access/gin/ginutil.c
+++ b/src/backend/access/gin/ginutil.c
@@ -47,6 +47,7 @@ ginhandler(PG_FUNCTION_ARGS)
amroutine->amstorage = true;
amroutine->amclusterable = false;
amroutine->ampredlocks = false;
+ amroutine->amcaninclude = false;
amroutine->amkeytype = InvalidOid;
amroutine->ambuild = ginbuild;
diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c
index f7f44b4..1803a8b 100644
--- a/src/backend/access/gist/gist.c
+++ b/src/backend/access/gist/gist.c
@@ -69,6 +69,7 @@ gisthandler(PG_FUNCTION_ARGS)
amroutine->amstorage = true;
amroutine->amclusterable = true;
amroutine->ampredlocks = false;
+ amroutine->amcaninclude = false;
amroutine->amkeytype = InvalidOid;
amroutine->ambuild = gistbuild;
diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c
index e3b1eef..e8e83fc 100644
--- a/src/backend/access/hash/hash.c
+++ b/src/backend/access/hash/hash.c
@@ -66,6 +66,7 @@ hashhandler(PG_FUNCTION_ARGS)
amroutine->amstorage = false;
amroutine->amclusterable = false;
amroutine->ampredlocks = false;
+ amroutine->amcaninclude = false;
amroutine->amkeytype = INT4OID;
amroutine->ambuild = hashbuild;
diff --git a/src/backend/access/index/genam.c b/src/backend/access/index/genam.c
index 65c941d..7b47bd4 100644
--- a/src/backend/access/index/genam.c
+++ b/src/backend/access/index/genam.c
@@ -156,7 +156,8 @@ IndexScanEnd(IndexScanDesc scan)
*
* Construct a string describing the contents of an index entry, in the
* form "(key_name, ...)=(key_value, ...)". This is currently used
- * for building unique-constraint and exclusion-constraint error messages.
+ * for building unique-constraint and exclusion-constraint error messages,
+ * so only the key columns of the index are checked and printed.
*
* Note that if the user does not have permissions to view all of the
* columns involved then a NULL is returned. Returning a partial key seems
@@ -174,13 +175,14 @@ BuildIndexValueDescription(Relation indexRelation,
StringInfoData buf;
Form_pg_index idxrec;
HeapTuple ht_idx;
- int natts = indexRelation->rd_rel->relnatts;
+ int indnkeyatts;
int i;
int keyno;
Oid indexrelid = RelationGetRelid(indexRelation);
Oid indrelid;
AclResult aclresult;
+ indnkeyatts = IndexRelationGetNumberOfKeyAttributes(indexRelation);
/*
* Check permissions- if the user does not have access to view all of the
* key columns then return NULL to avoid leaking data.
@@ -218,7 +220,7 @@ BuildIndexValueDescription(Relation indexRelation,
* No table-level access, so step through the columns in the index and
* make sure the user has SELECT rights on all of them.
*/
- for (keyno = 0; keyno < idxrec->indnatts; keyno++)
+ for (keyno = 0; keyno < idxrec->indnkeyatts; keyno++)
{
AttrNumber attnum = idxrec->indkey.values[keyno];
@@ -244,7 +246,7 @@ BuildIndexValueDescription(Relation indexRelation,
appendStringInfo(&buf, "(%s)=(",
pg_get_indexdef_columns(indexrelid, true));
- for (i = 0; i < natts; i++)
+ for (i = 0; i < indnkeyatts; i++)
{
char *val;
@@ -362,7 +364,7 @@ systable_beginscan(Relation heapRelation,
{
int j;
- for (j = 0; j < irel->rd_index->indnatts; j++)
+ for (j = 0; j < IndexRelationGetNumberOfAttributes(irel); j++)
{
if (key[i].sk_attno == irel->rd_index->indkey.values[j])
{
@@ -370,7 +372,7 @@ systable_beginscan(Relation heapRelation,
break;
}
}
- if (j == irel->rd_index->indnatts)
+ if (j == IndexRelationGetNumberOfAttributes(irel))
elog(ERROR, "column is not in index");
}
@@ -564,7 +566,7 @@ systable_beginscan_ordered(Relation heapRelation,
{
int j;
- for (j = 0; j < indexRelation->rd_index->indnatts; j++)
+ for (j = 0; j < IndexRelationGetNumberOfAttributes(indexRelation); j++)
{
if (key[i].sk_attno == indexRelation->rd_index->indkey.values[j])
{
@@ -572,7 +574,7 @@ systable_beginscan_ordered(Relation heapRelation,
break;
}
}
- if (j == indexRelation->rd_index->indnatts)
+ if (j == IndexRelationGetNumberOfAttributes(indexRelation))
elog(ERROR, "column is not in index");
}
diff --git a/src/backend/access/nbtree/README b/src/backend/access/nbtree/README
index 067d15c..df50bf1 100644
--- a/src/backend/access/nbtree/README
+++ b/src/backend/access/nbtree/README
@@ -675,3 +675,15 @@ Also, index searches using a key of a different datatype require comparisons
to behave sanely across two datatypes. The extensions to three or more
datatypes within a family are not strictly required by the btree index
mechanism itself, but the planner relies on them for optimization purposes.
+
+Included attributes in B-tree indexes
+-------------------------------------
+
+Since 10.0 there is an optional INCLUDING clause, which allows adding
+a set of non-key attributes to an index. They exist to allow more queries
+to benefit from index-only scans. We never use included attributes in
+ScanKeys, neither for searches nor for inserts. That allows us to include
+in a B-tree columns of any datatype, even those without a suitable opclass.
+Included columns are stored only in regular items on leaf pages. All inner
+keys and high keys are truncated and contain only key attributes.
+That helps to reduce the size of the index.
diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c
index ef69290..3d912fb 100644
--- a/src/backend/access/nbtree/nbtinsert.c
+++ b/src/backend/access/nbtree/nbtinsert.c
@@ -78,8 +78,6 @@ static OffsetNumber _bt_findsplitloc(Relation rel, Page page,
static void _bt_checksplitloc(FindSplitData *state,
OffsetNumber firstoldonright, bool newitemonleft,
int dataitemstoleft, Size firstoldonrightsz);
-static bool _bt_pgaddtup(Page page, Size itemsize, IndexTuple itup,
- OffsetNumber itup_off);
static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
int keysz, ScanKey scankey);
static void _bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel);
@@ -108,18 +106,22 @@ _bt_doinsert(Relation rel, IndexTuple itup,
IndexUniqueCheck checkUnique, Relation heapRel)
{
bool is_unique = false;
- int natts = rel->rd_rel->relnatts;
+ int indnkeyatts;
ScanKey itup_scankey;
BTStack stack;
Buffer buf;
OffsetNumber offset;
+ Assert(IndexRelationGetNumberOfAttributes(rel) != 0);
+ indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
+ Assert(indnkeyatts != 0);
+
/* we need an insertion scan key to do our search, so build one */
itup_scankey = _bt_mkscankey(rel, itup);
top:
/* find the first page containing this key */
- stack = _bt_search(rel, natts, itup_scankey, false, &buf, BT_WRITE, NULL);
+ stack = _bt_search(rel, indnkeyatts, itup_scankey, false, &buf, BT_WRITE, NULL);
offset = InvalidOffsetNumber;
@@ -134,7 +136,7 @@ top:
* move right in the tree. See Lehman and Yao for an excruciatingly
* precise description.
*/
- buf = _bt_moveright(rel, buf, natts, itup_scankey, false,
+ buf = _bt_moveright(rel, buf, indnkeyatts, itup_scankey, false,
true, stack, BT_WRITE, NULL);
/*
@@ -163,7 +165,7 @@ top:
TransactionId xwait;
uint32 speculativeToken;
- offset = _bt_binsrch(rel, buf, natts, itup_scankey, false);
+ offset = _bt_binsrch(rel, buf, indnkeyatts, itup_scankey, false);
xwait = _bt_check_unique(rel, itup, heapRel, buf, offset, itup_scankey,
checkUnique, &is_unique, &speculativeToken);
@@ -199,7 +201,7 @@ top:
*/
CheckForSerializableConflictIn(rel, NULL, buf);
/* do the insertion */
- _bt_findinsertloc(rel, &buf, &offset, natts, itup_scankey, itup,
+ _bt_findinsertloc(rel, &buf, &offset, indnkeyatts, itup_scankey, itup,
stack, heapRel);
_bt_insertonpg(rel, buf, InvalidBuffer, stack, itup, offset, false);
}
@@ -242,7 +244,7 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
uint32 *speculativeToken)
{
TupleDesc itupdesc = RelationGetDescr(rel);
- int natts = rel->rd_rel->relnatts;
+ int indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
SnapshotData SnapshotDirty;
OffsetNumber maxoff;
Page page;
@@ -301,7 +303,7 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
* in real comparison, but only for ordering/finding items on
* pages. - vadim 03/24/97
*/
- if (!_bt_isequal(itupdesc, page, offset, natts, itup_scankey))
+ if (!_bt_isequal(itupdesc, page, offset, indnkeyatts, itup_scankey))
break; /* we're past all the equal tuples */
/* okay, we gotta fetch the heap tuple ... */
@@ -466,7 +468,7 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
if (P_RIGHTMOST(opaque))
break;
if (!_bt_isequal(itupdesc, page, P_HIKEY,
- natts, itup_scankey))
+ indnkeyatts, itup_scankey))
break;
/* Advance to next non-dead page --- there must be one */
for (;;)
@@ -981,6 +983,9 @@ _bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright,
OffsetNumber i;
bool isroot;
bool isleaf;
+ IndexTuple lefthikey;
+ int indnatts = IndexRelationGetNumberOfAttributes(rel);
+ int indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
/* Acquire a new page to split into */
rbuf = _bt_getbuf(rel, P_NEW, BT_WRITE);
@@ -1081,7 +1086,22 @@ _bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright,
itemsz = ItemIdGetLength(itemid);
item = (IndexTuple) PageGetItem(origpage, itemid);
}
- if (PageAddItem(leftpage, (Item) item, itemsz, leftoff,
+
+ /*
+ * We must truncate the "high key" item before inserting it onto the leaf page.
+ * This is the only point in the insertion process where we perform truncation.
+ * All other functions work with this high key and do not change it.
+ */
+ if (indnatts != indnkeyatts && P_ISLEAF(lopaque))
+ {
+ lefthikey = index_truncate_tuple(rel, item);
+ itemsz = IndexTupleSize(lefthikey);
+ itemsz = MAXALIGN(itemsz);
+ }
+ else
+ lefthikey = item;
+
+ if (PageAddItem(leftpage, (Item) lefthikey, itemsz, leftoff,
false, false) == InvalidOffsetNumber)
{
memset(rightpage, 0, BufferGetPageSize(rbuf));
@@ -2088,7 +2108,7 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
* we insert the tuples in order, so that the given itup_off does
* represent the final position of the tuple!
*/
-static bool
+bool
_bt_pgaddtup(Page page,
Size itemsize,
IndexTuple itup,
diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c
index 2001dc1..bd5b8d7 100644
--- a/src/backend/access/nbtree/nbtpage.c
+++ b/src/backend/access/nbtree/nbtpage.c
@@ -1254,8 +1254,9 @@ _bt_pagedel(Relation rel, Buffer buf)
/* we need an insertion scan key for the search, so build one */
itup_scankey = _bt_mkscankey(rel, targetkey);
/* find the leftmost leaf page containing this key */
- stack = _bt_search(rel, rel->rd_rel->relnatts, itup_scankey,
- false, &lbuf, BT_READ, NULL);
+ stack = _bt_search(rel,
+ IndexRelationGetNumberOfKeyAttributes(rel),
+ itup_scankey, false, &lbuf, BT_READ, NULL);
/* don't need a pin on the page */
_bt_relbuf(rel, lbuf);
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index 128744c..d985d9f 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -98,6 +98,7 @@ bthandler(PG_FUNCTION_ARGS)
amroutine->amstorage = false;
amroutine->amclusterable = true;
amroutine->ampredlocks = true;
+ amroutine->amcaninclude = true;
amroutine->amkeytype = InvalidOid;
amroutine->ambuild = btbuild;
diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c
index 99a014e..cca7277 100644
--- a/src/backend/access/nbtree/nbtsort.c
+++ b/src/backend/access/nbtree/nbtsort.c
@@ -456,6 +456,9 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup)
OffsetNumber last_off;
Size pgspc;
Size itupsz;
+ BTPageOpaque pageop;
+ int indnatts = IndexRelationGetNumberOfAttributes(wstate->index);
+ int indnkeyatts = IndexRelationGetNumberOfKeyAttributes(wstate->index);
/*
* This is a handy place to check for cancel interrupts during the btree
@@ -510,6 +513,8 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup)
ItemId ii;
ItemId hii;
IndexTuple oitup;
+ IndexTuple keytup;
+ BTPageOpaque opageop = (BTPageOpaque) PageGetSpecialPointer(opage);
/* Create new page of same level */
npage = _bt_blnewpage(state->btps_level);
@@ -537,6 +542,28 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup)
ItemIdSetUnused(ii); /* redundant */
((PageHeader) opage)->pd_lower -= sizeof(ItemIdData);
+ if (indnkeyatts != indnatts && P_ISLEAF(opageop))
+ {
+ /*
+ * It's essential to truncate the high key here.
+ * The purpose is not just to save space on this particular page,
+ * but to keep the whole b-tree structure consistent. Subsequent insertions
+ * assume that the high key is already truncated, so they need not
+ * worry about it when copying the high key into the parent page
+ * as a downlink.
+ * NOTE: This is not crucial for reliability at present,
+ * but it may become so in the future.
+ */
+ keytup = index_truncate_tuple(wstate->index, oitup);
+
+ /* delete "wrong" high key, insert keytup as P_HIKEY. */
+ PageIndexTupleDelete(opage, P_HIKEY);
+
+ if (!_bt_pgaddtup(opage, IndexTupleSize(keytup), keytup, P_HIKEY))
+ elog(ERROR, "failed to rewrite compressed item in index \"%s\"",
+ RelationGetRelationName(wstate->index));
+ }
+
/*
* Link the old page into its parent, using its minimum key. If we
* don't have a parent, we have to create one; this adds a new btree
@@ -554,7 +581,11 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup)
* Save a copy of the minimum key for the new page. We have to copy
* it off the old page, not the new one, in case we are not at leaf
* level.
+ * Although oitup is already initialized, it's important to get the high
+ * key from the page, since we could have replaced it with a truncated
+ * copy. See comment above.
*/
+ oitup = (IndexTuple) PageGetItem(opage,PageGetItemId(opage, P_HIKEY));
state->btps_minkey = CopyIndexTuple(oitup);
/*
@@ -581,6 +612,7 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup)
last_off = P_FIRSTKEY;
}
+ pageop = (BTPageOpaque) PageGetSpecialPointer(npage);
/*
* If the new item is the first for its page, stash a copy for later. Note
* this will only happen for the first item on a level; on later pages,
@@ -590,7 +622,14 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup)
if (last_off == P_HIKEY)
{
Assert(state->btps_minkey == NULL);
- state->btps_minkey = CopyIndexTuple(itup);
+ /*
+ * Truncate the tuple that we're going to insert
+ * into the parent page as a downlink
+ */
+ if (indnkeyatts != indnatts && P_ISLEAF(pageop))
+ state->btps_minkey = index_truncate_tuple(wstate->index, itup);
+ else
+ state->btps_minkey = CopyIndexTuple(itup);
}
/*
@@ -685,7 +724,7 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
load1;
TupleDesc tupdes = RelationGetDescr(wstate->index);
int i,
- keysz = RelationGetNumberOfAttributes(wstate->index);
+ keysz = IndexRelationGetNumberOfKeyAttributes(wstate->index);
ScanKey indexScanKey = NULL;
SortSupport sortKeys;
diff --git a/src/backend/access/nbtree/nbtutils.c b/src/backend/access/nbtree/nbtutils.c
index 063c988..03b2f44 100644
--- a/src/backend/access/nbtree/nbtutils.c
+++ b/src/backend/access/nbtree/nbtutils.c
@@ -63,17 +63,26 @@ _bt_mkscankey(Relation rel, IndexTuple itup)
{
ScanKey skey;
TupleDesc itupdesc;
- int natts;
+ int indnatts PG_USED_FOR_ASSERTS_ONLY;
+ int indnkeyatts;
int16 *indoption;
int i;
itupdesc = RelationGetDescr(rel);
- natts = RelationGetNumberOfAttributes(rel);
+ indnatts = IndexRelationGetNumberOfAttributes(rel);
+ indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
indoption = rel->rd_indoption;
- skey = (ScanKey) palloc(natts * sizeof(ScanKeyData));
+ Assert(indnkeyatts != 0);
+ Assert(indnkeyatts <= indnatts);
- for (i = 0; i < natts; i++)
+ /*
+ * We'll execute the search using a ScanKey constructed on key columns.
+ * Non-key (included) columns must be omitted.
+ */
+ skey = (ScanKey) palloc(indnkeyatts * sizeof(ScanKeyData));
+
+ for (i = 0; i < indnkeyatts; i++)
{
FmgrInfo *procinfo;
Datum arg;
@@ -115,16 +124,16 @@ ScanKey
_bt_mkscankey_nodata(Relation rel)
{
ScanKey skey;
- int natts;
+ int indnkeyatts;
int16 *indoption;
int i;
- natts = RelationGetNumberOfAttributes(rel);
+ indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
indoption = rel->rd_indoption;
- skey = (ScanKey) palloc(natts * sizeof(ScanKeyData));
+ skey = (ScanKey) palloc(indnkeyatts * sizeof(ScanKeyData));
- for (i = 0; i < natts; i++)
+ for (i = 0; i < indnkeyatts; i++)
{
FmgrInfo *procinfo;
int flags;
diff --git a/src/backend/access/spgist/spgutils.c b/src/backend/access/spgist/spgutils.c
index d570ae5..d06fff9 100644
--- a/src/backend/access/spgist/spgutils.c
+++ b/src/backend/access/spgist/spgutils.c
@@ -48,6 +48,7 @@ spghandler(PG_FUNCTION_ARGS)
amroutine->amstorage = false;
amroutine->amclusterable = false;
amroutine->ampredlocks = false;
+ amroutine->amcaninclude = false;
amroutine->amkeytype = InvalidOid;
amroutine->ambuild = spgbuild;
diff --git a/src/backend/bootstrap/bootparse.y b/src/backend/bootstrap/bootparse.y
index 41d2fd4..c47c387 100644
--- a/src/backend/bootstrap/bootparse.y
+++ b/src/backend/bootstrap/bootparse.y
@@ -293,6 +293,7 @@ Boot_DeclareIndexStmt:
stmt->accessMethod = $8;
stmt->tableSpace = NULL;
stmt->indexParams = $10;
+ stmt->indexIncludingParams = NIL;
stmt->options = NIL;
stmt->whereClause = NULL;
stmt->excludeOpNames = NIL;
@@ -336,6 +337,7 @@ Boot_DeclareUniqueIndexStmt:
stmt->accessMethod = $9;
stmt->tableSpace = NULL;
stmt->indexParams = $11;
+ stmt->indexIncludingParams = NIL;
stmt->options = NIL;
stmt->whereClause = NULL;
stmt->excludeOpNames = NIL;
diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c
index 3870a4d..fd62e76 100644
--- a/src/backend/bootstrap/bootstrap.c
+++ b/src/backend/bootstrap/bootstrap.c
@@ -594,7 +594,7 @@ boot_openrel(char *relname)
relname, (int) ATTRIBUTE_FIXED_PART_SIZE);
boot_reldesc = heap_openrv(makeRangeVar(NULL, relname, -1), NoLock);
- numattr = boot_reldesc->rd_rel->relnatts;
+ numattr = RelationGetNumberOfAttributes(boot_reldesc);
for (i = 0; i < numattr; i++)
{
if (attrtypes[i] == NULL)
diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c
index e997b57..46c6109 100644
--- a/src/backend/catalog/heap.c
+++ b/src/backend/catalog/heap.c
@@ -2043,7 +2043,8 @@ StoreRelCheck(Relation rel, char *ccname, Node *expr,
is_validated,
RelationGetRelid(rel), /* relation */
attNos, /* attrs in the constraint */
- keycount, /* # attrs in the constraint */
+ keycount, /* # key attrs in the constraint */
+ keycount, /* # total attrs in the constraint */
InvalidOid, /* not a domain constraint */
InvalidOid, /* no associated index */
InvalidOid, /* Foreign key fields */
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index b0b43cf..8db6eac 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -216,7 +216,7 @@ index_check_primary_key(Relation heapRel,
* null, otherwise attempt to ALTER TABLE .. SET NOT NULL
*/
cmds = NIL;
- for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
+ for (i = 0; i < indexInfo->ii_NumIndexKeyAttrs; i++)
{
AttrNumber attnum = indexInfo->ii_KeyAttrNumbers[i];
HeapTuple atttuple;
@@ -425,6 +425,13 @@ ConstructTupleDescriptor(Relation heapRelation,
colnames_item = lnext(colnames_item);
/*
+ * The code below deals with opclasses, which are not used
+ * for included columns.
+ */
+ if (i >= indexInfo->ii_NumIndexKeyAttrs)
+ continue;
+
+ /*
* Check the opclass and index AM to see if either provides a keytype
* (overriding the attribute type). Opclass takes precedence.
*/
@@ -560,7 +567,7 @@ UpdateIndexRelation(Oid indexoid,
for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
indkey->values[i] = indexInfo->ii_KeyAttrNumbers[i];
indcollation = buildoidvector(collationOids, indexInfo->ii_NumIndexAttrs);
- indclass = buildoidvector(classOids, indexInfo->ii_NumIndexAttrs);
+ indclass = buildoidvector(classOids, indexInfo->ii_NumIndexKeyAttrs);
indoption = buildint2vector(coloptions, indexInfo->ii_NumIndexAttrs);
/*
@@ -605,6 +612,7 @@ UpdateIndexRelation(Oid indexoid,
values[Anum_pg_index_indexrelid - 1] = ObjectIdGetDatum(indexoid);
values[Anum_pg_index_indrelid - 1] = ObjectIdGetDatum(heapoid);
values[Anum_pg_index_indnatts - 1] = Int16GetDatum(indexInfo->ii_NumIndexAttrs);
+ values[Anum_pg_index_indnkeyatts - 1] = Int16GetDatum(indexInfo->ii_NumIndexKeyAttrs);
values[Anum_pg_index_indisunique - 1] = BoolGetDatum(indexInfo->ii_Unique);
values[Anum_pg_index_indisprimary - 1] = BoolGetDatum(primary);
values[Anum_pg_index_indisexclusion - 1] = BoolGetDatum(isexclusion);
@@ -1010,7 +1018,7 @@ index_create(Relation heapRelation,
}
/* Store dependency on operator classes */
- for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
+ for (i = 0; i < indexInfo->ii_NumIndexKeyAttrs; i++)
{
referenced.classId = OperatorClassRelationId;
referenced.objectId = classObjectId[i];
@@ -1068,6 +1076,8 @@ index_create(Relation heapRelation,
else
Assert(indexRelation->rd_indexcxt != NULL);
+ indexRelation->rd_index->indnkeyatts = indexInfo->ii_NumIndexKeyAttrs;
+
/*
* If this is bootstrap (initdb) time, then we don't actually fill in the
* index yet. We'll be creating more indexes and classes later, so we
@@ -1188,6 +1198,7 @@ index_constraint_create(Relation heapRelation,
true,
RelationGetRelid(heapRelation),
indexInfo->ii_KeyAttrNumbers,
+ indexInfo->ii_NumIndexKeyAttrs,
indexInfo->ii_NumIndexAttrs,
InvalidOid, /* no domain */
indexRelationId, /* index OID */
@@ -1628,15 +1639,19 @@ BuildIndexInfo(Relation index)
IndexInfo *ii = makeNode(IndexInfo);
Form_pg_index indexStruct = index->rd_index;
int i;
- int numKeys;
+ int numAtts;
/* check the number of keys, and copy attr numbers into the IndexInfo */
- numKeys = indexStruct->indnatts;
- if (numKeys < 1 || numKeys > INDEX_MAX_KEYS)
+ numAtts = indexStruct->indnatts;
+ if (numAtts < 1 || numAtts > INDEX_MAX_KEYS)
elog(ERROR, "invalid indnatts %d for index %u",
- numKeys, RelationGetRelid(index));
- ii->ii_NumIndexAttrs = numKeys;
- for (i = 0; i < numKeys; i++)
+ numAtts, RelationGetRelid(index));
+ ii->ii_NumIndexAttrs = numAtts;
+ ii->ii_NumIndexKeyAttrs = indexStruct->indnkeyatts;
+ Assert(ii->ii_NumIndexKeyAttrs != 0);
+ Assert(ii->ii_NumIndexKeyAttrs <= ii->ii_NumIndexAttrs);
+
+ for (i = 0; i < numAtts; i++)
ii->ii_KeyAttrNumbers[i] = indexStruct->indkey.values[i];
/* fetch any expressions needed for expressional indexes */
@@ -1692,9 +1707,11 @@ BuildIndexInfo(Relation index)
void
BuildSpeculativeIndexInfo(Relation index, IndexInfo *ii)
{
- int ncols = index->rd_rel->relnatts;
+ int indnkeyatts;
int i;
+ indnkeyatts = IndexRelationGetNumberOfKeyAttributes(index);
+
/*
* fetch info for checking unique indexes
*/
@@ -1703,16 +1720,16 @@ BuildSpeculativeIndexInfo(Relation index, IndexInfo *ii)
if (index->rd_rel->relam != BTREE_AM_OID)
elog(ERROR, "unexpected non-btree speculative unique index");
- ii->ii_UniqueOps = (Oid *) palloc(sizeof(Oid) * ncols);
- ii->ii_UniqueProcs = (Oid *) palloc(sizeof(Oid) * ncols);
- ii->ii_UniqueStrats = (uint16 *) palloc(sizeof(uint16) * ncols);
+ ii->ii_UniqueOps = (Oid *) palloc(sizeof(Oid) * indnkeyatts);
+ ii->ii_UniqueProcs = (Oid *) palloc(sizeof(Oid) * indnkeyatts);
+ ii->ii_UniqueStrats = (uint16 *) palloc(sizeof(uint16) * indnkeyatts);
/*
* We have to look up the operator's strategy number. This provides a
* cross-check that the operator does match the index.
*/
/* We need the func OIDs and strategy numbers too */
- for (i = 0; i < ncols; i++)
+ for (i = 0; i < indnkeyatts; i++)
{
ii->ii_UniqueStrats[i] = BTEqualStrategyNumber;
ii->ii_UniqueOps[i] =
diff --git a/src/backend/catalog/indexing.c b/src/backend/catalog/indexing.c
index b9fe102..6096fa5 100644
--- a/src/backend/catalog/indexing.c
+++ b/src/backend/catalog/indexing.c
@@ -119,6 +119,7 @@ CatalogIndexInsert(CatalogIndexState indstate, HeapTuple heapTuple)
Assert(indexInfo->ii_Predicate == NIL);
Assert(indexInfo->ii_ExclusionOps == NULL);
Assert(relationDescs[i]->rd_index->indimmediate);
+ Assert(indexInfo->ii_NumIndexKeyAttrs != 0);
/*
* FormIndexDatum fills in its values and isnull parameters with the
diff --git a/src/backend/catalog/pg_constraint.c b/src/backend/catalog/pg_constraint.c
index 8fabe68..fc0872a 100644
--- a/src/backend/catalog/pg_constraint.c
+++ b/src/backend/catalog/pg_constraint.c
@@ -55,6 +55,7 @@ CreateConstraintEntry(const char *constraintName,
Oid relId,
const int16 *constraintKey,
int constraintNKeys,
+ int constraintNTotalKeys,
Oid domainId,
Oid indexRelId,
Oid foreignRelId,
@@ -81,6 +82,7 @@ CreateConstraintEntry(const char *constraintName,
bool nulls[Natts_pg_constraint];
Datum values[Natts_pg_constraint];
ArrayType *conkeyArray;
+ ArrayType *conincludingArray;
ArrayType *confkeyArray;
ArrayType *conpfeqopArray;
ArrayType *conppeqopArray;
@@ -111,6 +113,21 @@ CreateConstraintEntry(const char *constraintName,
else
conkeyArray = NULL;
+ if (constraintNTotalKeys > constraintNKeys)
+ {
+ Datum *conincluding;
+ int j = 0;
+ int constraintNIncludedKeys = constraintNTotalKeys - constraintNKeys;
+
+ conincluding = (Datum *) palloc(constraintNIncludedKeys* sizeof(Datum));
+ for (i = constraintNKeys; i < constraintNTotalKeys; i++)
+ conincluding[j++] = Int16GetDatum(constraintKey[i]);
+ conincludingArray = construct_array(conincluding, constraintNIncludedKeys,
+ INT2OID, 2, true, 's');
+ }
+ else
+ conincludingArray = NULL;
+
if (foreignNKeys > 0)
{
Datum *fkdatums;
@@ -183,6 +200,11 @@ CreateConstraintEntry(const char *constraintName,
else
nulls[Anum_pg_constraint_conkey - 1] = true;
+ if (conincludingArray)
+ values[Anum_pg_constraint_conincluding - 1] = PointerGetDatum(conincludingArray);
+ else
+ nulls[Anum_pg_constraint_conincluding - 1] = true;
+
if (confkeyArray)
values[Anum_pg_constraint_confkey - 1] = PointerGetDatum(confkeyArray);
else
@@ -247,9 +269,9 @@ CreateConstraintEntry(const char *constraintName,
relobject.classId = RelationRelationId;
relobject.objectId = relId;
- if (constraintNKeys > 0)
+ if (constraintNTotalKeys > 0)
{
- for (i = 0; i < constraintNKeys; i++)
+ for (i = 0; i < constraintNTotalKeys; i++)
{
relobject.objectSubId = constraintKey[i];
diff --git a/src/backend/catalog/toasting.c b/src/backend/catalog/toasting.c
index 564e10e..03f52a1 100644
--- a/src/backend/catalog/toasting.c
+++ b/src/backend/catalog/toasting.c
@@ -302,6 +302,7 @@ create_toast_table(Relation rel, Oid toastOid, Oid toastIndexOid,
indexInfo = makeNode(IndexInfo);
indexInfo->ii_NumIndexAttrs = 2;
+ indexInfo->ii_NumIndexKeyAttrs = 2;
indexInfo->ii_KeyAttrNumbers[0] = 1;
indexInfo->ii_KeyAttrNumbers[1] = 2;
indexInfo->ii_Expressions = NIL;
diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c
index 85817c6..b74d6d6 100644
--- a/src/backend/commands/indexcmds.c
+++ b/src/backend/commands/indexcmds.c
@@ -215,7 +215,7 @@ CheckIndexCompatible(Oid oldId,
}
/* Any change in operator class or collation breaks compatibility. */
- old_natts = indexForm->indnatts;
+ old_natts = indexForm->indnkeyatts;
Assert(old_natts == numberOfAttributes);
d = SysCacheGetAttr(INDEXRELID, tuple, Anum_pg_index_indcollation, &isnull);
@@ -329,6 +329,7 @@ DefineIndex(Oid relationId,
int16 *coloptions;
IndexInfo *indexInfo;
int numberOfAttributes;
+ int numberOfKeyAttributes;
TransactionId limitXmin;
VirtualTransactionId *old_snapshots;
ObjectAddress address;
@@ -339,14 +340,27 @@ DefineIndex(Oid relationId,
Snapshot snapshot;
int i;
+ if(list_intersection(stmt->indexParams, stmt->indexIncludingParams) != NIL)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+ errmsg("included columns must not intersect with key columns")));
+ /*
+ * count key attributes in index
+ */
+ numberOfKeyAttributes = list_length(stmt->indexParams);
+
/*
- * count attributes in index
+ * We append any INCLUDING columns onto the indexParams list so that
+ * we have one list with all columns. Later we can determine which of these
+ * are key columns, and which are just part of the INCLUDING list, by
+ * checking the list position. A list item in a position less than
+ * ii_NumIndexKeyAttrs is part of the key columns, and anything at that
+ * position or beyond is part of the INCLUDING columns.
*/
+ stmt->indexParams = list_concat(stmt->indexParams,
+ stmt->indexIncludingParams);
numberOfAttributes = list_length(stmt->indexParams);
- if (numberOfAttributes <= 0)
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
- errmsg("must specify at least one column")));
+
if (numberOfAttributes > INDEX_MAX_KEYS)
ereport(ERROR,
(errcode(ERRCODE_TOO_MANY_COLUMNS),
@@ -509,6 +523,11 @@ DefineIndex(Oid relationId,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("access method \"%s\" does not support unique indexes",
accessMethodName)));
+ if (list_length(stmt->indexIncludingParams) > 0 && !amRoutine->amcaninclude)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("access method \"%s\" does not support included columns",
+ accessMethodName)));
if (numberOfAttributes > 1 && !amRoutine->amcanmulticol)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
@@ -546,6 +565,7 @@ DefineIndex(Oid relationId,
*/
indexInfo = makeNode(IndexInfo);
indexInfo->ii_NumIndexAttrs = numberOfAttributes;
+ indexInfo->ii_NumIndexKeyAttrs = numberOfKeyAttributes;
indexInfo->ii_Expressions = NIL; /* for now */
indexInfo->ii_ExpressionsState = NIL;
indexInfo->ii_Predicate = make_ands_implicit((Expr *) stmt->whereClause);
@@ -561,7 +581,7 @@ DefineIndex(Oid relationId,
typeObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
collationObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
- classObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
+ classObjectId = (Oid *) palloc(numberOfKeyAttributes * sizeof(Oid));
coloptions = (int16 *) palloc(numberOfAttributes * sizeof(int16));
ComputeIndexAttrs(indexInfo,
typeObjectId, collationObjectId, classObjectId,
@@ -1003,16 +1023,15 @@ ComputeIndexAttrs(IndexInfo *indexInfo,
ListCell *nextExclOp;
ListCell *lc;
int attn;
+ int nkeycols = indexInfo->ii_NumIndexKeyAttrs;
/* Allocate space for exclusion operator info, if needed */
if (exclusionOpNames)
{
- int ncols = list_length(attList);
-
- Assert(list_length(exclusionOpNames) == ncols);
- indexInfo->ii_ExclusionOps = (Oid *) palloc(sizeof(Oid) * ncols);
- indexInfo->ii_ExclusionProcs = (Oid *) palloc(sizeof(Oid) * ncols);
- indexInfo->ii_ExclusionStrats = (uint16 *) palloc(sizeof(uint16) * ncols);
+ Assert(list_length(exclusionOpNames) == nkeycols);
+ indexInfo->ii_ExclusionOps = (Oid *) palloc(sizeof(Oid) * nkeycols);
+ indexInfo->ii_ExclusionProcs = (Oid *) palloc(sizeof(Oid) * nkeycols);
+ indexInfo->ii_ExclusionStrats = (uint16 *) palloc(sizeof(uint16) * nkeycols);
nextExclOp = list_head(exclusionOpNames);
}
else
@@ -1065,6 +1084,11 @@ ComputeIndexAttrs(IndexInfo *indexInfo,
Node *expr = attribute->expr;
Assert(expr != NULL);
+
+ if (attn >= nkeycols)
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("expressions are not supported in included columns")));
atttype = exprType(expr);
attcollation = exprCollation(expr);
@@ -1143,6 +1167,16 @@ ComputeIndexAttrs(IndexInfo *indexInfo,
collationOidP[attn] = attcollation;
/*
+ * Skip opclass and ordering options for included columns.
+ */
+ if (attn >= nkeycols)
+ {
+ colOptionP[attn] = 0;
+ attn++;
+ continue;
+ }
+
+ /*
* Identify the opclass to use.
*/
classOidP[attn] = GetIndexOpClass(attribute->opclass,
diff --git a/src/backend/commands/matview.c b/src/backend/commands/matview.c
index 6cddcbd..75809ac 100644
--- a/src/backend/commands/matview.c
+++ b/src/backend/commands/matview.c
@@ -613,7 +613,7 @@ refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
RelationGetRelationName(tempRel));
diffname = make_temptable_name_n(tempname, 2);
- relnatts = matviewRel->rd_rel->relnatts;
+ relnatts = RelationGetNumberOfAttributes(matviewRel);
usedForQual = (bool *) palloc0(sizeof(bool) * relnatts);
/* Open SPI context. */
@@ -699,11 +699,11 @@ refresh_by_match_merge(Oid matviewOid, Oid tempOid, Oid relowner,
RelationGetIndexExpressions(indexRel) == NIL &&
RelationGetIndexPredicate(indexRel) == NIL)
{
- int numatts = indexStruct->indnatts;
+ int indnkeyatts = indexStruct->indnkeyatts;
int i;
/* Add quals for all columns from this index. */
- for (i = 0; i < numatts; i++)
+ for (i = 0; i < indnkeyatts; i++)
{
int attnum = indexStruct->indkey.values[i];
Oid type;
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 86e9814..14447b5 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -5234,7 +5234,7 @@ ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode)
* Loop over each attribute in the primary key and see if it
* matches the to-be-altered attribute
*/
- for (i = 0; i < indexStruct->indnatts; i++)
+ for (i = 0; i < indexStruct->indnkeyatts; i++)
{
if (indexStruct->indkey.values[i] == attnum)
ereport(ERROR,
@@ -6576,6 +6576,7 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
RelationGetRelid(rel),
fkattnum,
numfks,
+ numfks,
InvalidOid, /* not a domain
* constraint */
indexOid,
@@ -7083,7 +7084,7 @@ transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
* assume a primary key cannot have expressional elements)
*/
*attnamelist = NIL;
- for (i = 0; i < indexStruct->indnatts; i++)
+ for (i = 0; i < indexStruct->indnkeyatts; i++)
{
int pkattno = indexStruct->indkey.values[i];
@@ -7161,7 +7162,7 @@ transformFkeyCheckAttrs(Relation pkrel,
* partial index; forget it if there are any expressions, too. Invalid
* indexes are out as well.
*/
- if (indexStruct->indnatts == numattrs &&
+ if (indexStruct->indnkeyatts == numattrs &&
indexStruct->indisunique &&
IndexIsValid(indexStruct) &&
heap_attisnull(indexTuple, Anum_pg_index_indpred) &&
@@ -11045,7 +11046,7 @@ ATExecReplicaIdentity(Relation rel, ReplicaIdentityStmt *stmt, LOCKMODE lockmode
RelationGetRelationName(indexRel))));
/* Check index for nullable columns. */
- for (key = 0; key < indexRel->rd_index->indnatts; key++)
+ for (key = 0; key < IndexRelationGetNumberOfKeyAttributes(indexRel); key++)
{
int16 attno = indexRel->rd_index->indkey.values[key];
Form_pg_attribute attr;
diff --git a/src/backend/commands/trigger.c b/src/backend/commands/trigger.c
index 9de22a1..e7a2aea 100644
--- a/src/backend/commands/trigger.c
+++ b/src/backend/commands/trigger.c
@@ -479,6 +479,7 @@ CreateTrigger(CreateTrigStmt *stmt, const char *queryString,
RelationGetRelid(rel),
NULL, /* no conkey */
0,
+ 0,
InvalidOid, /* no domain */
InvalidOid, /* no index */
InvalidOid, /* no foreign key */
diff --git a/src/backend/commands/typecmds.c b/src/backend/commands/typecmds.c
index 6cc7106..c1b7b41 100644
--- a/src/backend/commands/typecmds.c
+++ b/src/backend/commands/typecmds.c
@@ -3061,6 +3061,7 @@ domainAddConstraint(Oid domainOid, Oid domainNamespace, Oid baseTypeOid,
InvalidOid, /* not a relation constraint */
NULL,
0,
+ 0,
domainOid, /* domain constraint */
InvalidOid, /* no associated index */
InvalidOid, /* Foreign key fields */
diff --git a/src/backend/executor/execIndexing.c b/src/backend/executor/execIndexing.c
index 0e2d834..9f11672 100644
--- a/src/backend/executor/execIndexing.c
+++ b/src/backend/executor/execIndexing.c
@@ -651,7 +651,7 @@ check_exclusion_or_unique_constraint(Relation heap, Relation index,
Oid *constr_procs;
uint16 *constr_strats;
Oid *index_collations = index->rd_indcollation;
- int index_natts = index->rd_index->indnatts;
+ int indnkeyatts = IndexRelationGetNumberOfKeyAttributes(index);
IndexScanDesc index_scan;
HeapTuple tup;
ScanKeyData scankeys[INDEX_MAX_KEYS];
@@ -678,7 +678,7 @@ check_exclusion_or_unique_constraint(Relation heap, Relation index,
* If any of the input values are NULL, the constraint check is assumed to
* pass (i.e., we assume the operators are strict).
*/
- for (i = 0; i < index_natts; i++)
+ for (i = 0; i < indnkeyatts; i++)
{
if (isnull[i])
return true;
@@ -690,7 +690,7 @@ check_exclusion_or_unique_constraint(Relation heap, Relation index,
*/
InitDirtySnapshot(DirtySnapshot);
- for (i = 0; i < index_natts; i++)
+ for (i = 0; i < indnkeyatts; i++)
{
ScanKeyEntryInitialize(&scankeys[i],
0,
@@ -722,8 +722,8 @@ check_exclusion_or_unique_constraint(Relation heap, Relation index,
retry:
conflict = false;
found_self = false;
- index_scan = index_beginscan(heap, index, &DirtySnapshot, index_natts, 0);
- index_rescan(index_scan, scankeys, index_natts, NULL, 0);
+ index_scan = index_beginscan(heap, index, &DirtySnapshot, indnkeyatts, 0);
+ index_rescan(index_scan, scankeys, indnkeyatts, NULL, 0);
while ((tup = index_getnext(index_scan,
ForwardScanDirection)) != NULL)
@@ -884,10 +884,10 @@ index_recheck_constraint(Relation index, Oid *constr_procs,
Datum *existing_values, bool *existing_isnull,
Datum *new_values)
{
- int index_natts = index->rd_index->indnatts;
+ int indnkeyatts = IndexRelationGetNumberOfKeyAttributes(index);
int i;
- for (i = 0; i < index_natts; i++)
+ for (i = 0; i < indnkeyatts; i++)
{
/* Assume the exclusion operators are strict */
if (existing_isnull[i])
diff --git a/src/backend/executor/nodeIndexscan.c b/src/backend/executor/nodeIndexscan.c
index 3143bd9..1f1f83c 100644
--- a/src/backend/executor/nodeIndexscan.c
+++ b/src/backend/executor/nodeIndexscan.c
@@ -1155,7 +1155,9 @@ ExecIndexBuildScanKeys(PlanState *planstate, Relation index,
Expr *leftop; /* expr on lhs of operator */
Expr *rightop; /* expr on rhs ... */
AttrNumber varattno; /* att number used in scan */
+ int indnkeyatts;
+ indnkeyatts = IndexRelationGetNumberOfKeyAttributes(index);
if (IsA(clause, OpExpr))
{
/* indexkey op const or indexkey op expression */
@@ -1180,7 +1182,7 @@ ExecIndexBuildScanKeys(PlanState *planstate, Relation index,
elog(ERROR, "indexqual doesn't have key on left side");
varattno = ((Var *) leftop)->varattno;
- if (varattno < 1 || varattno > index->rd_index->indnatts)
+ if (varattno < 1 || varattno > indnkeyatts)
elog(ERROR, "bogus index qualification");
/*
@@ -1303,7 +1305,7 @@ ExecIndexBuildScanKeys(PlanState *planstate, Relation index,
opnos_cell = lnext(opnos_cell);
if (index->rd_rel->relam != BTREE_AM_OID ||
- varattno < 1 || varattno > index->rd_index->indnatts)
+ varattno < 1 || varattno > indnkeyatts)
elog(ERROR, "bogus RowCompare index qualification");
opfamily = index->rd_opfamily[varattno - 1];
@@ -1424,7 +1426,7 @@ ExecIndexBuildScanKeys(PlanState *planstate, Relation index,
elog(ERROR, "indexqual doesn't have key on left side");
varattno = ((Var *) leftop)->varattno;
- if (varattno < 1 || varattno > index->rd_index->indnatts)
+ if (varattno < 1 || varattno > indnkeyatts)
elog(ERROR, "bogus index qualification");
/*
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index be2207e..bb8678d 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -2647,6 +2647,7 @@ _copyConstraint(const Constraint *from)
COPY_NODE_FIELD(raw_expr);
COPY_STRING_FIELD(cooked_expr);
COPY_NODE_FIELD(keys);
+ COPY_NODE_FIELD(including);
COPY_NODE_FIELD(exclusions);
COPY_NODE_FIELD(options);
COPY_STRING_FIELD(indexname);
@@ -3137,6 +3138,7 @@ _copyIndexStmt(const IndexStmt *from)
COPY_STRING_FIELD(accessMethod);
COPY_STRING_FIELD(tableSpace);
COPY_NODE_FIELD(indexParams);
+ COPY_NODE_FIELD(indexIncludingParams);
COPY_NODE_FIELD(options);
COPY_NODE_FIELD(whereClause);
COPY_NODE_FIELD(excludeOpNames);
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index c4ec407..ebd62bb 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -1263,6 +1263,7 @@ _equalIndexStmt(const IndexStmt *a, const IndexStmt *b)
COMPARE_STRING_FIELD(accessMethod);
COMPARE_STRING_FIELD(tableSpace);
COMPARE_NODE_FIELD(indexParams);
+ COMPARE_NODE_FIELD(indexIncludingParams);
COMPARE_NODE_FIELD(options);
COMPARE_NODE_FIELD(whereClause);
COMPARE_NODE_FIELD(excludeOpNames);
@@ -2397,6 +2398,7 @@ _equalConstraint(const Constraint *a, const Constraint *b)
COMPARE_NODE_FIELD(raw_expr);
COMPARE_STRING_FIELD(cooked_expr);
COMPARE_NODE_FIELD(keys);
+ COMPARE_NODE_FIELD(including);
COMPARE_NODE_FIELD(exclusions);
COMPARE_NODE_FIELD(options);
COMPARE_STRING_FIELD(indexname);
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index 90fecb1..c3bff94 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -2456,6 +2456,7 @@ _outIndexStmt(StringInfo str, const IndexStmt *node)
WRITE_STRING_FIELD(accessMethod);
WRITE_STRING_FIELD(tableSpace);
WRITE_NODE_FIELD(indexParams);
+ WRITE_NODE_FIELD(indexIncludingParams);
WRITE_NODE_FIELD(options);
WRITE_NODE_FIELD(whereClause);
WRITE_NODE_FIELD(excludeOpNames);
@@ -3197,6 +3198,7 @@ _outConstraint(StringInfo str, const Constraint *node)
case CONSTR_PRIMARY:
appendStringInfoString(str, "PRIMARY_KEY");
WRITE_NODE_FIELD(keys);
+ WRITE_NODE_FIELD(including);
WRITE_NODE_FIELD(options);
WRITE_STRING_FIELD(indexname);
WRITE_STRING_FIELD(indexspace);
@@ -3206,6 +3208,7 @@ _outConstraint(StringInfo str, const Constraint *node)
case CONSTR_UNIQUE:
appendStringInfoString(str, "UNIQUE");
WRITE_NODE_FIELD(keys);
+ WRITE_NODE_FIELD(including);
WRITE_NODE_FIELD(options);
WRITE_STRING_FIELD(indexname);
WRITE_STRING_FIELD(indexspace);
diff --git a/src/backend/optimizer/path/indxpath.c b/src/backend/optimizer/path/indxpath.c
index 2952bfb..69dda34 100644
--- a/src/backend/optimizer/path/indxpath.c
+++ b/src/backend/optimizer/path/indxpath.c
@@ -2143,7 +2143,7 @@ match_clause_to_index(IndexOptInfo *index,
{
int indexcol;
- for (indexcol = 0; indexcol < index->ncolumns; indexcol++)
+ for (indexcol = 0; indexcol < index->nkeycolumns; indexcol++)
{
if (match_clause_to_indexcol(index,
indexcol,
diff --git a/src/backend/optimizer/path/pathkeys.c b/src/backend/optimizer/path/pathkeys.c
index 4436ac1..440024a 100644
--- a/src/backend/optimizer/path/pathkeys.c
+++ b/src/backend/optimizer/path/pathkeys.c
@@ -450,6 +450,13 @@ build_index_pathkeys(PlannerInfo *root,
bool nulls_first;
PathKey *cpathkey;
+ /*
+ * INCLUDING columns are stored in the index unordered,
+ * so they don't support ordered index scans.
+ */
+ if(i >= index->nkeycolumns)
+ break;
+
/* We assume we don't need to make a copy of the tlist item */
indexkey = indextle->expr;
diff --git a/src/backend/optimizer/util/plancat.c b/src/backend/optimizer/util/plancat.c
index 5d18206..f1731eb 100644
--- a/src/backend/optimizer/util/plancat.c
+++ b/src/backend/optimizer/util/plancat.c
@@ -173,7 +173,7 @@ get_relation_info(PlannerInfo *root, Oid relationObjectId, bool inhparent,
Form_pg_index index;
IndexAmRoutine *amroutine;
IndexOptInfo *info;
- int ncolumns;
+ int ncolumns, nkeycolumns;
int i;
/*
@@ -216,19 +216,25 @@ get_relation_info(PlannerInfo *root, Oid relationObjectId, bool inhparent,
RelationGetForm(indexRelation)->reltablespace;
info->rel = rel;
info->ncolumns = ncolumns = index->indnatts;
+ info->nkeycolumns = nkeycolumns = index->indnkeyatts;
+
info->indexkeys = (int *) palloc(sizeof(int) * ncolumns);
info->indexcollations = (Oid *) palloc(sizeof(Oid) * ncolumns);
- info->opfamily = (Oid *) palloc(sizeof(Oid) * ncolumns);
- info->opcintype = (Oid *) palloc(sizeof(Oid) * ncolumns);
+ info->opfamily = (Oid *) palloc(sizeof(Oid) * nkeycolumns);
+ info->opcintype = (Oid *) palloc(sizeof(Oid) * nkeycolumns);
info->canreturn = (bool *) palloc(sizeof(bool) * ncolumns);
for (i = 0; i < ncolumns; i++)
{
info->indexkeys[i] = index->indkey.values[i];
info->indexcollations[i] = indexRelation->rd_indcollation[i];
+ info->canreturn[i] = index_can_return(indexRelation, i + 1);
+ }
+
+ for (i = 0; i < nkeycolumns; i++)
+ {
info->opfamily[i] = indexRelation->rd_opfamily[i];
info->opcintype[i] = indexRelation->rd_opcintype[i];
- info->canreturn[i] = index_can_return(indexRelation, i + 1);
}
info->relam = indexRelation->rd_rel->relam;
@@ -256,10 +262,10 @@ get_relation_info(PlannerInfo *root, Oid relationObjectId, bool inhparent,
Assert(amroutine->amcanorder);
info->sortopfamily = info->opfamily;
- info->reverse_sort = (bool *) palloc(sizeof(bool) * ncolumns);
- info->nulls_first = (bool *) palloc(sizeof(bool) * ncolumns);
+ info->reverse_sort = (bool *) palloc(sizeof(bool) * nkeycolumns);
+ info->nulls_first = (bool *) palloc(sizeof(bool) * nkeycolumns);
- for (i = 0; i < ncolumns; i++)
+ for (i = 0; i < nkeycolumns; i++)
{
int16 opt = indexRelation->rd_indoption[i];
@@ -283,11 +289,11 @@ get_relation_info(PlannerInfo *root, Oid relationObjectId, bool inhparent,
* of current or foreseeable amcanorder index types, it's not
* worth expending more effort on now.
*/
- info->sortopfamily = (Oid *) palloc(sizeof(Oid) * ncolumns);
- info->reverse_sort = (bool *) palloc(sizeof(bool) * ncolumns);
- info->nulls_first = (bool *) palloc(sizeof(bool) * ncolumns);
+ info->sortopfamily = (Oid *) palloc(sizeof(Oid) * nkeycolumns);
+ info->reverse_sort = (bool *) palloc(sizeof(bool) * nkeycolumns);
+ info->nulls_first = (bool *) palloc(sizeof(bool) * nkeycolumns);
- for (i = 0; i < ncolumns; i++)
+ for (i = 0; i < nkeycolumns; i++)
{
int16 opt = indexRelation->rd_indoption[i];
Oid ltopr;
@@ -687,7 +693,7 @@ infer_arbiter_indexes(PlannerInfo *root)
/* Build BMS representation of plain (non expression) index attrs */
indexedAttrs = NULL;
- for (natt = 0; natt < idxForm->indnatts; natt++)
+ for (natt = 0; natt < idxForm->indnkeyatts; natt++)
{
int attno = idxRel->rd_index->indkey.values[natt];
@@ -1629,7 +1635,7 @@ has_unique_index(RelOptInfo *rel, AttrNumber attno)
* just the specified attr is unique.
*/
if (index->unique &&
- index->ncolumns == 1 &&
+ index->nkeycolumns == 1 &&
index->indexkeys[0] == attno &&
(index->indpred == NIL || index->predOK))
return true;
diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c
index eac86cc..e979ec0 100644
--- a/src/backend/parser/analyze.c
+++ b/src/backend/parser/analyze.c
@@ -991,7 +991,7 @@ transformOnConflictClause(ParseState *pstate,
* relation.
*/
Assert(pstate->p_next_resno == 1);
- for (attno = 0; attno < targetrel->rd_rel->relnatts; attno++)
+ for (attno = 0; attno < RelationGetNumberOfAttributes(targetrel); attno++)
{
Form_pg_attribute attr = targetrel->rd_att->attrs[attno];
char *name;
@@ -2193,8 +2193,8 @@ transformUpdateTargetList(ParseState *pstate, List *origTlist)
EXPR_KIND_UPDATE_SOURCE);
/* Prepare to assign non-conflicting resnos to resjunk attributes */
- if (pstate->p_next_resno <= pstate->p_target_relation->rd_rel->relnatts)
- pstate->p_next_resno = pstate->p_target_relation->rd_rel->relnatts + 1;
+ if (pstate->p_next_resno <= RelationGetNumberOfAttributes(pstate->p_target_relation))
+ pstate->p_next_resno = RelationGetNumberOfAttributes(pstate->p_target_relation) + 1;
/* Prepare non-junk columns for assignment to target table */
target_rte = pstate->p_target_rangetblentry;
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index b69a77a..97f1973 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -358,6 +358,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
oper_argtypes RuleActionList RuleActionMulti
opt_column_list columnList opt_name_list
sort_clause opt_sort_clause sortby_list index_params
+ optincluding opt_including index_including_params
name_list role_list from_clause from_list opt_array_bounds
qualified_name_list any_name any_name_list type_name_list
any_operator expr_list attrs
@@ -374,6 +375,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
create_generic_options alter_generic_options
relation_expr_list dostmt_opt_list
transform_element_list transform_type_list
+ optcincluding opt_c_including
%type <list> group_by_list
%type <node> group_by_item empty_grouping_set rollup_clause cube_clause
@@ -3217,17 +3219,18 @@ ConstraintElem:
n->initially_valid = !n->skip_validation;
$$ = (Node *)n;
}
- | UNIQUE '(' columnList ')' opt_definition OptConsTableSpace
+ | UNIQUE '(' columnList ')' opt_c_including opt_definition OptConsTableSpace
ConstraintAttributeSpec
{
Constraint *n = makeNode(Constraint);
n->contype = CONSTR_UNIQUE;
n->location = @1;
n->keys = $3;
- n->options = $5;
+ n->including = $5;
+ n->options = $6;
n->indexname = NULL;
- n->indexspace = $6;
- processCASbits($7, @7, "UNIQUE",
+ n->indexspace = $7;
+ processCASbits($8, @8, "UNIQUE",
&n->deferrable, &n->initdeferred, NULL,
NULL, yyscanner);
$$ = (Node *)n;
@@ -3238,6 +3241,7 @@ ConstraintElem:
n->contype = CONSTR_UNIQUE;
n->location = @1;
n->keys = NIL;
+ n->including = NIL;
n->options = NIL;
n->indexname = $2;
n->indexspace = NULL;
@@ -3246,17 +3250,18 @@ ConstraintElem:
NULL, yyscanner);
$$ = (Node *)n;
}
- | PRIMARY KEY '(' columnList ')' opt_definition OptConsTableSpace
+ | PRIMARY KEY '(' columnList ')' opt_c_including opt_definition OptConsTableSpace
ConstraintAttributeSpec
{
Constraint *n = makeNode(Constraint);
n->contype = CONSTR_PRIMARY;
n->location = @1;
n->keys = $4;
- n->options = $6;
+ n->including = $6;
+ n->options = $7;
n->indexname = NULL;
- n->indexspace = $7;
- processCASbits($8, @8, "PRIMARY KEY",
+ n->indexspace = $8;
+ processCASbits($9, @9, "PRIMARY KEY",
&n->deferrable, &n->initdeferred, NULL,
NULL, yyscanner);
$$ = (Node *)n;
@@ -3267,6 +3272,7 @@ ConstraintElem:
n->contype = CONSTR_PRIMARY;
n->location = @1;
n->keys = NIL;
+ n->including = NIL;
n->options = NIL;
n->indexname = $3;
n->indexspace = NULL;
@@ -3334,6 +3340,13 @@ columnElem: ColId
}
;
+opt_c_including: INCLUDING optcincluding { $$ = $2; }
+ | /* EMPTY */ { $$ = NIL; }
+ ;
+
+optcincluding : '(' columnList ')' { $$ = $2; }
+ ;
+
key_match: MATCH FULL
{
$$ = FKCONSTR_MATCH_FULL;
@@ -6630,7 +6643,7 @@ defacl_privilege_target:
IndexStmt: CREATE opt_unique INDEX opt_concurrently opt_index_name
ON qualified_name access_method_clause '(' index_params ')'
- opt_reloptions OptTableSpace where_clause
+ opt_including opt_reloptions OptTableSpace where_clause
{
IndexStmt *n = makeNode(IndexStmt);
n->unique = $2;
@@ -6639,9 +6652,10 @@ IndexStmt: CREATE opt_unique INDEX opt_concurrently opt_index_name
n->relation = $7;
n->accessMethod = $8;
n->indexParams = $10;
- n->options = $12;
- n->tableSpace = $13;
- n->whereClause = $14;
+ n->indexIncludingParams = $12;
+ n->options = $13;
+ n->tableSpace = $14;
+ n->whereClause = $15;
n->excludeOpNames = NIL;
n->idxcomment = NULL;
n->indexOid = InvalidOid;
@@ -6656,7 +6670,7 @@ IndexStmt: CREATE opt_unique INDEX opt_concurrently opt_index_name
}
| CREATE opt_unique INDEX opt_concurrently IF_P NOT EXISTS index_name
ON qualified_name access_method_clause '(' index_params ')'
- opt_reloptions OptTableSpace where_clause
+ opt_including opt_reloptions OptTableSpace where_clause
{
IndexStmt *n = makeNode(IndexStmt);
n->unique = $2;
@@ -6665,9 +6679,10 @@ IndexStmt: CREATE opt_unique INDEX opt_concurrently opt_index_name
n->relation = $10;
n->accessMethod = $11;
n->indexParams = $13;
- n->options = $15;
- n->tableSpace = $16;
- n->whereClause = $17;
+ n->indexIncludingParams = $15;
+ n->options = $16;
+ n->tableSpace = $17;
+ n->whereClause = $18;
n->excludeOpNames = NIL;
n->idxcomment = NULL;
n->indexOid = InvalidOid;
@@ -6746,6 +6761,16 @@ index_elem: ColId opt_collate opt_class opt_asc_desc opt_nulls_order
}
;
+optincluding : '(' index_including_params ')' { $$ = $2; }
+ ;
+opt_including: INCLUDING optincluding { $$ = $2; }
+ | /* EMPTY */ { $$ = NIL; }
+ ;
+
+index_including_params: index_elem { $$ = list_make1($1); }
+ | index_including_params ',' index_elem { $$ = lappend($1, $3); }
+ ;
+
opt_collate: COLLATE any_name { $$ = $2; }
| /*EMPTY*/ { $$ = NIL; }
;
diff --git a/src/backend/parser/parse_relation.c b/src/backend/parser/parse_relation.c
index 1e3ecbc..eff4f81 100644
--- a/src/backend/parser/parse_relation.c
+++ b/src/backend/parser/parse_relation.c
@@ -2875,7 +2875,7 @@ attnameAttNum(Relation rd, const char *attname, bool sysColOK)
{
int i;
- for (i = 0; i < rd->rd_rel->relnatts; i++)
+ for (i = 0; i < RelationGetNumberOfAttributes(rd); i++)
{
Form_pg_attribute att = rd->rd_att->attrs[i];
diff --git a/src/backend/parser/parse_target.c b/src/backend/parser/parse_target.c
index b7b82bf..cdfc262 100644
--- a/src/backend/parser/parse_target.c
+++ b/src/backend/parser/parse_target.c
@@ -898,7 +898,7 @@ checkInsertTargets(ParseState *pstate, List *cols, List **attrnos)
* Generate default column list for INSERT.
*/
Form_pg_attribute *attr = pstate->p_target_relation->rd_att->attrs;
- int numcol = pstate->p_target_relation->rd_rel->relnatts;
+ int numcol = RelationGetNumberOfAttributes(pstate->p_target_relation);
int i;
for (i = 0; i < numcol; i++)
diff --git a/src/backend/parser/parse_utilcmd.c b/src/backend/parser/parse_utilcmd.c
index 7a2950e..d8b52d8 100644
--- a/src/backend/parser/parse_utilcmd.c
+++ b/src/backend/parser/parse_utilcmd.c
@@ -1245,14 +1245,14 @@ generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx,
/* Build the list of IndexElem */
index->indexParams = NIL;
+ index->indexIncludingParams = NIL;
indexpr_item = list_head(indexprs);
- for (keyno = 0; keyno < idxrec->indnatts; keyno++)
+ for (keyno = 0; keyno < idxrec->indnkeyatts; keyno++)
{
IndexElem *iparam;
AttrNumber attnum = idxrec->indkey.values[keyno];
int16 opt = source_idx->rd_indoption[keyno];
-
iparam = makeNode(IndexElem);
if (AttributeNumberIsValid(attnum))
@@ -1334,6 +1334,38 @@ generateClonedIndexStmt(CreateStmtContext *cxt, Relation source_idx,
index->indexParams = lappend(index->indexParams, iparam);
}
+ /* Handle included columns separately */
+ for (keyno = idxrec->indnkeyatts; keyno < idxrec->indnatts; keyno++)
+ {
+ IndexElem *iparam;
+ AttrNumber attnum = idxrec->indkey.values[keyno];
+
+ iparam = makeNode(IndexElem);
+
+ if (AttributeNumberIsValid(attnum))
+ {
+ /* Simple index column */
+ char *attname;
+
+ attname = get_relid_attribute_name(indrelid, attnum);
+ keycoltype = get_atttype(indrelid, attnum);
+
+ iparam->name = attname;
+ iparam->expr = NULL;
+ }
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("expressions are not supported in included columns")));
+
+ /* Copy the original index column name */
+ iparam->indexcolname = pstrdup(NameStr(attrs[keyno]->attname));
+
+ /* Add the collation name, if non-default */
+ iparam->collation = get_collation(indcollation->values[keyno], keycoltype);
+
+ index->indexIncludingParams = lappend(index->indexIncludingParams, iparam);
+ }
/* Copy reloptions if any */
datum = SysCacheGetAttr(RELOID, ht_idxrel,
Anum_pg_class_reloptions, &isnull);
@@ -1526,6 +1558,7 @@ transformIndexConstraints(CreateStmtContext *cxt)
IndexStmt *priorindex = lfirst(k);
if (equal(index->indexParams, priorindex->indexParams) &&
+ equal(index->indexIncludingParams, priorindex->indexIncludingParams) &&
equal(index->whereClause, priorindex->whereClause) &&
equal(index->excludeOpNames, priorindex->excludeOpNames) &&
strcmp(index->accessMethod, priorindex->accessMethod) == 0 &&
@@ -1597,6 +1630,7 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt)
index->tableSpace = constraint->indexspace;
index->whereClause = constraint->where_clause;
index->indexParams = NIL;
+ index->indexIncludingParams = NIL;
index->excludeOpNames = NIL;
index->idxcomment = NULL;
index->indexOid = InvalidOid;
@@ -1746,24 +1780,30 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt)
heap_rel->rd_rel->relhasoids);
attname = pstrdup(NameStr(attform->attname));
- /*
- * Insist on default opclass and sort options. While the index
- * would still work as a constraint with non-default settings, it
- * might not provide exactly the same uniqueness semantics as
- * you'd get from a normally-created constraint; and there's also
- * the dump/reload problem mentioned above.
- */
- defopclass = GetDefaultOpClass(attform->atttypid,
- index_rel->rd_rel->relam);
- if (indclass->values[i] != defopclass ||
- index_rel->rd_indoption[i] != 0)
- ereport(ERROR,
- (errcode(ERRCODE_WRONG_OBJECT_TYPE),
- errmsg("index \"%s\" does not have default sorting behavior", index_name),
- errdetail("Cannot create a primary key or unique constraint using such an index."),
- parser_errposition(cxt->pstate, constraint->location)));
+ if (i < index_form->indnkeyatts)
+ {
+ /*
+ * Insist on default opclass and sort options. While the index
+ * would still work as a constraint with non-default settings, it
+ * might not provide exactly the same uniqueness semantics as
+ * you'd get from a normally-created constraint; and there's also
+ * the dump/reload problem mentioned above.
+ */
+ defopclass = GetDefaultOpClass(attform->atttypid,
+ index_rel->rd_rel->relam);
+ if (indclass->values[i] != defopclass ||
+ index_rel->rd_indoption[i] != 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("index \"%s\" does not have default sorting behavior", index_name),
+ errdetail("Cannot create a primary key or unique constraint using such an index."),
+ parser_errposition(cxt->pstate, constraint->location)));
+
+ constraint->keys = lappend(constraint->keys, makeString(attname));
+ }
+ else
+ constraint->including = lappend(constraint->including, makeString(attname));
- constraint->keys = lappend(constraint->keys, makeString(attname));
}
/* Close the index relation but keep the lock */
@@ -1776,6 +1816,7 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt)
* If it's an EXCLUDE constraint, the grammar returns a list of pairs of
* IndexElems and operator names. We have to break that apart into
* separate lists.
+ * Note that exclusion constraints do not support included (non-key) attributes.
*/
if (constraint->contype == CONSTR_EXCLUSION)
{
@@ -1930,6 +1971,48 @@ transformIndexConstraint(Constraint *constraint, CreateStmtContext *cxt)
index->indexParams = lappend(index->indexParams, iparam);
}
+ /* Here is some ugly code duplication. But we do need it. */
+ foreach(lc, constraint->including)
+ {
+ char *key = strVal(lfirst(lc));
+ bool found = false;
+ ColumnDef *column = NULL;
+ ListCell *columns;
+ IndexElem *iparam;
+
+ foreach(columns, cxt->columns)
+ {
+ column = (ColumnDef *) lfirst(columns);
+ Assert(IsA(column, ColumnDef));
+ if (strcmp(column->colname, key) == 0)
+ {
+ found = true;
+ break;
+ }
+ }
+
+ /*
+ * In the ALTER TABLE case, don't complain about index keys not
+ * created in the command; they may well exist already. DefineIndex
+ * will complain about them if not, and will also take care of marking
+ * them NOT NULL.
+ */
+ if (!found && !cxt->isalter)
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_COLUMN),
+ errmsg("column \"%s\" named in key does not exist", key),
+ parser_errposition(cxt->pstate, constraint->location)));
+
+ /* OK, add it to the index definition */
+ iparam = makeNode(IndexElem);
+ iparam->name = pstrdup(key);
+ iparam->expr = NULL;
+ iparam->indexcolname = NULL;
+ iparam->collation = NIL;
+ iparam->opclass = NIL;
+ index->indexIncludingParams = lappend(index->indexIncludingParams, iparam);
+ }
+
return index;
}
diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c
index 8a81d7a..b946c58 100644
--- a/src/backend/utils/adt/ruleutils.c
+++ b/src/backend/utils/adt/ruleutils.c
@@ -1240,6 +1240,21 @@ pg_get_indexdef_worker(Oid indexrelid, int colno,
Oid keycoltype;
Oid keycolcollation;
+ /*
+ * attrsOnly flag is used for building unique-constraint and
+ * exclusion-constraint error messages. Included attrs are
+ * meaningless there, so do not include them into the message.
+ */
+ if (attrsOnly && keyno >= idxrec->indnkeyatts)
+ break;
+
+ /* Report the INCLUDED attributes, if any. */
+ if ((!attrsOnly) && keyno == idxrec->indnkeyatts)
+ {
+ appendStringInfoString(&buf, ") INCLUDING (");
+ sep = "";
+ }
+
if (!colno)
appendStringInfoString(&buf, sep);
sep = ", ";
@@ -1292,6 +1307,9 @@ pg_get_indexdef_worker(Oid indexrelid, int colno,
appendStringInfo(&buf, " COLLATE %s",
generate_collation_name((indcoll)));
+ if(keyno >= idxrec->indnkeyatts)
+ continue;
+
/* Add the operator class name, if not default */
get_opclass_name(indclass->values[keyno], keycoltype, &buf);
@@ -1638,6 +1656,19 @@ pg_get_constraintdef_worker(Oid constraintId, bool fullCommand,
appendStringInfoChar(&buf, ')');
+ /* Fetch and build including column list */
+ isnull = true;
+ val = SysCacheGetAttr(CONSTROID, tup,
+ Anum_pg_constraint_conincluding, &isnull);
+ if (!isnull)
+ {
+ appendStringInfoString(&buf, " INCLUDING (");
+
+ decompile_column_index_array(val, conForm->conrelid, &buf);
+
+ appendStringInfoChar(&buf, ')');
+ }
+
indexId = get_constraint_index(constraintId);
/* XXX why do we only print these bits if fullCommand? */
diff --git a/src/backend/utils/adt/selfuncs.c b/src/backend/utils/adt/selfuncs.c
index 56943f2..90d3f55 100644
--- a/src/backend/utils/adt/selfuncs.c
+++ b/src/backend/utils/adt/selfuncs.c
@@ -4520,7 +4520,7 @@ examine_variable(PlannerInfo *root, Node *node, int varRelid,
* should match has_unique_index().
*/
if (index->unique &&
- index->ncolumns == 1 &&
+ index->nkeycolumns == 1 &&
(index->indpred == NIL || index->predOK))
vardata->isunique = true;
@@ -6567,7 +6567,7 @@ btcostestimate(PlannerInfo *root, IndexPath *path, double loop_count,
* NullTest invalidates that theory, even though it sets eqQualHere.
*/
if (index->unique &&
- indexcol == index->ncolumns - 1 &&
+ indexcol == index->nkeycolumns - 1 &&
eqQualHere &&
!found_saop &&
!found_is_null_op)
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index 79e0b1f..7c4ffbf 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -521,7 +521,7 @@ RelationBuildTupleDesc(Relation relation)
/*
* add attribute data to relation->rd_att
*/
- need = relation->rd_rel->relnatts;
+ need = RelationGetNumberOfAttributes(relation);
while (HeapTupleIsValid(pg_attribute_tuple = systable_getnext(pg_attribute_scan)))
{
@@ -530,7 +530,7 @@ RelationBuildTupleDesc(Relation relation)
attp = (Form_pg_attribute) GETSTRUCT(pg_attribute_tuple);
if (attp->attnum <= 0 ||
- attp->attnum > relation->rd_rel->relnatts)
+ attp->attnum > RelationGetNumberOfAttributes(relation))
elog(ERROR, "invalid attribute number %d for %s",
attp->attnum, RelationGetRelationName(relation));
@@ -547,7 +547,7 @@ RelationBuildTupleDesc(Relation relation)
if (attrdef == NULL)
attrdef = (AttrDefault *)
MemoryContextAllocZero(CacheMemoryContext,
- relation->rd_rel->relnatts *
+ RelationGetNumberOfAttributes(relation) *
sizeof(AttrDefault));
attrdef[ndef].adnum = attp->attnum;
attrdef[ndef].adbin = NULL;
@@ -577,7 +577,7 @@ RelationBuildTupleDesc(Relation relation)
{
int i;
- for (i = 0; i < relation->rd_rel->relnatts; i++)
+ for (i = 0; i < RelationGetNumberOfAttributes(relation); i++)
Assert(relation->rd_att->attrs[i]->attcacheoff == -1);
}
#endif
@@ -587,7 +587,7 @@ RelationBuildTupleDesc(Relation relation)
* attribute: it must be zero. This eliminates the need for special cases
* for attnum=1 that used to exist in fastgetattr() and index_getattr().
*/
- if (relation->rd_rel->relnatts > 0)
+ if (RelationGetNumberOfAttributes(relation) > 0)
relation->rd_att->attrs[0]->attcacheoff = 0;
/*
@@ -599,7 +599,7 @@ RelationBuildTupleDesc(Relation relation)
if (ndef > 0) /* DEFAULTs */
{
- if (ndef < relation->rd_rel->relnatts)
+ if (ndef < RelationGetNumberOfAttributes(relation))
constr->defval = (AttrDefault *)
repalloc(attrdef, ndef * sizeof(AttrDefault));
else
@@ -1206,7 +1206,8 @@ RelationInitIndexAccessInfo(Relation relation)
int2vector *indoption;
MemoryContext indexcxt;
MemoryContext oldcontext;
- int natts;
+ int indnatts;
+ int indnkeyatts;
uint16 amsupport;
/*
@@ -1236,10 +1237,11 @@ RelationInitIndexAccessInfo(Relation relation)
relation->rd_amhandler = aform->amhandler;
ReleaseSysCache(tuple);
- natts = relation->rd_rel->relnatts;
- if (natts != relation->rd_index->indnatts)
+ indnatts = RelationGetNumberOfAttributes(relation);
+ if (indnatts != IndexRelationGetNumberOfAttributes(relation))
elog(ERROR, "relnatts disagrees with indnatts for index %u",
RelationGetRelid(relation));
+ indnkeyatts = IndexRelationGetNumberOfKeyAttributes(relation);
/*
* Make the private context to hold index access info. The reason we need
@@ -1257,17 +1259,19 @@ RelationInitIndexAccessInfo(Relation relation)
InitIndexAmRoutine(relation);
/*
- * Allocate arrays to hold data
+ * Allocate arrays to hold data.
+ * Opclasses are not used for included columns, so
+ * allocate them for indnkeyatts only.
*/
relation->rd_opfamily = (Oid *)
- MemoryContextAllocZero(indexcxt, natts * sizeof(Oid));
+ MemoryContextAllocZero(indexcxt, indnkeyatts * sizeof(Oid));
relation->rd_opcintype = (Oid *)
- MemoryContextAllocZero(indexcxt, natts * sizeof(Oid));
+ MemoryContextAllocZero(indexcxt, indnkeyatts * sizeof(Oid));
amsupport = relation->rd_amroutine->amsupport;
if (amsupport > 0)
{
- int nsupport = natts * amsupport;
+ int nsupport = indnatts * amsupport;
relation->rd_support = (RegProcedure *)
MemoryContextAllocZero(indexcxt, nsupport * sizeof(RegProcedure));
@@ -1281,10 +1285,10 @@ RelationInitIndexAccessInfo(Relation relation)
}
relation->rd_indcollation = (Oid *)
- MemoryContextAllocZero(indexcxt, natts * sizeof(Oid));
+ MemoryContextAllocZero(indexcxt, indnatts * sizeof(Oid));
relation->rd_indoption = (int16 *)
- MemoryContextAllocZero(indexcxt, natts * sizeof(int16));
+ MemoryContextAllocZero(indexcxt, indnatts * sizeof(int16));
/*
* indcollation cannot be referenced directly through the C struct,
@@ -1297,7 +1301,7 @@ RelationInitIndexAccessInfo(Relation relation)
&isnull);
Assert(!isnull);
indcoll = (oidvector *) DatumGetPointer(indcollDatum);
- memcpy(relation->rd_indcollation, indcoll->values, natts * sizeof(Oid));
+ memcpy(relation->rd_indcollation, indcoll->values, indnatts * sizeof(Oid));
/*
* indclass cannot be referenced directly through the C struct, because it
@@ -1318,7 +1322,7 @@ RelationInitIndexAccessInfo(Relation relation)
*/
IndexSupportInitialize(indclass, relation->rd_support,
relation->rd_opfamily, relation->rd_opcintype,
- amsupport, natts);
+ amsupport, indnkeyatts);
/*
* Similarly extract indoption and copy it to the cache entry
@@ -1329,7 +1333,7 @@ RelationInitIndexAccessInfo(Relation relation)
&isnull);
Assert(!isnull);
indoption = (int2vector *) DatumGetPointer(indoptionDatum);
- memcpy(relation->rd_indoption, indoption->values, natts * sizeof(int16));
+ memcpy(relation->rd_indoption, indoption->values, indnatts * sizeof(int16));
/*
* expressions, predicate, exclusion caches will be filled later
@@ -4458,16 +4462,25 @@ RelationGetIndexAttrBitmap(Relation relation, IndexAttrBitmapKind attrKind)
{
int attrnum = indexInfo->ii_KeyAttrNumbers[i];
+ /*
+ * Covering indexes may contain non-key (included) columns,
+ * which must be handled carefully here.  Non-key columns
+ * are added to indexattrs because they are stored in the
+ * index, so a HOT update must not overlook them.
+ * However, non-key columns cannot be referenced by a
+ * foreign key or an identity key, so we do not add them
+ * to the uindexattrs and idindexattrs bitmaps.
+ */
if (attrnum != 0)
{
indexattrs = bms_add_member(indexattrs,
attrnum - FirstLowInvalidHeapAttributeNumber);
- if (isKey)
+ if (isKey && i < indexInfo->ii_NumIndexKeyAttrs)
uindexattrs = bms_add_member(uindexattrs,
attrnum - FirstLowInvalidHeapAttributeNumber);
- if (isIDKey)
+ if (isIDKey && i < indexInfo->ii_NumIndexKeyAttrs)
idindexattrs = bms_add_member(idindexattrs,
attrnum - FirstLowInvalidHeapAttributeNumber);
}
@@ -4535,7 +4548,7 @@ RelationGetExclusionInfo(Relation indexRelation,
Oid **procs,
uint16 **strategies)
{
- int ncols = indexRelation->rd_rel->relnatts;
+ int indnkeyatts;
Oid *ops;
Oid *funcs;
uint16 *strats;
@@ -4547,17 +4560,19 @@ RelationGetExclusionInfo(Relation indexRelation,
MemoryContext oldcxt;
int i;
+ indnkeyatts = IndexRelationGetNumberOfKeyAttributes(indexRelation);
+
/* Allocate result space in caller context */
- *operators = ops = (Oid *) palloc(sizeof(Oid) * ncols);
- *procs = funcs = (Oid *) palloc(sizeof(Oid) * ncols);
- *strategies = strats = (uint16 *) palloc(sizeof(uint16) * ncols);
+ *operators = ops = (Oid *) palloc(sizeof(Oid) * indnkeyatts);
+ *procs = funcs = (Oid *) palloc(sizeof(Oid) * indnkeyatts);
+ *strategies = strats = (uint16 *) palloc(sizeof(uint16) * indnkeyatts);
/* Quick exit if we have the data cached already */
if (indexRelation->rd_exclstrats != NULL)
{
- memcpy(ops, indexRelation->rd_exclops, sizeof(Oid) * ncols);
- memcpy(funcs, indexRelation->rd_exclprocs, sizeof(Oid) * ncols);
- memcpy(strats, indexRelation->rd_exclstrats, sizeof(uint16) * ncols);
+ memcpy(ops, indexRelation->rd_exclops, sizeof(Oid) * indnkeyatts);
+ memcpy(funcs, indexRelation->rd_exclprocs, sizeof(Oid) * indnkeyatts);
+ memcpy(strats, indexRelation->rd_exclstrats, sizeof(uint16) * indnkeyatts);
return;
}
@@ -4606,12 +4621,12 @@ RelationGetExclusionInfo(Relation indexRelation,
arr = DatumGetArrayTypeP(val); /* ensure not toasted */
nelem = ARR_DIMS(arr)[0];
if (ARR_NDIM(arr) != 1 ||
- nelem != ncols ||
+ nelem != indnkeyatts ||
ARR_HASNULL(arr) ||
ARR_ELEMTYPE(arr) != OIDOID)
elog(ERROR, "conexclop is not a 1-D Oid array");
- memcpy(ops, ARR_DATA_PTR(arr), sizeof(Oid) * ncols);
+ memcpy(ops, ARR_DATA_PTR(arr), sizeof(Oid) * indnkeyatts);
}
systable_endscan(conscan);
@@ -4622,7 +4637,7 @@ RelationGetExclusionInfo(Relation indexRelation,
RelationGetRelationName(indexRelation));
/* We need the func OIDs and strategy numbers too */
- for (i = 0; i < ncols; i++)
+ for (i = 0; i < indnkeyatts; i++)
{
funcs[i] = get_opcode(ops[i]);
strats[i] = get_op_opfamily_strategy(ops[i],
@@ -4635,12 +4650,12 @@ RelationGetExclusionInfo(Relation indexRelation,
/* Save a copy of the results in the relcache entry. */
oldcxt = MemoryContextSwitchTo(indexRelation->rd_indexcxt);
- indexRelation->rd_exclops = (Oid *) palloc(sizeof(Oid) * ncols);
- indexRelation->rd_exclprocs = (Oid *) palloc(sizeof(Oid) * ncols);
- indexRelation->rd_exclstrats = (uint16 *) palloc(sizeof(uint16) * ncols);
- memcpy(indexRelation->rd_exclops, ops, sizeof(Oid) * ncols);
- memcpy(indexRelation->rd_exclprocs, funcs, sizeof(Oid) * ncols);
- memcpy(indexRelation->rd_exclstrats, strats, sizeof(uint16) * ncols);
+ indexRelation->rd_exclops = (Oid *) palloc(sizeof(Oid) * indnkeyatts);
+ indexRelation->rd_exclprocs = (Oid *) palloc(sizeof(Oid) * indnkeyatts);
+ indexRelation->rd_exclstrats = (uint16 *) palloc(sizeof(uint16) * indnkeyatts);
+ memcpy(indexRelation->rd_exclops, ops, sizeof(Oid) * indnkeyatts);
+ memcpy(indexRelation->rd_exclprocs, funcs, sizeof(Oid) * indnkeyatts);
+ memcpy(indexRelation->rd_exclstrats, strats, sizeof(uint16) * indnkeyatts);
MemoryContextSwitchTo(oldcxt);
}
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index aa8e0e4..e59c296 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -821,7 +821,7 @@ tuplesort_begin_cluster(TupleDesc tupDesc,
workMem, randomAccess ? 't' : 'f');
#endif
- state->nKeys = RelationGetNumberOfAttributes(indexRel);
+ state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel);
TRACE_POSTGRESQL_SORT_START(CLUSTER_SORT,
false, /* no unique check */
@@ -913,7 +913,7 @@ tuplesort_begin_index_btree(Relation heapRel,
workMem, randomAccess ? 't' : 'f');
#endif
- state->nKeys = RelationGetNumberOfAttributes(indexRel);
+ state->nKeys = IndexRelationGetNumberOfKeyAttributes(indexRel);
TRACE_POSTGRESQL_SORT_START(INDEX_SORT,
enforceUnique,
@@ -933,7 +933,6 @@ tuplesort_begin_index_btree(Relation heapRel,
state->enforceUnique = enforceUnique;
indexScanKey = _bt_mkscankey_nodata(indexRel);
- state->nKeys = RelationGetNumberOfAttributes(indexRel);
/* Prepare SortSupport data for each column */
state->sortKeys = (SortSupport) palloc0(state->nKeys *
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index a5c2d09..af179cf 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -6130,7 +6130,8 @@ getIndexes(Archive *fout, TableInfo tblinfo[], int numTables)
i_oid,
i_indexname,
i_indexdef,
- i_indnkeys,
+ i_indnnkeyatts,
+ i_indnatts,
i_indkey,
i_indisclustered,
i_indisreplident,
@@ -6181,7 +6182,42 @@ getIndexes(Archive *fout, TableInfo tblinfo[], int numTables)
* is not.
*/
resetPQExpBuffer(query);
- if (fout->remoteVersion >= 90400)
+ if (fout->remoteVersion >= 100000)
+ {
+ /*
+ * INCLUDING columns are new in version 10 and need extra
+ * fields.  pg_index.indnkeyatts does not exist in 9.6 or
+ * earlier, so only select it here; also use i.indnatts
+ * rather than t.relnatts as the total attribute count.
+ *
+ */
+ appendPQExpBuffer(query,
+ "SELECT t.tableoid, t.oid, "
+ "t.relname AS indexname, "
+ "pg_catalog.pg_get_indexdef(i.indexrelid) AS indexdef, "
+ "i.indnkeyatts AS indnkeyatts, "
+ "i.indnatts AS indnatts, "
+ "i.indkey, i.indisclustered, "
+ "i.indisreplident, t.relpages, "
+ "c.contype, c.conname, "
+ "c.condeferrable, c.condeferred, "
+ "c.tableoid AS contableoid, "
+ "c.oid AS conoid, "
+ "pg_catalog.pg_get_constraintdef(c.oid, false) AS condef, "
+ "(SELECT spcname FROM pg_catalog.pg_tablespace s WHERE s.oid = t.reltablespace) AS tablespace, "
+ "t.reloptions AS indreloptions "
+ "FROM pg_catalog.pg_index i "
+ "JOIN pg_catalog.pg_class t ON (t.oid = i.indexrelid) "
+ "LEFT JOIN pg_catalog.pg_constraint c "
+ "ON (i.indrelid = c.conrelid AND "
+ "i.indexrelid = c.conindid AND "
+ "c.contype IN ('p','u','x')) "
+ "WHERE i.indrelid = '%u'::pg_catalog.oid "
+ "AND i.indisvalid AND i.indisready "
+ "ORDER BY indexname",
+ tbinfo->dobj.catId.oid);
+ }
+ else if (fout->remoteVersion >= 90400)
{
/*
* the test on indisready is necessary in 9.2, and harmless in
@@ -6191,6 +6227,8 @@ getIndexes(Archive *fout, TableInfo tblinfo[], int numTables)
"SELECT t.tableoid, t.oid, "
"t.relname AS indexname, "
"pg_catalog.pg_get_indexdef(i.indexrelid) AS indexdef, "
+ "t.relnatts AS indnkeyatts, "
+ "t.relnatts AS indnatts, "
"t.relnatts AS indnkeys, "
"i.indkey, i.indisclustered, "
"i.indisreplident, t.relpages, "
@@ -6222,6 +6260,8 @@ getIndexes(Archive *fout, TableInfo tblinfo[], int numTables)
"SELECT t.tableoid, t.oid, "
"t.relname AS indexname, "
"pg_catalog.pg_get_indexdef(i.indexrelid) AS indexdef, "
+ "t.relnatts AS indnkeyatts, "
+ "t.relnatts AS indnatts, "
"t.relnatts AS indnkeys, "
"i.indkey, i.indisclustered, "
"false AS indisreplident, t.relpages, "
@@ -6249,6 +6289,8 @@ getIndexes(Archive *fout, TableInfo tblinfo[], int numTables)
"SELECT t.tableoid, t.oid, "
"t.relname AS indexname, "
"pg_catalog.pg_get_indexdef(i.indexrelid) AS indexdef, "
+ "t.relnatts AS indnkeyatts, "
+ "t.relnatts AS indnatts, "
"t.relnatts AS indnkeys, "
"i.indkey, i.indisclustered, "
"false AS indisreplident, t.relpages, "
@@ -6279,6 +6321,8 @@ getIndexes(Archive *fout, TableInfo tblinfo[], int numTables)
"SELECT t.tableoid, t.oid, "
"t.relname AS indexname, "
"pg_catalog.pg_get_indexdef(i.indexrelid) AS indexdef, "
+ "t.relnatts AS indnkeyatts, "
+ "t.relnatts AS indnatts, "
"t.relnatts AS indnkeys, "
"i.indkey, i.indisclustered, "
"false AS indisreplident, t.relpages, "
@@ -6308,6 +6352,8 @@ getIndexes(Archive *fout, TableInfo tblinfo[], int numTables)
"SELECT t.tableoid, t.oid, "
"t.relname AS indexname, "
"pg_catalog.pg_get_indexdef(i.indexrelid) AS indexdef, "
+ "t.relnatts AS indnkeyatts, "
+ "t.relnatts AS indnatts, "
"t.relnatts AS indnkeys, "
"i.indkey, i.indisclustered, "
"false AS indisreplident, t.relpages, "
@@ -6337,6 +6383,8 @@ getIndexes(Archive *fout, TableInfo tblinfo[], int numTables)
"SELECT t.tableoid, t.oid, "
"t.relname AS indexname, "
"pg_get_indexdef(i.indexrelid) AS indexdef, "
+ "t.relnatts AS indnkeyatts, "
+ "t.relnatts AS indnatts, "
"t.relnatts AS indnkeys, "
"i.indkey, false AS indisclustered, "
"false AS indisreplident, t.relpages, "
@@ -6364,6 +6412,8 @@ getIndexes(Archive *fout, TableInfo tblinfo[], int numTables)
"t.oid, "
"t.relname AS indexname, "
"pg_get_indexdef(i.indexrelid) AS indexdef, "
+ "t.relnatts AS indnkeyatts, "
+ "t.relnatts AS indnatts, "
"t.relnatts AS indnkeys, "
"i.indkey, false AS indisclustered, "
"false AS indisreplident, t.relpages, "
@@ -6392,7 +6442,8 @@ getIndexes(Archive *fout, TableInfo tblinfo[], int numTables)
i_oid = PQfnumber(res, "oid");
i_indexname = PQfnumber(res, "indexname");
i_indexdef = PQfnumber(res, "indexdef");
- i_indnkeys = PQfnumber(res, "indnkeys");
+ i_indnnkeyatts = PQfnumber(res, "indnkeyatts");
+ i_indnatts = PQfnumber(res, "indnatts");
i_indkey = PQfnumber(res, "indkey");
i_indisclustered = PQfnumber(res, "indisclustered");
i_indisreplident = PQfnumber(res, "indisreplident");
@@ -6422,7 +6473,8 @@ getIndexes(Archive *fout, TableInfo tblinfo[], int numTables)
indxinfo[j].dobj.namespace = tbinfo->dobj.namespace;
indxinfo[j].indextable = tbinfo;
indxinfo[j].indexdef = pg_strdup(PQgetvalue(res, j, i_indexdef));
- indxinfo[j].indnkeys = atoi(PQgetvalue(res, j, i_indnkeys));
+ indxinfo[j].indnkeyattrs = atoi(PQgetvalue(res, j, i_indnnkeyatts));
+ indxinfo[j].indnattrs = atoi(PQgetvalue(res, j, i_indnatts));
indxinfo[j].tablespace = pg_strdup(PQgetvalue(res, j, i_tablespace));
indxinfo[j].indreloptions = pg_strdup(PQgetvalue(res, j, i_indreloptions));
@@ -16039,7 +16091,7 @@ dumpConstraint(Archive *fout, ConstraintInfo *coninfo)
{
appendPQExpBuffer(q, "%s (",
coninfo->contype == 'p' ? "PRIMARY KEY" : "UNIQUE");
- for (k = 0; k < indxinfo->indnkeys; k++)
+ for (k = 0; k < indxinfo->indnkeyattrs; k++)
{
int indkey = (int) indxinfo->indkeys[k];
const char *attname;
@@ -16053,6 +16105,23 @@ dumpConstraint(Archive *fout, ConstraintInfo *coninfo)
fmtId(attname));
}
+ if (indxinfo->indnkeyattrs < indxinfo->indnattrs)
+ appendPQExpBuffer(q, ") INCLUDING (");
+
+ for (k = indxinfo->indnkeyattrs; k < indxinfo->indnattrs; k++)
+ {
+ int indkey = (int) indxinfo->indkeys[k];
+ const char *attname;
+
+ if (indkey == InvalidAttrNumber)
+ break;
+ attname = getAttrName(indkey, tbinfo);
+
+ appendPQExpBuffer(q, "%s%s",
+ (k == indxinfo->indnkeyattrs) ? "" : ", ",
+ fmtId(attname));
+ }
+
appendPQExpBufferChar(q, ')');
if (nonemptyReloptions(indxinfo->indreloptions))
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index 2bfa2d9..268ae1e 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -345,8 +345,10 @@ typedef struct _indxInfo
char *indexdef;
char *tablespace; /* tablespace in which index is stored */
char *indreloptions; /* options specified by WITH (...) */
- int indnkeys;
- Oid *indkeys;
+ int indnkeyattrs; /* number of index key attributes */
+ int indnattrs; /* total number of index attributes */
+ Oid *indkeys; /* despite the name 'indkeys', this field
+ * holds both key and non-key attributes */
bool indisclustered;
bool indisreplident;
/* if there is an associated constraint object, its dumpId: */
diff --git a/src/include/access/amapi.h b/src/include/access/amapi.h
index 1036cca..517bb3c 100644
--- a/src/include/access/amapi.h
+++ b/src/include/access/amapi.h
@@ -175,6 +175,8 @@ typedef struct IndexAmRoutine
bool amclusterable;
/* does AM handle predicate locks? */
bool ampredlocks;
+ /* does AM support columns added with the INCLUDING clause? */
+ bool amcaninclude;
/* type of data stored in index, or InvalidOid if variable */
Oid amkeytype;
diff --git a/src/include/access/itup.h b/src/include/access/itup.h
index 8350fa0..b5424c3 100644
--- a/src/include/access/itup.h
+++ b/src/include/access/itup.h
@@ -18,6 +18,7 @@
#include "access/tupmacs.h"
#include "storage/bufpage.h"
#include "storage/itemptr.h"
+#include "utils/rel.h"
/*
* Index tuple header structure
@@ -147,5 +148,6 @@ extern Datum nocache_index_getattr(IndexTuple tup, int attnum,
extern void index_deform_tuple(IndexTuple tup, TupleDesc tupleDescriptor,
Datum *values, bool *isnull);
extern IndexTuple CopyIndexTuple(IndexTuple source);
+extern IndexTuple index_truncate_tuple(Relation idxrel, IndexTuple olditup);
#endif /* ITUP_H */
diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h
index c580f51..e9c3fe7 100644
--- a/src/include/access/nbtree.h
+++ b/src/include/access/nbtree.h
@@ -684,7 +684,8 @@ extern bool _bt_doinsert(Relation rel, IndexTuple itup,
IndexUniqueCheck checkUnique, Relation heapRel);
extern Buffer _bt_getstackbuf(Relation rel, BTStack stack, int access);
extern void _bt_finish_split(Relation rel, Buffer bbuf, BTStack stack);
-
+extern bool _bt_pgaddtup(Page page, Size itemsize, IndexTuple itup,
+ OffsetNumber itup_off);
/*
* prototypes for functions in nbtpage.c
*/
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index c04edad..6d40e4b 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 201608231
+#define CATALOG_VERSION_NO 201609211
#endif
diff --git a/src/include/catalog/pg_constraint.h b/src/include/catalog/pg_constraint.h
index 666b230..bff2fd1 100644
--- a/src/include/catalog/pg_constraint.h
+++ b/src/include/catalog/pg_constraint.h
@@ -99,6 +99,12 @@ CATALOG(pg_constraint,2606)
int16 conkey[1];
/*
+ * Columns of conrelid that the constraint does not apply to,
+ * but that are included in the same index as the key columns.
+ */
+ int16 conincluding[1];
+
+ /*
* If a foreign key, the referenced columns of confrelid
*/
int16 confkey[1];
@@ -150,7 +156,7 @@ typedef FormData_pg_constraint *Form_pg_constraint;
* compiler constants for pg_constraint
* ----------------
*/
-#define Natts_pg_constraint 24
+#define Natts_pg_constraint 25
#define Anum_pg_constraint_conname 1
#define Anum_pg_constraint_connamespace 2
#define Anum_pg_constraint_contype 3
@@ -168,13 +174,14 @@ typedef FormData_pg_constraint *Form_pg_constraint;
#define Anum_pg_constraint_coninhcount 15
#define Anum_pg_constraint_connoinherit 16
#define Anum_pg_constraint_conkey 17
-#define Anum_pg_constraint_confkey 18
-#define Anum_pg_constraint_conpfeqop 19
-#define Anum_pg_constraint_conppeqop 20
-#define Anum_pg_constraint_conffeqop 21
-#define Anum_pg_constraint_conexclop 22
-#define Anum_pg_constraint_conbin 23
-#define Anum_pg_constraint_consrc 24
+#define Anum_pg_constraint_conincluding 18
+#define Anum_pg_constraint_confkey 19
+#define Anum_pg_constraint_conpfeqop 20
+#define Anum_pg_constraint_conppeqop 21
+#define Anum_pg_constraint_conffeqop 22
+#define Anum_pg_constraint_conexclop 23
+#define Anum_pg_constraint_conbin 24
+#define Anum_pg_constraint_consrc 25
/* ----------------
* initial contents of pg_constraint
diff --git a/src/include/catalog/pg_constraint_fn.h b/src/include/catalog/pg_constraint_fn.h
index 1f11174..72f4502 100644
--- a/src/include/catalog/pg_constraint_fn.h
+++ b/src/include/catalog/pg_constraint_fn.h
@@ -27,30 +27,31 @@ typedef enum ConstraintCategory
CONSTRAINT_ASSERTION /* for future expansion */
} ConstraintCategory;
extern Oid CreateConstraintEntry(const char *constraintName,
Oid constraintNamespace,
char constraintType,
bool isDeferrable,
bool isDeferred,
bool isValidated,
Oid relId,
const int16 *constraintKey,
int constraintNKeys,
+ int constraintNTotalKeys,
Oid domainId,
Oid indexRelId,
Oid foreignRelId,
const int16 *foreignKey,
const Oid *pfEqOp,
const Oid *ppEqOp,
const Oid *ffEqOp,
int foreignNKeys,
char foreignUpdateType,
char foreignDeleteType,
char foreignMatchType,
const Oid *exclOp,
Node *conExpr,
const char *conBin,
const char *conSrc,
bool conIsLocal,
int conInhCount,
bool conNoInherit,
diff --git a/src/include/catalog/pg_index.h b/src/include/catalog/pg_index.h
index ee97c5d..fcbd18a 100644
--- a/src/include/catalog/pg_index.h
+++ b/src/include/catalog/pg_index.h
@@ -32,7 +32,8 @@ CATALOG(pg_index,2610) BKI_WITHOUT_OIDS BKI_SCHEMA_MACRO
{
Oid indexrelid; /* OID of the index */
Oid indrelid; /* OID of the relation it indexes */
- int16 indnatts; /* number of columns in index */
+ int16 indnatts; /* total number of columns in index */
+ int16 indnkeyatts; /* number of key columns in index */
bool indisunique; /* is this a unique index? */
bool indisprimary; /* is this index for primary key? */
bool indisexclusion; /* is this index for exclusion constraint? */
@@ -70,26 +71,27 @@ typedef FormData_pg_index *Form_pg_index;
* compiler constants for pg_index
* ----------------
*/
-#define Natts_pg_index 19
+#define Natts_pg_index 20
#define Anum_pg_index_indexrelid 1
#define Anum_pg_index_indrelid 2
#define Anum_pg_index_indnatts 3
-#define Anum_pg_index_indisunique 4
-#define Anum_pg_index_indisprimary 5
-#define Anum_pg_index_indisexclusion 6
-#define Anum_pg_index_indimmediate 7
-#define Anum_pg_index_indisclustered 8
-#define Anum_pg_index_indisvalid 9
-#define Anum_pg_index_indcheckxmin 10
-#define Anum_pg_index_indisready 11
-#define Anum_pg_index_indislive 12
-#define Anum_pg_index_indisreplident 13
-#define Anum_pg_index_indkey 14
-#define Anum_pg_index_indcollation 15
-#define Anum_pg_index_indclass 16
-#define Anum_pg_index_indoption 17
-#define Anum_pg_index_indexprs 18
-#define Anum_pg_index_indpred 19
+#define Anum_pg_index_indnkeyatts 4
+#define Anum_pg_index_indisunique 5
+#define Anum_pg_index_indisprimary 6
+#define Anum_pg_index_indisexclusion 7
+#define Anum_pg_index_indimmediate 8
+#define Anum_pg_index_indisclustered 9
+#define Anum_pg_index_indisvalid 10
+#define Anum_pg_index_indcheckxmin 11
+#define Anum_pg_index_indisready 12
+#define Anum_pg_index_indislive 13
+#define Anum_pg_index_indisreplident 14
+#define Anum_pg_index_indkey 15
+#define Anum_pg_index_indcollation 16
+#define Anum_pg_index_indclass 17
+#define Anum_pg_index_indoption 18
+#define Anum_pg_index_indexprs 19
+#define Anum_pg_index_indpred 20
/*
* Index AMs that support ordered scans must support these two indoption
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index e28477d..0f91e6e 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -33,9 +33,11 @@
* entries for a particular index. Used for both index_build and
* retail creation of index entries.
*
- * NumIndexAttrs number of columns in this index
+ * NumIndexAttrs total number of columns in this index
+ * NumIndexKeyAttrs number of key columns in index
* KeyAttrNumbers underlying-rel attribute numbers used as keys
- * (zeroes indicate expressions)
+ * (zeroes indicate expressions). It also contains
+ * info about included columns.
* Expressions expr trees for expression entries, or NIL if none
* ExpressionsState exec state for expressions, or NIL if none
* Predicate partial-index predicate, or NIL if none
@@ -58,7 +60,8 @@
typedef struct IndexInfo
{
NodeTag type;
- int ii_NumIndexAttrs;
+ int ii_NumIndexAttrs; /* total number of columns in index */
+ int ii_NumIndexKeyAttrs; /* number of key columns in index */
AttrNumber ii_KeyAttrNumbers[INDEX_MAX_KEYS];
List *ii_Expressions; /* list of Expr */
List *ii_ExpressionsState; /* list of ExprState */
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 3716c2e..1ff07fe 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -1837,7 +1837,8 @@ typedef struct Constraint
char *cooked_expr; /* expr, as nodeToString representation */
/* Fields used for unique constraints (UNIQUE and PRIMARY KEY): */
- List *keys; /* String nodes naming referenced column(s) */
+ List *keys; /* String nodes naming referenced key column(s) */
+ List *including; /* String nodes naming referenced nonkey column(s) */
/* Fields used for EXCLUSION constraints: */
List *exclusions; /* list of (IndexElem, operator name) pairs */
@@ -2441,6 +2442,8 @@ typedef struct IndexStmt
char *accessMethod; /* name of access method (eg. btree) */
char *tableSpace; /* tablespace, or NULL for default */
List *indexParams; /* columns to index: a list of IndexElem */
+ List *indexIncludingParams; /* additional columns to index:
+ * a list of IndexElem */
List *options; /* WITH clause options: a list of DefElem */
Node *whereClause; /* qualification (partial-index predicate) */
List *excludeOpNames; /* exclusion operator names, or NIL if none */
diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h
index 2709cc7..6839638 100644
--- a/src/include/nodes/relation.h
+++ b/src/include/nodes/relation.h
@@ -548,11 +548,12 @@ typedef struct RelOptInfo
* IndexOptInfo
* Per-index information for planning/optimization
*
- * indexkeys[], indexcollations[], opfamily[], and opcintype[]
- * each have ncolumns entries.
+ * indexkeys[] and indexcollations[] each have ncolumns entries.
+ * opfamily[] and opcintype[] each have nkeycolumns entries; they do
+ * not contain any information about included attributes.
*
- * sortopfamily[], reverse_sort[], and nulls_first[] likewise have
- * ncolumns entries, if the index is ordered; but if it is unordered,
+ * sortopfamily[], reverse_sort[], and nulls_first[] have
+ * nkeycolumns entries, if the index is ordered; but if it is unordered,
* those pointers are NULL.
*
* Zeroes in the indexkeys[] array indicate index columns that are
@@ -589,7 +590,9 @@ typedef struct IndexOptInfo
/* index descriptor information */
int ncolumns; /* number of columns in index */
- int *indexkeys; /* column numbers of index's keys, or 0 */
+ int nkeycolumns; /* number of key columns in index */
+ int *indexkeys; /* column numbers of index's attributes,
+ * both key and included columns, or 0 */
Oid *indexcollations; /* OIDs of collations of index columns */
Oid *opfamily; /* OIDs of operator families for columns */
Oid *opcintype; /* OIDs of opclass declared input data types */
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index ed14442..40faaa6 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -372,11 +372,25 @@ typedef struct ViewOptions
/*
* RelationGetNumberOfAttributes
- * Returns the number of attributes in a relation.
+ * Returns the total number of attributes in a relation.
*/
#define RelationGetNumberOfAttributes(relation) ((relation)->rd_rel->relnatts)
/*
+ * IndexRelationGetNumberOfAttributes
+ * Returns the number of attributes in an index.
+ */
+#define IndexRelationGetNumberOfAttributes(relation) \
+ ((relation)->rd_index->indnatts)
+
+/*
+ * IndexRelationGetNumberOfKeyAttributes
+ * Returns the number of key attributes in an index.
+ */
+#define IndexRelationGetNumberOfKeyAttributes(relation) \
+ ((relation)->rd_index->indnkeyatts)
+
+/*
* RelationGetDescr
* Returns tuple descriptor for a relation.
*/
diff --git a/src/test/regress/expected/create_index.out b/src/test/regress/expected/create_index.out
index 76593e1..65e0e7f 100644
--- a/src/test/regress/expected/create_index.out
+++ b/src/test/regress/expected/create_index.out
@@ -2400,6 +2400,25 @@ DETAIL: Key ((f1 || f2))=(ABCDEF) already exists.
-- but this shouldn't:
INSERT INTO func_index_heap VALUES('QWERTY');
--
+-- Test unique index with included columns
+--
+CREATE TABLE covering_index_heap (f1 int, f2 int, f3 text);
+CREATE UNIQUE INDEX covering_index_index on covering_index_heap (f1,f2) INCLUDING(f3);
+INSERT INTO covering_index_heap VALUES(1,1,'AAA');
+INSERT INTO covering_index_heap VALUES(1,2,'AAA');
+-- this should fail because of unique index on f1,f2:
+INSERT INTO covering_index_heap VALUES(1,2,'BBB');
+ERROR: duplicate key value violates unique constraint "covering_index_index"
+DETAIL: Key (f1, f2)=(1, 2) already exists.
+-- and this shouldn't:
+INSERT INTO covering_index_heap VALUES(1,4,'AAA');
+-- Try to build index on table that already contains data
+CREATE UNIQUE INDEX covering_pkey on covering_index_heap (f1,f2) INCLUDING(f3);
+-- Try to use existing covering index as primary key
+ALTER TABLE covering_index_heap ADD CONSTRAINT covering_pkey PRIMARY KEY USING INDEX
+covering_pkey;
+DROP TABLE covering_index_heap;
+--
-- Also try building functional, expressional, and partial indexes on
-- tables that already contain data.
--
diff --git a/src/test/regress/expected/index_including.out b/src/test/regress/expected/index_including.out
new file mode 100644
index 0000000..1199671
--- /dev/null
+++ b/src/test/regress/expected/index_including.out
@@ -0,0 +1,301 @@
+/*
+ * 1.test CREATE INDEX
+ */
+ -- Regular index with included columns
+CREATE TABLE tbl (c1 int, c2 int, c3 int, c4 box);
+INSERT INTO tbl select x, 2*x, 3*x, box('4,4,4,4') from generate_series(1,10) as x;
+CREATE INDEX tbl_idx ON tbl using btree(c1, c2) INCLUDING (c3,c4);
+-- must fail because of intersection of key and included columns
+CREATE INDEX tbl_idx ON tbl using btree(c1, c2) INCLUDING (c1,c3);
+ERROR: included columns must not intersect with key columns
+DROP TABLE tbl;
+-- Unique index and unique constraint
+CREATE TABLE tbl (c1 int, c2 int, c3 int, c4 box);
+INSERT INTO tbl select x, 2*x, 3*x, box('4,4,4,4') from generate_series(1,10) as x;
+CREATE UNIQUE INDEX tbl_idx_unique ON tbl using btree(c1, c2) INCLUDING (c3,c4);
+ALTER TABLE tbl add UNIQUE USING INDEX tbl_idx_unique;
+ALTER TABLE tbl add UNIQUE(c1, c2) INCLUDING (c3, c4);
+DROP TABLE tbl;
+-- Unique index and unique constraint. Both must fail.
+CREATE TABLE tbl (c1 int, c2 int, c3 int, c4 box);
+INSERT INTO tbl select 1, 2, 3*x, box('4,4,4,4') from generate_series(1,10) as x;
+CREATE UNIQUE INDEX tbl_idx_unique ON tbl using btree(c1, c2) INCLUDING (c3,c4);
+ERROR: could not create unique index "tbl_idx_unique"
+DETAIL: Key (c1, c2)=(1, 2) is duplicated.
+ALTER TABLE tbl add UNIQUE(c1, c2) INCLUDING (c3, c4);
+ERROR: could not create unique index "tbl_c1_c2_c3_c4_key"
+DETAIL: Key (c1, c2)=(1, 2) is duplicated.
+DROP TABLE tbl;
+-- PK constraint
+CREATE TABLE tbl (c1 int, c2 int, c3 int, c4 box);
+INSERT INTO tbl select 1, 2, 3*x, box('4,4,4,4') from generate_series(1,10) as x;
+ALTER TABLE tbl add PRIMARY KEY(c1, c2) INCLUDING (c3, c4);
+ERROR: could not create unique index "tbl_pkey"
+DETAIL: Key (c1, c2)=(1, 2) is duplicated.
+DROP TABLE tbl;
+CREATE TABLE tbl (c1 int, c2 int, c3 int, c4 box);
+INSERT INTO tbl select 1, 2, 3*x, box('4,4,4,4') from generate_series(1,10) as x;
+CREATE UNIQUE INDEX tbl_idx_unique ON tbl using btree(c1, c2) INCLUDING (c3,c4);
+ERROR: could not create unique index "tbl_idx_unique"
+DETAIL: Key (c1, c2)=(1, 2) is duplicated.
+ALTER TABLE tbl add PRIMARY KEY USING INDEX tbl_idx_unique;
+ERROR: index "tbl_idx_unique" does not exist
+LINE 1: ALTER TABLE tbl add PRIMARY KEY USING INDEX tbl_idx_unique;
+ ^
+DROP TABLE tbl;
+-- PK constraint. Must fail.
+CREATE TABLE tbl (c1 int, c2 int, c3 int, c4 box);
+INSERT INTO tbl select 1, 2, 3*x, box('4,4,4,4') from generate_series(1,10) as x;
+ALTER TABLE tbl add PRIMARY KEY(c1, c2) INCLUDING (c3, c4);
+ERROR: could not create unique index "tbl_pkey"
+DETAIL: Key (c1, c2)=(1, 2) is duplicated.
+DROP TABLE tbl;
+/*
+ * 2. Test CREATE TABLE with constraint
+ */
+CREATE TABLE tbl (c1 int,c2 int, c3 int, c4 box,
+ CONSTRAINT covering UNIQUE(c1,c2) INCLUDING(c3,c4));
+select indexrelid::regclass, indnatts, indnkeyatts, indisunique, indisprimary, indkey, indclass from pg_index where indrelid = 'tbl'::regclass::oid;
+ indexrelid | indnatts | indnkeyatts | indisunique | indisprimary | indkey | indclass
+------------+----------+-------------+-------------+--------------+---------+-----------
+ covering | 4 | 2 | t | f | 1 2 3 4 | 1978 1978
+(1 row)
+
+select pg_get_constraintdef(oid), conname, conkey, conincluding from pg_constraint where conrelid = 'tbl'::regclass::oid;
+ pg_get_constraintdef | conname | conkey | conincluding
+------------------------------------+----------+--------+--------------
+ UNIQUE (c1, c2) INCLUDING (c3, c4) | covering | {1,2} | {3,4}
+(1 row)
+
+-- ensure that constraint works
+INSERT INTO tbl select 1, 2, 3*x, box('4,4,4,4') from generate_series(1,10) as x;
+ERROR: duplicate key value violates unique constraint "covering"
+DETAIL: Key (c1, c2)=(1, 2) already exists.
+DROP TABLE tbl;
+CREATE TABLE tbl (c1 int,c2 int, c3 int, c4 box,
+ CONSTRAINT covering PRIMARY KEY(c1,c2) INCLUDING(c3,c4));
+select indexrelid::regclass, indnatts, indnkeyatts, indisunique, indisprimary, indkey, indclass from pg_index where indrelid = 'tbl'::regclass::oid;
+ indexrelid | indnatts | indnkeyatts | indisunique | indisprimary | indkey | indclass
+------------+----------+-------------+-------------+--------------+---------+-----------
+ covering | 4 | 2 | t | t | 1 2 3 4 | 1978 1978
+(1 row)
+
+select pg_get_constraintdef(oid), conname, conkey, conincluding from pg_constraint where conrelid = 'tbl'::regclass::oid;
+ pg_get_constraintdef | conname | conkey | conincluding
+-----------------------------------------+----------+--------+--------------
+ PRIMARY KEY (c1, c2) INCLUDING (c3, c4) | covering | {1,2} | {3,4}
+(1 row)
+
+-- ensure that constraint works
+INSERT INTO tbl select 1, 2, 3*x, box('4,4,4,4') from generate_series(1,10) as x;
+ERROR: duplicate key value violates unique constraint "covering"
+DETAIL: Key (c1, c2)=(1, 2) already exists.
+INSERT INTO tbl select 1, NULL, 3*x, box('4,4,4,4') from generate_series(1,10) as x;
+ERROR: null value in column "c2" violates not-null constraint
+DETAIL: Failing row contains (1, null, 3, (4,4),(4,4)).
+INSERT INTO tbl select x, 2*x, NULL, NULL from generate_series(1,10) as x;
+DROP TABLE tbl;
+CREATE TABLE tbl (c1 int,c2 int, c3 int, c4 box,
+ UNIQUE(c1,c2) INCLUDING(c3,c4));
+select indexrelid::regclass, indnatts, indnkeyatts, indisunique, indisprimary, indkey, indclass from pg_index where indrelid = 'tbl'::regclass::oid;
+ indexrelid | indnatts | indnkeyatts | indisunique | indisprimary | indkey | indclass
+---------------------+----------+-------------+-------------+--------------+---------+-----------
+ tbl_c1_c2_c3_c4_key | 4 | 2 | t | f | 1 2 3 4 | 1978 1978
+(1 row)
+
+select pg_get_constraintdef(oid), conname, conkey, conincluding from pg_constraint where conrelid = 'tbl'::regclass::oid;
+ pg_get_constraintdef | conname | conkey | conincluding
+------------------------------------+---------------------+--------+--------------
+ UNIQUE (c1, c2) INCLUDING (c3, c4) | tbl_c1_c2_c3_c4_key | {1,2} | {3,4}
+(1 row)
+
+-- ensure that constraint works
+INSERT INTO tbl select 1, 2, 3*x, box('4,4,4,4') from generate_series(1,10) as x;
+ERROR: duplicate key value violates unique constraint "tbl_c1_c2_c3_c4_key"
+DETAIL: Key (c1, c2)=(1, 2) already exists.
+DROP TABLE tbl;
+CREATE TABLE tbl (c1 int,c2 int, c3 int, c4 box,
+ PRIMARY KEY(c1,c2) INCLUDING(c3,c4));
+select indexrelid::regclass, indnatts, indnkeyatts, indisunique, indisprimary, indkey, indclass from pg_index where indrelid = 'tbl'::regclass::oid;
+ indexrelid | indnatts | indnkeyatts | indisunique | indisprimary | indkey | indclass
+------------+----------+-------------+-------------+--------------+---------+-----------
+ tbl_pkey | 4 | 2 | t | t | 1 2 3 4 | 1978 1978
+(1 row)
+
+select pg_get_constraintdef(oid), conname, conkey, conincluding from pg_constraint where conrelid = 'tbl'::regclass::oid;
+ pg_get_constraintdef | conname | conkey | conincluding
+-----------------------------------------+----------+--------+--------------
+ PRIMARY KEY (c1, c2) INCLUDING (c3, c4) | tbl_pkey | {1,2} | {3,4}
+(1 row)
+
+-- ensure that constraint works
+INSERT INTO tbl select 1, 2, 3*x, box('4,4,4,4') from generate_series(1,10) as x;
+ERROR: duplicate key value violates unique constraint "tbl_pkey"
+DETAIL: Key (c1, c2)=(1, 2) already exists.
+INSERT INTO tbl select 1, NULL, 3*x, box('4,4,4,4') from generate_series(1,10) as x;
+ERROR: null value in column "c2" violates not-null constraint
+DETAIL: Failing row contains (1, null, 3, (4,4),(4,4)).
+INSERT INTO tbl select x, 2*x, NULL, NULL from generate_series(1,10) as x;
+DROP TABLE tbl;
+/*
+ * 3.0 Test ALTER TABLE DROP COLUMN.
+ * Any column deletion leads to index deletion.
+ */
+CREATE TABLE tbl (c1 int,c2 int, c3 int, c4 int);
+CREATE UNIQUE INDEX tbl_idx ON tbl using btree(c1, c2, c3, c4);
+select indexdef from pg_indexes where tablename = 'tbl' order by indexname;
+ indexdef
+-----------------------------------------------------------------
+ CREATE UNIQUE INDEX tbl_idx ON tbl USING btree (c1, c2, c3, c4)
+(1 row)
+
+ALTER TABLE tbl DROP COLUMN c3;
+select indexdef from pg_indexes where tablename = 'tbl' order by indexname;
+ indexdef
+----------
+(0 rows)
+
+DROP TABLE tbl;
+/*
+ * 3.1 Test ALTER TABLE DROP COLUMN.
+ * Included column deletion leads to the index deletion,
+ * as well as key columns deletion. It's explained in documentation.
+ */
+CREATE TABLE tbl (c1 int,c2 int, c3 int, c4 box);
+CREATE UNIQUE INDEX tbl_idx ON tbl using btree(c1, c2) INCLUDING(c3,c4);
+select indexdef from pg_indexes where tablename = 'tbl' order by indexname;
+ indexdef
+----------------------------------------------------------------------------
+ CREATE UNIQUE INDEX tbl_idx ON tbl USING btree (c1, c2) INCLUDING (c3, c4)
+(1 row)
+
+ALTER TABLE tbl DROP COLUMN c3;
+select indexdef from pg_indexes where tablename = 'tbl' order by indexname;
+ indexdef
+----------
+(0 rows)
+
+DROP TABLE tbl;
+/*
+ * 3.2 Test ALTER TABLE DROP COLUMN.
+ * Included column deletion leads to the index deletion.
+ * as well as key columns deletion. It's explained in documentation.
+ */
+CREATE TABLE tbl (c1 int,c2 int, c3 int, c4 box, UNIQUE(c1, c2) INCLUDING(c3,c4));
+select indexdef from pg_indexes where tablename = 'tbl' order by indexname;
+ indexdef
+----------------------------------------------------------------------------------------
+ CREATE UNIQUE INDEX tbl_c1_c2_c3_c4_key ON tbl USING btree (c1, c2) INCLUDING (c3, c4)
+(1 row)
+
+ALTER TABLE tbl DROP COLUMN c3;
+select indexdef from pg_indexes where tablename = 'tbl' order by indexname;
+ indexdef
+----------
+(0 rows)
+
+ALTER TABLE tbl DROP COLUMN c1;
+select indexdef from pg_indexes where tablename = 'tbl' order by indexname;
+ indexdef
+----------
+(0 rows)
+
+DROP TABLE tbl;
+/*
+ * 4. CREATE INDEX CONCURRENTLY
+ */
+CREATE TABLE tbl (c1 int,c2 int, c3 int, c4 box, UNIQUE(c1, c2) INCLUDING(c3,c4));
+INSERT INTO tbl select x, 2*x, 3*x, box('4,4,4,4') from generate_series(1,1000) as x;
+CREATE UNIQUE INDEX CONCURRENTLY on tbl (c1, c2) INCLUDING (c3, c4);
+select indexdef from pg_indexes where tablename = 'tbl' order by indexname;
+ indexdef
+----------------------------------------------------------------------------------------
+ CREATE UNIQUE INDEX tbl_c1_c2_c3_c4_idx ON tbl USING btree (c1, c2) INCLUDING (c3, c4)
+ CREATE UNIQUE INDEX tbl_c1_c2_c3_c4_key ON tbl USING btree (c1, c2) INCLUDING (c3, c4)
+(2 rows)
+
+DROP TABLE tbl;
+/*
+ * 5. REINDEX
+ */
+CREATE TABLE tbl (c1 int,c2 int, c3 int, c4 box, UNIQUE(c1, c2) INCLUDING(c3,c4));
+select indexdef from pg_indexes where tablename = 'tbl' order by indexname;
+ indexdef
+----------------------------------------------------------------------------------------
+ CREATE UNIQUE INDEX tbl_c1_c2_c3_c4_key ON tbl USING btree (c1, c2) INCLUDING (c3, c4)
+(1 row)
+
+ALTER TABLE tbl DROP COLUMN c3;
+select indexdef from pg_indexes where tablename = 'tbl' order by indexname;
+ indexdef
+----------
+(0 rows)
+
+REINDEX INDEX tbl_c1_c2_c3_c4_key;
+ERROR: relation "tbl_c1_c2_c3_c4_key" does not exist
+select indexdef from pg_indexes where tablename = 'tbl' order by indexname;
+ indexdef
+----------
+(0 rows)
+
+ALTER TABLE tbl DROP COLUMN c1;
+select indexdef from pg_indexes where tablename = 'tbl' order by indexname;
+ indexdef
+----------
+(0 rows)
+
+DROP TABLE tbl;
+/*
+ * 7. Check various AMs. All but brtee must fail.
+ */
+CREATE TABLE tbl (c1 int,c2 int, c3 box, c4 box);
+CREATE INDEX on tbl USING brin(c1, c2) INCLUDING (c3, c4);
+ERROR: access method "brin" does not support included columns
+CREATE INDEX on tbl USING gist(c3) INCLUDING (c4);
+ERROR: access method "gist" does not support included columns
+CREATE INDEX on tbl USING spgist(c3) INCLUDING (c4);
+ERROR: access method "spgist" does not support included columns
+CREATE INDEX on tbl USING gin(c1, c2) INCLUDING (c3, c4);
+ERROR: access method "gin" does not support included columns
+CREATE INDEX on tbl USING hash(c1, c2) INCLUDING (c3, c4);
+WARNING: hash indexes are not WAL-logged and their use is discouraged
+ERROR: access method "hash" does not support included columns
+CREATE INDEX on tbl USING rtree(c1, c2) INCLUDING (c3, c4);
+NOTICE: substituting access method "gist" for obsolete method "rtree"
+ERROR: access method "gist" does not support included columns
+CREATE INDEX on tbl USING btree(c1, c2) INCLUDING (c3, c4);
+DROP TABLE tbl;
+/*
+ * 8. Update, delete values in indexed table.
+ */
+CREATE TABLE tbl (c1 int, c2 int, c3 int, c4 box);
+INSERT INTO tbl select x, 2*x, 3*x, box('4,4,4,4') from generate_series(1,10) as x;
+CREATE UNIQUE INDEX tbl_idx_unique ON tbl using btree(c1, c2) INCLUDING (c3,c4);
+UPDATE tbl SET c1 = 100 WHERE c1 = 2;
+UPDATE tbl SET c1 = 1 WHERE c1 = 3;
+-- should fail
+UPDATE tbl SET c2 = 2 WHERE c1 = 1;
+ERROR: duplicate key value violates unique constraint "tbl_idx_unique"
+DETAIL: Key (c1, c2)=(1, 2) already exists.
+UPDATE tbl SET c3 = 1;
+DELETE FROM tbl WHERE c1 = 5 OR c3 = 12;
+DROP TABLE tbl;
+/*
+ * 9. Alter column type.
+ */
+CREATE TABLE tbl (c1 int,c2 int, c3 int, c4 box, UNIQUE(c1, c2) INCLUDING(c3,c4));
+INSERT INTO tbl select x, 2*x, 3*x, box('4,4,4,4') from generate_series(1,10) as x;
+ALTER TABLE tbl ALTER c1 TYPE bigint;
+ALTER TABLE tbl ALTER c3 TYPE bigint;
+\d tbl
+ Table "public.tbl"
+ Column | Type | Modifiers
+--------+---------+-----------
+ c1 | bigint |
+ c2 | integer |
+ c3 | bigint |
+ c4 | box |
+Indexes:
+ "tbl_c1_c2_c3_c4_key" UNIQUE CONSTRAINT, btree (c1, c2) INCLUDING (c3, c4)
+
+DROP TABLE tbl;
diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule
index 1cb5dfc..111b835 100644
--- a/src/test/regress/parallel_schedule
+++ b/src/test/regress/parallel_schedule
@@ -55,7 +55,7 @@ test: copy copyselect copydml
# ----------
test: create_misc create_operator
# These depend on the above two
-test: create_index create_view
+test: create_index create_view index_including
# ----------
# Another group of parallel tests
diff --git a/src/test/regress/serial_schedule b/src/test/regress/serial_schedule
index 8958d8c..be22299 100644
--- a/src/test/regress/serial_schedule
+++ b/src/test/regress/serial_schedule
@@ -61,6 +61,7 @@ test: copydml
test: create_misc
test: create_operator
test: create_index
+test: index_including
test: create_view
test: create_aggregate
test: create_function_3
diff --git a/src/test/regress/sql/create_index.sql b/src/test/regress/sql/create_index.sql
index 71f4f54..bbc0cff 100644
--- a/src/test/regress/sql/create_index.sql
+++ b/src/test/regress/sql/create_index.sql
@@ -732,6 +732,26 @@ INSERT INTO func_index_heap VALUES('ABCD', 'EF');
INSERT INTO func_index_heap VALUES('QWERTY');
--
+-- Test unique index with included columns
+--
+CREATE TABLE covering_index_heap (f1 int, f2 int, f3 text);
+CREATE UNIQUE INDEX covering_index_index on covering_index_heap (f1,f2) INCLUDING(f3);
+
+INSERT INTO covering_index_heap VALUES(1,1,'AAA');
+INSERT INTO covering_index_heap VALUES(1,2,'AAA');
+-- this should fail because of unique index on f1,f2:
+INSERT INTO covering_index_heap VALUES(1,2,'BBB');
+-- and this shouldn't:
+INSERT INTO covering_index_heap VALUES(1,4,'AAA');
+-- Try to build index on table that already contains data
+CREATE UNIQUE INDEX covering_pkey on covering_index_heap (f1,f2) INCLUDING(f3);
+-- Try to use existing covering index as primary key
+ALTER TABLE covering_index_heap ADD CONSTRAINT covering_pkey PRIMARY KEY USING INDEX
+covering_pkey;
+DROP TABLE covering_index_heap;
+
+
+--
-- Also try building functional, expressional, and partial indexes on
-- tables that already contain data.
--
diff --git a/src/test/regress/sql/index_including.sql b/src/test/regress/sql/index_including.sql
new file mode 100644
index 0000000..c4c61c5
--- /dev/null
+++ b/src/test/regress/sql/index_including.sql
@@ -0,0 +1,181 @@
+/*
+ * 1. Test CREATE INDEX
+ */
+ -- Regular index with included columns
+CREATE TABLE tbl (c1 int, c2 int, c3 int, c4 box);
+INSERT INTO tbl select x, 2*x, 3*x, box('4,4,4,4') from generate_series(1,10) as x;
+CREATE INDEX tbl_idx ON tbl using btree(c1, c2) INCLUDING (c3,c4);
+-- must fail because of intersection of key and included columns
+CREATE INDEX tbl_idx ON tbl using btree(c1, c2) INCLUDING (c1,c3);
+DROP TABLE tbl;
+
+-- Unique index and unique constraint
+CREATE TABLE tbl (c1 int, c2 int, c3 int, c4 box);
+INSERT INTO tbl select x, 2*x, 3*x, box('4,4,4,4') from generate_series(1,10) as x;
+CREATE UNIQUE INDEX tbl_idx_unique ON tbl using btree(c1, c2) INCLUDING (c3,c4);
+ALTER TABLE tbl add UNIQUE USING INDEX tbl_idx_unique;
+ALTER TABLE tbl add UNIQUE(c1, c2) INCLUDING (c3, c4);
+DROP TABLE tbl;
+
+-- Unique index and unique constraint. Both must fail.
+CREATE TABLE tbl (c1 int, c2 int, c3 int, c4 box);
+INSERT INTO tbl select 1, 2, 3*x, box('4,4,4,4') from generate_series(1,10) as x;
+CREATE UNIQUE INDEX tbl_idx_unique ON tbl using btree(c1, c2) INCLUDING (c3,c4);
+ALTER TABLE tbl add UNIQUE(c1, c2) INCLUDING (c3, c4);
+DROP TABLE tbl;
+
+-- PK constraint
+CREATE TABLE tbl (c1 int, c2 int, c3 int, c4 box);
+INSERT INTO tbl select 1, 2, 3*x, box('4,4,4,4') from generate_series(1,10) as x;
+ALTER TABLE tbl add PRIMARY KEY(c1, c2) INCLUDING (c3, c4);
+DROP TABLE tbl;
+
+CREATE TABLE tbl (c1 int, c2 int, c3 int, c4 box);
+INSERT INTO tbl select 1, 2, 3*x, box('4,4,4,4') from generate_series(1,10) as x;
+CREATE UNIQUE INDEX tbl_idx_unique ON tbl using btree(c1, c2) INCLUDING (c3,c4);
+ALTER TABLE tbl add PRIMARY KEY USING INDEX tbl_idx_unique;
+DROP TABLE tbl;
+-- PK constraint. Must fail.
+CREATE TABLE tbl (c1 int, c2 int, c3 int, c4 box);
+INSERT INTO tbl select 1, 2, 3*x, box('4,4,4,4') from generate_series(1,10) as x;
+ALTER TABLE tbl add PRIMARY KEY(c1, c2) INCLUDING (c3, c4);
+DROP TABLE tbl;
+
+
+/*
+ * 2. Test CREATE TABLE with constraint
+ */
+CREATE TABLE tbl (c1 int,c2 int, c3 int, c4 box,
+ CONSTRAINT covering UNIQUE(c1,c2) INCLUDING(c3,c4));
+select indexrelid::regclass, indnatts, indnkeyatts, indisunique, indisprimary, indkey, indclass from pg_index where indrelid = 'tbl'::regclass::oid;
+select pg_get_constraintdef(oid), conname, conkey, conincluding from pg_constraint where conrelid = 'tbl'::regclass::oid;
+-- ensure that constraint works
+INSERT INTO tbl select 1, 2, 3*x, box('4,4,4,4') from generate_series(1,10) as x;
+DROP TABLE tbl;
+
+CREATE TABLE tbl (c1 int,c2 int, c3 int, c4 box,
+ CONSTRAINT covering PRIMARY KEY(c1,c2) INCLUDING(c3,c4));
+select indexrelid::regclass, indnatts, indnkeyatts, indisunique, indisprimary, indkey, indclass from pg_index where indrelid = 'tbl'::regclass::oid;
+select pg_get_constraintdef(oid), conname, conkey, conincluding from pg_constraint where conrelid = 'tbl'::regclass::oid;
+-- ensure that constraint works
+INSERT INTO tbl select 1, 2, 3*x, box('4,4,4,4') from generate_series(1,10) as x;
+INSERT INTO tbl select 1, NULL, 3*x, box('4,4,4,4') from generate_series(1,10) as x;
+INSERT INTO tbl select x, 2*x, NULL, NULL from generate_series(1,10) as x;
+DROP TABLE tbl;
+
+CREATE TABLE tbl (c1 int,c2 int, c3 int, c4 box,
+ UNIQUE(c1,c2) INCLUDING(c3,c4));
+select indexrelid::regclass, indnatts, indnkeyatts, indisunique, indisprimary, indkey, indclass from pg_index where indrelid = 'tbl'::regclass::oid;
+select pg_get_constraintdef(oid), conname, conkey, conincluding from pg_constraint where conrelid = 'tbl'::regclass::oid;
+-- ensure that constraint works
+INSERT INTO tbl select 1, 2, 3*x, box('4,4,4,4') from generate_series(1,10) as x;
+DROP TABLE tbl;
+
+CREATE TABLE tbl (c1 int,c2 int, c3 int, c4 box,
+ PRIMARY KEY(c1,c2) INCLUDING(c3,c4));
+select indexrelid::regclass, indnatts, indnkeyatts, indisunique, indisprimary, indkey, indclass from pg_index where indrelid = 'tbl'::regclass::oid;
+select pg_get_constraintdef(oid), conname, conkey, conincluding from pg_constraint where conrelid = 'tbl'::regclass::oid;
+-- ensure that constraint works
+INSERT INTO tbl select 1, 2, 3*x, box('4,4,4,4') from generate_series(1,10) as x;
+INSERT INTO tbl select 1, NULL, 3*x, box('4,4,4,4') from generate_series(1,10) as x;
+INSERT INTO tbl select x, 2*x, NULL, NULL from generate_series(1,10) as x;
+DROP TABLE tbl;
+
+
+/*
+ * 3.0 Test ALTER TABLE DROP COLUMN.
+ * Any column deletion leads to index deletion.
+ */
+CREATE TABLE tbl (c1 int,c2 int, c3 int, c4 int);
+CREATE UNIQUE INDEX tbl_idx ON tbl using btree(c1, c2, c3, c4);
+select indexdef from pg_indexes where tablename = 'tbl' order by indexname;
+ALTER TABLE tbl DROP COLUMN c3;
+select indexdef from pg_indexes where tablename = 'tbl' order by indexname;
+DROP TABLE tbl;
+
+/*
+ * 3.1 Test ALTER TABLE DROP COLUMN.
+ * Included column deletion leads to the index deletion,
+ * as well as key columns deletion. It's explained in documentation.
+ */
+CREATE TABLE tbl (c1 int,c2 int, c3 int, c4 box);
+CREATE UNIQUE INDEX tbl_idx ON tbl using btree(c1, c2) INCLUDING(c3,c4);
+select indexdef from pg_indexes where tablename = 'tbl' order by indexname;
+ALTER TABLE tbl DROP COLUMN c3;
+select indexdef from pg_indexes where tablename = 'tbl' order by indexname;
+DROP TABLE tbl;
+
+/*
+ * 3.2 Test ALTER TABLE DROP COLUMN.
+ * Included column deletion leads to the index deletion,
+ * as well as key columns deletion. It's explained in documentation.
+ */
+CREATE TABLE tbl (c1 int,c2 int, c3 int, c4 box, UNIQUE(c1, c2) INCLUDING(c3,c4));
+select indexdef from pg_indexes where tablename = 'tbl' order by indexname;
+ALTER TABLE tbl DROP COLUMN c3;
+select indexdef from pg_indexes where tablename = 'tbl' order by indexname;
+ALTER TABLE tbl DROP COLUMN c1;
+select indexdef from pg_indexes where tablename = 'tbl' order by indexname;
+DROP TABLE tbl;
+
+
+/*
+ * 4. CREATE INDEX CONCURRENTLY
+ */
+CREATE TABLE tbl (c1 int,c2 int, c3 int, c4 box, UNIQUE(c1, c2) INCLUDING(c3,c4));
+INSERT INTO tbl select x, 2*x, 3*x, box('4,4,4,4') from generate_series(1,1000) as x;
+CREATE UNIQUE INDEX CONCURRENTLY on tbl (c1, c2) INCLUDING (c3, c4);
+select indexdef from pg_indexes where tablename = 'tbl' order by indexname;
+DROP TABLE tbl;
+
+
+/*
+ * 5. REINDEX
+ */
+CREATE TABLE tbl (c1 int,c2 int, c3 int, c4 box, UNIQUE(c1, c2) INCLUDING(c3,c4));
+select indexdef from pg_indexes where tablename = 'tbl' order by indexname;
+ALTER TABLE tbl DROP COLUMN c3;
+select indexdef from pg_indexes where tablename = 'tbl' order by indexname;
+REINDEX INDEX tbl_c1_c2_c3_c4_key;
+select indexdef from pg_indexes where tablename = 'tbl' order by indexname;
+ALTER TABLE tbl DROP COLUMN c1;
+select indexdef from pg_indexes where tablename = 'tbl' order by indexname;
+DROP TABLE tbl;
+
+/*
+ * 7. Check various AMs. All but btree must fail.
+ */
+CREATE TABLE tbl (c1 int,c2 int, c3 box, c4 box);
+CREATE INDEX on tbl USING brin(c1, c2) INCLUDING (c3, c4);
+CREATE INDEX on tbl USING gist(c3) INCLUDING (c4);
+CREATE INDEX on tbl USING spgist(c3) INCLUDING (c4);
+CREATE INDEX on tbl USING gin(c1, c2) INCLUDING (c3, c4);
+CREATE INDEX on tbl USING hash(c1, c2) INCLUDING (c3, c4);
+CREATE INDEX on tbl USING rtree(c1, c2) INCLUDING (c3, c4);
+CREATE INDEX on tbl USING btree(c1, c2) INCLUDING (c3, c4);
+DROP TABLE tbl;
+
+/*
+ * 8. Update, delete values in indexed table.
+ */
+CREATE TABLE tbl (c1 int, c2 int, c3 int, c4 box);
+INSERT INTO tbl select x, 2*x, 3*x, box('4,4,4,4') from generate_series(1,10) as x;
+CREATE UNIQUE INDEX tbl_idx_unique ON tbl using btree(c1, c2) INCLUDING (c3,c4);
+UPDATE tbl SET c1 = 100 WHERE c1 = 2;
+UPDATE tbl SET c1 = 1 WHERE c1 = 3;
+-- should fail
+UPDATE tbl SET c2 = 2 WHERE c1 = 1;
+UPDATE tbl SET c3 = 1;
+DELETE FROM tbl WHERE c1 = 5 OR c3 = 12;
+DROP TABLE tbl;
+
+/*
+ * 9. Alter column type.
+ */
+CREATE TABLE tbl (c1 int,c2 int, c3 int, c4 box, UNIQUE(c1, c2) INCLUDING(c3,c4));
+INSERT INTO tbl select x, 2*x, 3*x, box('4,4,4,4') from generate_series(1,10) as x;
+ALTER TABLE tbl ALTER c1 TYPE bigint;
+ALTER TABLE tbl ALTER c3 TYPE bigint;
+\d tbl
+DROP TABLE tbl;
+
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers