On 14/09/16 18:21, Andres Freund wrote:
(continuing, uh, a bit happier)

On 2016-09-09 00:59:26 +0200, Petr Jelinek wrote:

+/*
+ * Relcache invalidation callback for our relation map cache.
+ */
+static void
+logicalreprelmap_invalidate_cb(Datum arg, Oid reloid)
+{
+       LogicalRepRelMapEntry  *entry;
+
+       /* Just to be sure. */
+       if (LogicalRepRelMap == NULL)
+               return;
+
+       if (reloid != InvalidOid)
+       {
+               HASH_SEQ_STATUS status;
+
+               hash_seq_init(&status, LogicalRepRelMap);
+
+               /* TODO, use inverse lookup hastable? */

*hashtable

+               while ((entry = (LogicalRepRelMapEntry *) 
hash_seq_search(&status)) != NULL)
+               {
+                       if (entry->reloid == reloid)
+                               entry->reloid = InvalidOid;

can't we break here?


Probably.


+/*
+ * Initialize the relation map cache.
+ */
+static void
+remoterelmap_init(void)
+{
+       HASHCTL         ctl;
+
+       /* Make sure we've initialized CacheMemoryContext. */
+       if (CacheMemoryContext == NULL)
+               CreateCacheMemoryContext();
+
+       /* Initialize the hash table. */
+       MemSet(&ctl, 0, sizeof(ctl));
+       ctl.keysize = sizeof(uint32);
+       ctl.entrysize = sizeof(LogicalRepRelMapEntry);
+       ctl.hcxt = CacheMemoryContext;

Wonder if this (and similar code earlier) should try to do everything in
a sub-context of CacheMemoryContext instead. That'd make some issues
easier to track down.

Sure, I don't see why not.


+/*
+ * Open the local relation associated with the remote one.
+ */
+static LogicalRepRelMapEntry *
+logicalreprel_open(uint32 remoteid, LOCKMODE lockmode)
+{
+       LogicalRepRelMapEntry  *entry;
+       bool            found;
+
+       if (LogicalRepRelMap == NULL)
+               remoterelmap_init();
+
+       /* Search for existing entry. */
+       entry = hash_search(LogicalRepRelMap, (void *) &remoteid,
+                                               HASH_FIND, &found);
+
+       if (!found)
+               elog(FATAL, "cache lookup failed for remote relation %u",
+                        remoteid);
+
+       /* Need to update the local cache? */
+       if (!OidIsValid(entry->reloid))
+       {
+               Oid                     nspid;
+               Oid                     relid;
+               int                     i;
+               TupleDesc       desc;
+               LogicalRepRelation *remoterel;
+
+               remoterel = &entry->remoterel;
+
+               nspid = LookupExplicitNamespace(remoterel->nspname, false);
+               if (!OidIsValid(nspid))
+                       ereport(FATAL,
+                                       
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                        errmsg("the logical replication target %s 
not found",
+                                                       
quote_qualified_identifier(remoterel->nspname,
                                                                                   
                        remoterel->relname))));
+               relid = get_relname_relid(remoterel->relname, nspid);
+               if (!OidIsValid(relid))
+                       ereport(FATAL,
+                                       
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                        errmsg("the logical replication target %s 
not found",
+                                                       
quote_qualified_identifier(remoterel->nspname,
+                                                                                  
                        remoterel->relname))));
+
+               entry->rel = heap_open(relid, lockmode);

This seems rather racy. I think this really instead needs something akin
to RangeVarGetRelidExtended().

Maybe, I am not sure if it really matters here given how it's used, but I can change that.


+/*
+ * Executor state preparation for evaluation of constraint expressions,
+ * indexes and triggers.
+ *
+ * This is based on similar code in copy.c
+ */
+static EState *
+create_estate_for_relation(LogicalRepRelMapEntry *rel)
+{
+       EState     *estate;
+       ResultRelInfo *resultRelInfo;
+       RangeTblEntry *rte;
+
+       estate = CreateExecutorState();
+
+       rte = makeNode(RangeTblEntry);
+       rte->rtekind = RTE_RELATION;
+       rte->relid = RelationGetRelid(rel->rel);
+       rte->relkind = rel->rel->rd_rel->relkind;
+       estate->es_range_table = list_make1(rte);
+
+       resultRelInfo = makeNode(ResultRelInfo);
+       InitResultRelInfo(resultRelInfo, rel->rel, 1, 0);
+
+       estate->es_result_relations = resultRelInfo;
+       estate->es_num_result_relations = 1;
+       estate->es_result_relation_info = resultRelInfo;
+
+       /* Triggers might need a slot */
+       if (resultRelInfo->ri_TrigDesc)
+               estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
+
+       return estate;
+}

Ugh, we do this for every single change? That's pretty darn heavy.


I plan to add caching but haven't come up with a good way of doing that yet.


+/*
+ * Check if the local attribute is present in relation definition used
+ * by upstream and hence updated by the replication.
+ */
+static bool
+physatt_in_attmap(LogicalRepRelMapEntry *rel, int attid)
+{
+       AttrNumber      i;
+
+       /* Fast path for tables that are same on upstream and downstream. */
+       if (attid < rel->remoterel.natts && rel->attmap[attid] == attid)
+               return true;
+
+       /* Try to find the attribute in the map. */
+       for (i = 0; i < rel->remoterel.natts; i++)
+               if (rel->attmap[i] == attid)
+                       return true;
+
+       return false;
+}

Shouldn't we rather try to keep an attribute map that always can map
remote attribute numbers to local ones? That doesn't seem hard on a
first blush? But I might be missing something here.






+static void
+FillSlotDefaults(LogicalRepRelMapEntry *rel, EState *estate,
+                                TupleTableSlot *slot)
+{

Why is this using a different naming scheme?


Because I originally wanted to put it into the executor.

+/*
+ * Handle INSERT message.
+ */
+static void
+handle_insert(StringInfo s)
+{
+       LogicalRepRelMapEntry *rel;
+       LogicalRepTupleData     newtup;
+       LogicalRepRelId         relid;
+       EState                     *estate;
+       TupleTableSlot     *remoteslot;
+       MemoryContext           oldctx;
+
+       ensure_transaction();
+
+       relid = logicalrep_read_insert(s, &newtup);
+       rel = logicalreprel_open(relid, RowExclusiveLock);
+
+       /* Initialize the executor state. */
+       estate = create_estate_for_relation(rel);
+       remoteslot = ExecInitExtraTupleSlot(estate);
+       ExecSetSlotDescriptor(remoteslot, RelationGetDescr(rel->rel));

This seems incredibly expensive for replicating a lot of rows.

You mean because of create_estate_for_relation()?


+/*
+ * Search the relation 'rel' for tuple using the replication index.
+ *
+ * If a matching tuple is found lock it with lockmode, fill the slot with its
+ * contents and return true, return false is returned otherwise.
+ */
+static bool
+tuple_find_by_replidx(Relation rel, LockTupleMode lockmode,
+                                         TupleTableSlot *searchslot, 
TupleTableSlot *slot)
+{
+       HeapTuple               scantuple;
+       ScanKeyData             skey[INDEX_MAX_KEYS];
+       IndexScanDesc   scan;
+       SnapshotData    snap;
+       TransactionId   xwait;
+       Oid                             idxoid;
+       Relation                idxrel;
+       bool                    found;
+
+       /* Open REPLICA IDENTITY index.*/
+       idxoid = RelationGetReplicaIndex(rel);
+       if (!OidIsValid(idxoid))
+       {
+               elog(ERROR, "could not find configured replica identity for table 
\"%s\"",
+                        RelationGetRelationName(rel));
+               return false;
+       }
+       idxrel = index_open(idxoid, RowExclusiveLock);
+
+       /* Start an index scan. */
+       InitDirtySnapshot(snap);
+       scan = index_beginscan(rel, idxrel, &snap,
+                                                  
RelationGetNumberOfAttributes(idxrel),
+                                                  0);
+
+       /* Build scan key. */
+       build_replindex_scan_key(skey, rel, idxrel, searchslot);
+
+retry:
+       found = false;
+
+       index_rescan(scan, skey, RelationGetNumberOfAttributes(idxrel), NULL, 
0);
+
+       /* Try to find the tuple */
+       if ((scantuple = index_getnext(scan, ForwardScanDirection)) != NULL)
+       {
+               found = true;
+               ExecStoreTuple(scantuple, slot, InvalidBuffer, false);
+               ExecMaterializeSlot(slot);
+
+               xwait = TransactionIdIsValid(snap.xmin) ?
+                       snap.xmin : snap.xmax;
+
+               /*
+                * If the tuple is locked, wait for locking transaction to 
finish
+                * and retry.
+                */
+               if (TransactionIdIsValid(xwait))
+               {
+                       XactLockTableWait(xwait, NULL, NULL, XLTW_None);
+                       goto retry;
+               }
+       }

Hm. So we potentially find multiple tuples here, and lock all of
them, but then only use one for the update.


That's not how that code reads for me.


+static List *
+get_subscription_list(void)
+{
+       List       *res = NIL;
+       Relation        rel;
+       HeapScanDesc scan;
+       HeapTuple       tup;
+       MemoryContext resultcxt;
+
+       /* This is the context that we will allocate our output data in */
+       resultcxt = CurrentMemoryContext;
+
+       /*
+        * Start a transaction so we can access pg_database, and get a snapshot.
+        * We don't have a use for the snapshot itself, but we're interested in
+        * the secondary effect that it sets RecentGlobalXmin.  (This is 
critical
+        * for anything that reads heap pages, because HOT may decide to prune
+        * them even if the process doesn't attempt to modify any tuples.)
+        */

+       StartTransactionCommand();
+       (void) GetTransactionSnapshot();
+
+       rel = heap_open(SubscriptionRelationId, AccessShareLock);
+       scan = heap_beginscan_catalog(rel, 0, NULL);
+
+       while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
+       {
+               Form_pg_subscription subform = (Form_pg_subscription) 
GETSTRUCT(tup);
+               Subscription   *sub;
+               MemoryContext   oldcxt;
+
+               /*
+                * Allocate our results in the caller's context, not the
+                * transaction's. We do this inside the loop, and restore the 
original
+                * context at the end, so that leaky things like heap_getnext() 
are
+                * not called in a potentially long-lived context.
+                */
+               oldcxt = MemoryContextSwitchTo(resultcxt);
+
+               sub = (Subscription *) palloc(sizeof(Subscription));
+               sub->oid = HeapTupleGetOid(tup);
+               sub->dbid = subform->subdbid;
+               sub->enabled = subform->subenabled;
+
+               /* We don't fill fields we are not intereste in. */
+               sub->name = NULL;
+               sub->conninfo = NULL;
+               sub->slotname = NULL;
+               sub->publications = NIL;
+
+               res = lappend(res, sub);
+               MemoryContextSwitchTo(oldcxt);
+       }
+
+       heap_endscan(scan);
+       heap_close(rel, AccessShareLock);
+
+       CommitTransactionCommand();

Hm. this doesn't seem quite right from a locking pov. What if, in the
middle of this, a new subscription is created?


So it will be called again eventually, in the next iteration of the main loop. We don't need a perfectly stable world view here, just a snapshot of it to work with.


Hadn't I previously read about always streaming data to disk first?

@@ -0,0 +1,674 @@
+/*-------------------------------------------------------------------------
+ * tablesync.c
+ *        PostgreSQL logical replication
+ *
+ * Copyright (c) 2012-2016, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *       src/backend/replication/logical/tablesync.c
+ *
+ * NOTES
+ *       This file contains code for initial table data synchronization for
+ *       logical replication.
+ *
+ *    The initial data synchronization is done separately for each table,
+ *    in separate apply worker that only fetches the initial snapshot data
+ *    from the provider and then synchronizes the position in stream with
+ *    the main apply worker.

Why? I guess that's because it allows to incrementally add tables, with
acceptable overhead.


Yes, I need to document the whys more here. It enables us to copy multiple tables in parallel (in the future). It is also needed for adding tables after the initial sync, as you say.


+ *    The stream position synchronization works in multiple steps.
+ *     - sync finishes copy and sets table state as SYNCWAIT and waits
+ *       for state to change in a loop
+ *     - apply periodically checks unsynced tables for SYNCWAIT, when it
+ *       appears it will compare its position in the stream with the
+ *       SYNCWAIT position and decides to either set it to CATCHUP when
+ *       the apply was infront (and wait for the sync to do the catchup),
+ *       or set the state to SYNCDONE if the sync was infront or in case
+ *       both sync and apply are at the same position it will set it to
+ *       READY and stops tracking it

I'm not quite following here.


It's hard for me to explain, I guess; that's why the flow diagram is underneath. The point is to reach the same LSN for the table before the main apply process can take over the replication of that table. There are 2 possible scenarios: a) either apply has replayed more of the stream than sync did, and then the sync needs to ask apply to wait for it a bit (which blocks replication for a short while), b) or the sync has replayed more of the stream than apply, and then apply needs to track the table for a while (and not apply changes to it) until it reaches the same position where sync stopped; once it reaches that point it can just apply changes to it, same as to any old table.

+ *     - if the state was set to CATCHUP sync will read the stream and
+ *       apply changes until it catches up to the specified stream
+ *       position and then sets state to READY and signals apply that it
+ *       can stop waiting and exits, if the state was set to something
+ *       else than CATCHUP the sync process will simply end
+ *     - if the state was set to SYNCDONE by apply, the apply will
+ *       continue tracking the table until it reaches the SYNCDONE stream
+ *       position at which point it sets state to READY and stops tracking
+ *
+ *    Example flows look like this:
+ *     - Apply is infront:
+ *           sync:8   -> set SYNCWAIT
+ *        apply:10 -> set CATCHUP
+ *        sync:10  -> set ready
+ *          exit
+ *        apply:10
+ *          stop tracking
+ *          continue rep
+ *    - Sync infront:
+ *        sync:10
+ *          set SYNCWAIT
+ *        apply:8
+ *          set SYNCDONE
+ *        sync:10
+ *          exit
+ *        apply:10
+ *          set READY
+ *          stop tracking
+ *          continue rep

This definitely needs to be expanded a bit. Where are we tracking how
far replication has progressed on individual tables? Are we creating new
slots for syncing? Is there any parallelism in syncing?


Yes, new slots, tracking is in pg_subscription_rel, parallelism is not there yet, but the design is ready for expanding it (I currently artificially limit the number of sync workers to one to limit potential bugs, but afaik it could just be bumped to more and it should work).

+/*
+ * Exit routine for synchronization worker.
+ */
+static void
+finish_sync_worker(char *slotname)
+{
+       LogicalRepWorker   *worker;
+       RepOriginId                     originid;
+       MemoryContext           oldctx = CurrentMemoryContext;
+
+       /*
+        * Drop the replication slot on remote server.
+        * We want to continue even in the case that the slot on remote side
+        * is already gone. This means that we can leave slot on the remote
+        * side but that can happen for other reasons as well so we can't
+        * really protect against that.
+        */
+       PG_TRY();
+       {
+               wrcapi->drop_slot(wrchandle, slotname);
+       }
+       PG_CATCH();
+       {
+               MemoryContext   ectx;
+               ErrorData          *edata;
+
+               ectx = MemoryContextSwitchTo(oldctx);
+               /* Save error info */
+               edata = CopyErrorData();
+               MemoryContextSwitchTo(ectx);
+               FlushErrorState();
+
+               ereport(WARNING,
+                               (errmsg("there was problem dropping the replication 
slot "
+                                               "\"%s\" on provider", slotname),
+                                errdetail("The error was: %s", edata->message),
+                                errhint("You may have to drop it manually")));
+               FreeErrorData(edata);

ISTM we really should rather return success/failure here, and not throw
an error inside the libpqwalreceiver stuff.  I kind of wonder if we
actually can get rid of this indirection.


Yeah I can do success/failure. Not sure what you mean by indirection.

+                * to ensure that we are not behind it (it's going to wait at 
this
+                * point for the change of state). Once we are infront or at 
the same
+                * position as the synchronization proccess we can signal it to
+                * finish the catchup.
+                */
+               if (tstate->state == SUBREL_STATE_SYNCWAIT)
+               {
+                       if (end_lsn > tstate->lsn)
+                       {
+                               /*
+                                * Apply is infront, tell sync to catchup. and 
wait until
+                                * it does.
+                                */
+                               tstate->state = SUBREL_STATE_CATCHUP;
+                               tstate->lsn = end_lsn;
+                               StartTransactionCommand();
+                               
SetSubscriptionRelState(MyLogicalRepWorker->subid,
+                                                                               
tstate->relid, tstate->state,
+                                                                               
tstate->lsn);
+                               CommitTransactionCommand();
+
+                               /* Signal the worker as it may be waiting for 
us. */
+                               LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+                               worker = 
logicalrep_worker_find(MyLogicalRepWorker->subid,
+                                                                                  
             tstate->relid);
+                               if (worker && worker->proc)
+                                       SetLatch(&worker->proc->procLatch);
+                               LWLockRelease(LogicalRepWorkerLock);

Different parts of this file use different lock level to set the
latch. Why?


The latch does not need the lock, not really following what you mean. But the lock here is for the benefit of logicalrep_worker_find.


+                               if (wait_for_sync_status_change(tstate))
+                                       Assert(tstate->state == 
SUBREL_STATE_READY);
+                       }
+                       else
+                       {
+                               /*
+                                * Apply is either behind in which case sync 
worker is done
+                                * but apply needs to keep tracking the table 
until it
+                                * catches up to where sync finished.
+                                * Or apply and sync are at the same position 
in which case
+                                * table can be switched to standard 
replication mode
+                                * immediately.
+                                */
+                               if (end_lsn < tstate->lsn)
+                                       tstate->state = SUBREL_STATE_SYNCDONE;
+                               else
+                                       tstate->state = SUBREL_STATE_READY;
+

What I'm failing to understand is how this can be done under
concurrency. You probably thought about this, but it should really be
explained somewhere.

Well, so, if the original state was syncdone (the previous branch) the apply won't actually do any work until the state changes (and it can only change to either syncdone or ready at that point), so there is no real concurrency. If we reach this branch then either the sync worker already exited (if it set the state to syncdone) or it's not doing anything and is waiting for apply to set the state to ready, in which case there is also no concurrency.

+               /*
+                * In case table is supposed to be synchronizing but the
+                * synchronization worker is not running, start it.
+                * Limit the number of launched workers here to one (for now).
+                */

Hm. That seems problematic for online upgrade type cases, we might never
catch up that way...


You mean the limit to 1? That's just because I didn't get to creating a GUC for configuring this.



+                               /*
+                                * We want to do the table data sync in single
+                                * transaction so do not close the transaction 
opened
+                                * above.
+                                * There will be no BEGIN or COMMIT messages 
coming via
+                                * logical replication while the copy table 
command is
+                                * running so start the transaction here.
+                                * Note the memory context for data handling 
will still
+                                * be done using ensure_transaction called by 
the insert
+                                * handler.
+                                */
+                               StartTransactionCommand();
+
+                               /*
+                                * Don't allow parallel access other than 
SELECT while
+                                * the initial contents are being copied.
+                                */
+                               rel = heap_open(tstate.relid, ExclusiveLock);

Why do we want to allow access at all?


I didn't see a reason to not allow selects.



@@ -87,6 +92,8 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
        cb->commit_cb = pgoutput_commit_txn;
        cb->filter_by_origin_cb = pgoutput_origin_filter;
        cb->shutdown_cb = pgoutput_shutdown;
+       cb->tuple_cb = pgoutput_tuple;
+       cb->list_tables_cb = pgoutput_list_tables;
 }

What are these new, and undocumented callbacks actually doing? And why
is this integrated into logical decoding?


In the initial email I was saying that I am not very happy with this design, that's still true, because they don't belong to decoding.


 /*
+ * Handle LIST_TABLES command.
+ */
+static void
+SendTableList(ListTablesCmd *cmd)
+{

Ugh.


I really dislike this kind of command. I think we should instead change
things around, allowing to issue normal SQL via the replication
command. We'll have to error out for running sql for non-database
connected replication connections, but that seems fine.


Note: per offline discussion, we agreed to do this stuff over a normal connection for now.

--
  Petr Jelinek                  http://www.2ndQuadrant.com/
  PostgreSQL Development, 24x7 Support, Training & Services


--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to