From: Andres Freund <and...@anarazel.de>

This adds a new wal_level value 'logical'

Missing cases:
- heap_multi_insert
- primary key changes for updates
- no primary key
- LOG_NEWPAGE
---
 src/backend/access/heap/heapam.c        |  135 ++++++++++++++++++++++++++++---
 src/backend/access/transam/xlog.c       |    1 +
 src/backend/catalog/index.c             |   74 +++++++++++++++++
 src/bin/pg_controldata/pg_controldata.c |    2 +
 src/include/access/xlog.h               |    3 +-
 src/include/catalog/index.h             |    4 +
 6 files changed, 207 insertions(+), 12 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 9519e73..9149d53 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -52,6 +52,7 @@
 #include "access/xact.h"
 #include "access/xlogutils.h"
 #include "catalog/catalog.h"
+#include "catalog/index.h"
 #include "catalog/namespace.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -1937,10 +1938,19 @@ heap_insert(Relation relation, HeapTuple tup, CommandId 
cid,
                xl_heap_insert xlrec;
                xl_heap_header xlhdr;
                XLogRecPtr      recptr;
-               XLogRecData rdata[3];
+               XLogRecData rdata[4];
                Page            page = BufferGetPage(buffer);
                uint8           info = XLOG_HEAP_INSERT;
 
+               /*
+                * For the logical replication case we need the tuple even if we're
+                * doing a full page write. We could alternatively store a pointer into
+                * the fpw though.
+                * For that to work we add another rdata entry for the buffer in that
+                * case.
+                */
+               bool        need_tuple = wal_level == WAL_LEVEL_LOGICAL;
+
                xlrec.all_visible_cleared = all_visible_cleared;
                xlrec.target.node = relation->rd_node;
                xlrec.target.tid = heaptup->t_self;
@@ -1960,18 +1970,32 @@ heap_insert(Relation relation, HeapTuple tup, CommandId 
cid,
                 */
                rdata[1].data = (char *) &xlhdr;
                rdata[1].len = SizeOfHeapHeader;
-               rdata[1].buffer = buffer;
+               rdata[1].buffer = need_tuple ? InvalidBuffer : buffer;
                rdata[1].buffer_std = true;
                rdata[1].next = &(rdata[2]);
 
                /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
                rdata[2].data = (char *) heaptup->t_data + 
offsetof(HeapTupleHeaderData, t_bits);
                rdata[2].len = heaptup->t_len - offsetof(HeapTupleHeaderData, 
t_bits);
-               rdata[2].buffer = buffer;
+               rdata[2].buffer = need_tuple ? InvalidBuffer : buffer;
                rdata[2].buffer_std = true;
                rdata[2].next = NULL;
 
                /*
+                * Add an entry that references the buffer but carries no data; it
+                * is dropped if a full page write is done for that buffer.
+                */
+               if(need_tuple){
+                       rdata[2].next = &(rdata[3]);
+
+                       rdata[3].data = NULL;
+                       rdata[3].len = 0;
+                       rdata[3].buffer = buffer;
+                       rdata[3].buffer_std = true;
+                       rdata[3].next = NULL;
+               }
+
+               /*
                 * If this is the single and first tuple on page, we can reinit 
the
                 * page instead of restoring the whole thing.  Set flag, and 
hide
                 * buffer references from XLogInsert.
@@ -1980,7 +2004,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId 
cid,
                        PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
                {
                        info |= XLOG_HEAP_INIT_PAGE;
-                       rdata[1].buffer = rdata[2].buffer = InvalidBuffer;
+                       rdata[1].buffer = rdata[2].buffer = rdata[3].buffer = 
InvalidBuffer;
                }
 
                recptr = XLogInsert(RM_HEAP_ID, info, rdata);
@@ -2568,7 +2592,9 @@ l1:
        {
                xl_heap_delete xlrec;
                XLogRecPtr      recptr;
-               XLogRecData rdata[2];
+               XLogRecData rdata[4];
+
+               bool need_tuple = wal_level == WAL_LEVEL_LOGICAL && 
relation->rd_id  >= FirstNormalObjectId;
 
                xlrec.all_visible_cleared = all_visible_cleared;
                xlrec.target.node = relation->rd_node;
@@ -2584,6 +2610,73 @@ l1:
                rdata[1].buffer_std = true;
                rdata[1].next = NULL;
 
+               /*
+                * XXX: We could decide not to log changes when the origin is 
not the
+                * local node, that should reduce redundant logging.
+                */
+               if(need_tuple){
+                       xl_heap_header xlhdr;
+
+                       Oid indexoid = InvalidOid;
+                       int16 pknratts;
+                       int16 pkattnum[INDEX_MAX_KEYS];
+                       Oid pktypoid[INDEX_MAX_KEYS];
+                       Oid pkopclass[INDEX_MAX_KEYS];
+                       TupleDesc desc = RelationGetDescr(relation);
+                       Relation index_rel;
+                       TupleDesc indexdesc;
+                       int natt;
+
+                       Datum idxvals[INDEX_MAX_KEYS];
+                       bool idxisnull[INDEX_MAX_KEYS];
+                       HeapTuple idxtuple;
+
+                       MemSet(pkattnum, 0, sizeof(pkattnum));
+                       MemSet(pktypoid, 0, sizeof(pktypoid));
+                       MemSet(pkopclass, 0, sizeof(pkopclass));
+                       MemSet(idxvals, 0, sizeof(idxvals));
+                       MemSet(idxisnull, 0, sizeof(idxisnull));
+                       relationFindPrimaryKey(relation, &indexoid, &pknratts, 
pkattnum, pktypoid, pkopclass);
+
+                       if(!indexoid){
+                               elog(WARNING, "Could not find primary key for 
table with oid %u",
+                                    relation->rd_id);
+                               goto no_index_found;
+                       }
+
+                       index_rel = index_open(indexoid, AccessShareLock);
+
+                       indexdesc = RelationGetDescr(index_rel);
+
+                       for(natt = 0; natt < indexdesc->natts; natt++){
+                               idxvals[natt] =
+                                       fastgetattr(&tp, pkattnum[natt], desc, 
&idxisnull[natt]);
+                               Assert(!idxisnull[natt]);
+                       }
+
+                       idxtuple = heap_form_tuple(indexdesc, idxvals, 
idxisnull);
+
+                       xlhdr.t_infomask2 = idxtuple->t_data->t_infomask2;
+                       xlhdr.t_infomask = idxtuple->t_data->t_infomask;
+                       xlhdr.t_hoff = idxtuple->t_data->t_hoff;
+
+                       rdata[1].next = &(rdata[2]);
+                       rdata[2].data = (char*)&xlhdr;
+                       rdata[2].len = SizeOfHeapHeader;
+                       rdata[2].buffer = InvalidBuffer;
+                       rdata[2].next = NULL;
+
+                       rdata[2].next = &(rdata[3]);
+                       rdata[3].data = (char *) idxtuple->t_data + 
offsetof(HeapTupleHeaderData, t_bits);
+                       rdata[3].len = idxtuple->t_len - 
offsetof(HeapTupleHeaderData, t_bits);
+                       rdata[3].buffer = InvalidBuffer;
+                       rdata[3].next = NULL;
+
+                       heap_close(index_rel, NoLock);
+               no_index_found:
+                       ;
+               }
+
                recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE, rdata);
 
                PageSetLSN(page, recptr);
@@ -4413,9 +4506,14 @@ log_heap_update(Relation reln, Buffer oldbuf, 
ItemPointerData from,
        xl_heap_header xlhdr;
        uint8           info;
        XLogRecPtr      recptr;
-       XLogRecData rdata[4];
+       XLogRecData rdata[5];
        Page            page = BufferGetPage(newbuf);
 
+       /*
+        * Just as for XLOG_HEAP_INSERT, log the tuple even when an fpw is done.
+        */
+       bool        need_tuple = wal_level == WAL_LEVEL_LOGICAL;
+
        /* Caller should not call me on a non-WAL-logged relation */
        Assert(RelationNeedsWAL(reln));
 
@@ -4446,28 +4544,43 @@ log_heap_update(Relation reln, Buffer oldbuf, 
ItemPointerData from,
        xlhdr.t_hoff = newtup->t_data->t_hoff;
 
        /*
-        * As with insert records, we need not store the rdata[2] segment if we
-        * decide to store the whole buffer instead.
+        * As with insert's logging, we need not store the tuple data separately
+        * from the buffer if the whole buffer is stored -- unless we do logical
+        * replication, in which case the tuple is always logged.
         */
        rdata[2].data = (char *) &xlhdr;
        rdata[2].len = SizeOfHeapHeader;
-       rdata[2].buffer = newbuf;
+       rdata[2].buffer = need_tuple ? InvalidBuffer : newbuf;
        rdata[2].buffer_std = true;
        rdata[2].next = &(rdata[3]);
 
        /* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */
        rdata[3].data = (char *) newtup->t_data + offsetof(HeapTupleHeaderData, 
t_bits);
        rdata[3].len = newtup->t_len - offsetof(HeapTupleHeaderData, t_bits);
-       rdata[3].buffer = newbuf;
+       rdata[3].buffer = need_tuple ? InvalidBuffer : newbuf;
        rdata[3].buffer_std = true;
        rdata[3].next = NULL;
 
+       /*
+        * separate storage for the buffer reference of the new page in the
+        * wal_level=logical case
+       */
+       if(need_tuple){
+               rdata[3].next = &(rdata[4]);
+
+               rdata[4].data = NULL,
+               rdata[4].len = 0;
+               rdata[4].buffer = newbuf;
+               rdata[4].buffer_std = true;
+               rdata[4].next = NULL;
+       }
+
        /* If new tuple is the single and first tuple on page... */
        if (ItemPointerGetOffsetNumber(&(newtup->t_self)) == FirstOffsetNumber 
&&
                PageGetMaxOffsetNumber(page) == FirstOffsetNumber)
        {
                info |= XLOG_HEAP_INIT_PAGE;
-               rdata[2].buffer = rdata[3].buffer = InvalidBuffer;
+               rdata[2].buffer = rdata[3].buffer = rdata[4].buffer = 
InvalidBuffer;
        }
 
        recptr = XLogInsert(RM_HEAP_ID, info, rdata);
diff --git a/src/backend/access/transam/xlog.c 
b/src/backend/access/transam/xlog.c
index 166efb0..c6feed0 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -105,6 +105,7 @@ const struct config_enum_entry wal_level_options[] = {
        {"minimal", WAL_LEVEL_MINIMAL, false},
        {"archive", WAL_LEVEL_ARCHIVE, false},
        {"hot_standby", WAL_LEVEL_HOT_STANDBY, false},
+       {"logical", WAL_LEVEL_LOGICAL, false},
        {NULL, 0, false}
 };
 
diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 9e8b1cc..4cddcac 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -49,6 +49,7 @@
 #include "nodes/nodeFuncs.h"
 #include "optimizer/clauses.h"
 #include "parser/parser.h"
+#include "parser/parse_relation.h"
 #include "storage/bufmgr.h"
 #include "storage/lmgr.h"
 #include "storage/predicate.h"
@@ -3311,3 +3312,76 @@ ResetReindexPending(void)
 {
        pendingReindexedIndexes = NIL;
 }
+
+/*
+ * relationFindPrimaryKey
+ *             Find the primary key index of pkrel, if it has one.
+ *
+ * Picks the first index with indisprimary and indimmediate set; on success
+ * sets *indexOid, *nratts (indnatts) and one attnums[], atttypids[] and
+ * opclasses[] entry per key column.  If no such index exists, *indexOid is
+ * InvalidOid and the other outputs are untouched.  Similar to tablecmds.c's
+ * transformFkeyGetPrimaryKey.  XXX: It might be a good idea to change
+ * pg_class.relhaspkey into a bool to make this more efficient.
+ */
+void
+relationFindPrimaryKey(Relation pkrel, Oid *indexOid,
+                       int16 *nratts, int16 *attnums, Oid *atttypids,
+                       Oid *opclasses){
+       List *indexoidlist;
+       ListCell *indexoidscan;
+       HeapTuple indexTuple = NULL;
+       Datum indclassDatum;
+       bool isnull;
+       oidvector  *indclass;
+       int i;
+       Form_pg_index indexStruct = NULL;
+
+       *indexOid = InvalidOid; /* "not found" until the scan below succeeds */
+
+       indexoidlist = RelationGetIndexList(pkrel);
+
+       foreach(indexoidscan, indexoidlist)
+       {
+               Oid indexoid = lfirst_oid(indexoidscan);
+
+               indexTuple = SearchSysCache1(INDEXRELID, 
ObjectIdGetDatum(indexoid));
+               if(!HeapTupleIsValid(indexTuple))
+                       elog(ERROR, "cache lookup failed for index %u", 
indexoid);
+
+               indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
+               if(indexStruct->indisprimary && indexStruct->indimmediate)
+               {
+                       *indexOid = indexoid;
+                       break; /* leave with indexTuple still held; it is used and released below */
+               }
+               ReleaseSysCache(indexTuple); /* not the primary key: drop this cache reference */
+
+       }
+       list_free(indexoidlist);
+
+       if (!OidIsValid(*indexOid))
+               return; /* no match: every tuple looked up was already released in the loop */
+
+       /* Must get indclass the hard way (SysCacheGetAttr, not a struct field) */
+       indclassDatum = SysCacheGetAttr(INDEXRELID, indexTuple,
+                                                                       
Anum_pg_index_indclass, &isnull);
+       Assert(!isnull);
+       indclass = (oidvector *) DatumGetPointer(indclassDatum);
+
+       *nratts = indexStruct->indnatts;
+       /*
+        * Now build the list of PK attributes from the indkey definition (we
+        * assume a primary key cannot have expressional elements).  The output
+        * arrays get indnatts entries each; callers must size them accordingly.
+        */
+       for (i = 0; i < indexStruct->indnatts; i++)
+       {
+               int                     pkattno = indexStruct->indkey.values[i];
+
+               attnums[i] = pkattno;
+               atttypids[i] = attnumTypeId(pkrel, pkattno);
+               opclasses[i] = indclass->values[i];
+       }
+
+       ReleaseSysCache(indexTuple); /* releases the reference kept by the break above */
+}
diff --git a/src/bin/pg_controldata/pg_controldata.c 
b/src/bin/pg_controldata/pg_controldata.c
index c00183a..47715c9 100644
--- a/src/bin/pg_controldata/pg_controldata.c
+++ b/src/bin/pg_controldata/pg_controldata.c
@@ -82,6 +82,8 @@ wal_level_str(WalLevel wal_level)
                        return "archive";
                case WAL_LEVEL_HOT_STANDBY:
                        return "hot_standby";
+               case WAL_LEVEL_LOGICAL:
+                       return "logical";
        }
        return _("unrecognized wal_level");
 }
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index df5f232..2843aca 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -199,7 +199,8 @@ typedef enum WalLevel
 {
        WAL_LEVEL_MINIMAL = 0,
        WAL_LEVEL_ARCHIVE,
-       WAL_LEVEL_HOT_STANDBY
+       WAL_LEVEL_HOT_STANDBY,
+       WAL_LEVEL_LOGICAL
 } WalLevel;
 extern int     wal_level;
 
diff --git a/src/include/catalog/index.h b/src/include/catalog/index.h
index 7c8198f..2ba0ac3 100644
--- a/src/include/catalog/index.h
+++ b/src/include/catalog/index.h
@@ -101,4 +101,8 @@ extern bool ReindexIsProcessingHeap(Oid heapOid);
 extern bool ReindexIsProcessingIndex(Oid indexOid);
 extern Oid     IndexGetRelation(Oid indexId, bool missing_ok);
 
+extern void relationFindPrimaryKey(Relation pkrel, Oid *indexOid,
+                                   int16 *nratts, int16 *attnums, Oid 
*atttypids,
+                                   Oid *opclasses);
+
 #endif   /* INDEX_H */
-- 
1.7.10.rc3.3.g19a6c.dirty


-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to