(continuing, uh, a bit happier)

On 2016-09-09 00:59:26 +0200, Petr Jelinek wrote:

> +/*
> + * Relcache invalidation callback for our relation map cache.
> + */
> +static void
> +logicalreprelmap_invalidate_cb(Datum arg, Oid reloid)
> +{
> +     LogicalRepRelMapEntry  *entry;
> +
> +     /* Just to be sure. */
> +     if (LogicalRepRelMap == NULL)
> +             return;
> +
> +     if (reloid != InvalidOid)
> +     {
> +             HASH_SEQ_STATUS status;
> +
> +             hash_seq_init(&status, LogicalRepRelMap);
> +
> +             /* TODO, use inverse lookup hastable? */

*hashtable

> +             while ((entry = (LogicalRepRelMapEntry *) 
> hash_seq_search(&status)) != NULL)
> +             {
> +                     if (entry->reloid == reloid)
> +                             entry->reloid = InvalidOid;

can't we break here?


> +/*
> + * Initialize the relation map cache.
> + */
> +static void
> +remoterelmap_init(void)
> +{
> +     HASHCTL         ctl;
> +
> +     /* Make sure we've initialized CacheMemoryContext. */
> +     if (CacheMemoryContext == NULL)
> +             CreateCacheMemoryContext();
> +
> +     /* Initialize the hash table. */
> +     MemSet(&ctl, 0, sizeof(ctl));
> +     ctl.keysize = sizeof(uint32);
> +     ctl.entrysize = sizeof(LogicalRepRelMapEntry);
> +     ctl.hcxt = CacheMemoryContext;

Wonder if this (and similar code earlier) should try to do everything in
a sub-context of CacheMemoryContext instead. That'd make some issues
easier to track down.

> +/*
> + * Open the local relation associated with the remote one.
> + */
> +static LogicalRepRelMapEntry *
> +logicalreprel_open(uint32 remoteid, LOCKMODE lockmode)
> +{
> +     LogicalRepRelMapEntry  *entry;
> +     bool            found;
> +
> +     if (LogicalRepRelMap == NULL)
> +             remoterelmap_init();
> +
> +     /* Search for existing entry. */
> +     entry = hash_search(LogicalRepRelMap, (void *) &remoteid,
> +                                             HASH_FIND, &found);
> +
> +     if (!found)
> +             elog(FATAL, "cache lookup failed for remote relation %u",
> +                      remoteid);
> +
> +     /* Need to update the local cache? */
> +     if (!OidIsValid(entry->reloid))
> +     {
> +             Oid                     nspid;
> +             Oid                     relid;
> +             int                     i;
> +             TupleDesc       desc;
> +             LogicalRepRelation *remoterel;
> +
> +             remoterel = &entry->remoterel;
> +
> +             nspid = LookupExplicitNamespace(remoterel->nspname, false);
> +             if (!OidIsValid(nspid))
> +                     ereport(FATAL,
> +                                     
> (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> +                                      errmsg("the logical replication target 
> %s not found",
> +                                                     
> quote_qualified_identifier(remoterel->nspname,
                                                                                
                           remoterel->relname))));
> +             relid = get_relname_relid(remoterel->relname, nspid);
> +             if (!OidIsValid(relid))
> +                     ereport(FATAL,
> +                                     
> (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> +                                      errmsg("the logical replication target 
> %s not found",
> +                                                     
> quote_qualified_identifier(remoterel->nspname,
> +                                                                             
>                            remoterel->relname))));
> +
> +             entry->rel = heap_open(relid, lockmode);

This seems rather racy. I think this really instead needs something akin
to RangeVarGetRelidExtended().

> +/*
> + * Executor state preparation for evaluation of constraint expressions,
> + * indexes and triggers.
> + *
> + * This is based on similar code in copy.c
> + */
> +static EState *
> +create_estate_for_relation(LogicalRepRelMapEntry *rel)
> +{
> +     EState     *estate;
> +     ResultRelInfo *resultRelInfo;
> +     RangeTblEntry *rte;
> +
> +     estate = CreateExecutorState();
> +
> +     rte = makeNode(RangeTblEntry);
> +     rte->rtekind = RTE_RELATION;
> +     rte->relid = RelationGetRelid(rel->rel);
> +     rte->relkind = rel->rel->rd_rel->relkind;
> +     estate->es_range_table = list_make1(rte);
> +
> +     resultRelInfo = makeNode(ResultRelInfo);
> +     InitResultRelInfo(resultRelInfo, rel->rel, 1, 0);
> +
> +     estate->es_result_relations = resultRelInfo;
> +     estate->es_num_result_relations = 1;
> +     estate->es_result_relation_info = resultRelInfo;
> +
> +     /* Triggers might need a slot */
> +     if (resultRelInfo->ri_TrigDesc)
> +             estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
> +
> +     return estate;
> +}

Ugh, we do this for every single change? That's pretty darn heavy.


> +/*
> + * Check if the local attribute is present in relation definition used
> + * by upstream and hence updated by the replication.
> + */
> +static bool
> +physatt_in_attmap(LogicalRepRelMapEntry *rel, int attid)
> +{
> +     AttrNumber      i;
> +
> +     /* Fast path for tables that are same on upstream and downstream. */
> +     if (attid < rel->remoterel.natts && rel->attmap[attid] == attid)
> +             return true;
> +
> +     /* Try to find the attribute in the map. */
> +     for (i = 0; i < rel->remoterel.natts; i++)
> +             if (rel->attmap[i] == attid)
> +                     return true;
> +
> +     return false;
> +}

Shouldn't we rather try to keep an attribute map that always can map
remote attribute numbers to local ones? That doesn't seem hard on a
first blush? But I might be missing something here.


> +/*
> + * Executes default values for columns for which we can't map to remote
> + * relation columns.
> + *
> + * This allows us to support tables which have more columns on the downstream
> + * than on the upsttream.
> + */

Typo: upsttream.


> +static void
> +FillSlotDefaults(LogicalRepRelMapEntry *rel, EState *estate,
> +                              TupleTableSlot *slot)
> +{

Why is this using a different naming scheme?


> +/*
> + * Handle COMMIT message.
> + *
> + * TODO, support tracking of multiple origins
> + */
> +static void
> +handle_commit(StringInfo s)
> +{
> +     XLogRecPtr              commit_lsn;
> +     XLogRecPtr              end_lsn;
> +     TimestampTz             commit_time;
> +
> +     logicalrep_read_commit(s, &commit_lsn, &end_lsn, &commit_time);

Perhaps this (and related routines) should rather be
        LogicalRepCommitdata commit_data;
        logicalrep_read_commit(s, &commit_data);
etc? That way the data can transparently be enhanced.

> +     Assert(commit_lsn == replorigin_session_origin_lsn);
> +     Assert(commit_time == replorigin_session_origin_timestamp);
> +
> +     if (IsTransactionState())
> +     {
> +             FlushPosition *flushpos;
> +
> +             CommitTransactionCommand();
> +             MemoryContextSwitchTo(CacheMemoryContext);
> +
> +             /* Track commit lsn  */
> +             flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
> +             flushpos->local_end = XactLastCommitEnd;
> +             flushpos->remote_end = end_lsn;
> +
> +             dlist_push_tail(&lsn_mapping, &flushpos->node);
> +             MemoryContextSwitchTo(ApplyContext);

Seems like it should be in a separate function.


> +/*
> + * Handle INSERT message.
> + */
> +static void
> +handle_insert(StringInfo s)
> +{
> +     LogicalRepRelMapEntry *rel;
> +     LogicalRepTupleData     newtup;
> +     LogicalRepRelId         relid;
> +     EState                     *estate;
> +     TupleTableSlot     *remoteslot;
> +     MemoryContext           oldctx;
> +
> +     ensure_transaction();
> +
> +     relid = logicalrep_read_insert(s, &newtup);
> +     rel = logicalreprel_open(relid, RowExclusiveLock);
> +
> +     /* Initialize the executor state. */
> +     estate = create_estate_for_relation(rel);
> +     remoteslot = ExecInitExtraTupleSlot(estate);
> +     ExecSetSlotDescriptor(remoteslot, RelationGetDescr(rel->rel));

This seems incredibly expensive for replicating a lot of rows.

> +     /* Process and store remote tuple in the slot */
> +     oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
> +     SlotStoreCStrings(remoteslot, newtup.values);
> +     FillSlotDefaults(rel, estate, remoteslot);
> +     MemoryContextSwitchTo(oldctx);
> +
> +     PushActiveSnapshot(GetTransactionSnapshot());
> +     ExecOpenIndices(estate->es_result_relation_info, false);
> +
> +     ExecInsert(NULL, /* mtstate is only used for onconflict handling which 
> we don't support atm */
> +                        remoteslot,
> +                        remoteslot,
> +                        NIL,
> +                        ONCONFLICT_NONE,
> +                        estate,
> +                        false);

I have *severe* doubts about just using the (newly) exposed functions
1:1 here.


> +/*
> + * Search the relation 'rel' for tuple using the replication index.
> + *
> + * If a matching tuple is found lock it with lockmode, fill the slot with its
> + * contents and return true, return false is returned otherwise.
> + */
> +static bool
> +tuple_find_by_replidx(Relation rel, LockTupleMode lockmode,
> +                                       TupleTableSlot *searchslot, 
> TupleTableSlot *slot)
> +{
> +     HeapTuple               scantuple;
> +     ScanKeyData             skey[INDEX_MAX_KEYS];
> +     IndexScanDesc   scan;
> +     SnapshotData    snap;
> +     TransactionId   xwait;
> +     Oid                             idxoid;
> +     Relation                idxrel;
> +     bool                    found;
> +
> +     /* Open REPLICA IDENTITY index.*/
> +     idxoid = RelationGetReplicaIndex(rel);
> +     if (!OidIsValid(idxoid))
> +     {
> +             elog(ERROR, "could not find configured replica identity for 
> table \"%s\"",
> +                      RelationGetRelationName(rel));
> +             return false;
> +     }
> +     idxrel = index_open(idxoid, RowExclusiveLock);
> +
> +     /* Start an index scan. */
> +     InitDirtySnapshot(snap);
> +     scan = index_beginscan(rel, idxrel, &snap,
> +                                                
> RelationGetNumberOfAttributes(idxrel),
> +                                                0);
> +
> +     /* Build scan key. */
> +     build_replindex_scan_key(skey, rel, idxrel, searchslot);
> +
> +retry:
> +     found = false;
> +
> +     index_rescan(scan, skey, RelationGetNumberOfAttributes(idxrel), NULL, 
> 0);
> +
> +     /* Try to find the tuple */
> +     if ((scantuple = index_getnext(scan, ForwardScanDirection)) != NULL)
> +     {
> +             found = true;
> +             ExecStoreTuple(scantuple, slot, InvalidBuffer, false);
> +             ExecMaterializeSlot(slot);
> +
> +             xwait = TransactionIdIsValid(snap.xmin) ?
> +                     snap.xmin : snap.xmax;
> +
> +             /*
> +              * If the tuple is locked, wait for locking transaction to 
> finish
> +              * and retry.
> +              */
> +             if (TransactionIdIsValid(xwait))
> +             {
> +                     XactLockTableWait(xwait, NULL, NULL, XLTW_None);
> +                     goto retry;
> +             }
> +     }

Hm. So we potentially find multiple tuples here, and lock all of
them. but then only use one for the update.



> +static List *
> +get_subscription_list(void)
> +{
> +     List       *res = NIL;
> +     Relation        rel;
> +     HeapScanDesc scan;
> +     HeapTuple       tup;
> +     MemoryContext resultcxt;
> +
> +     /* This is the context that we will allocate our output data in */
> +     resultcxt = CurrentMemoryContext;
> +
> +     /*
> +      * Start a transaction so we can access pg_database, and get a snapshot.
> +      * We don't have a use for the snapshot itself, but we're interested in
> +      * the secondary effect that it sets RecentGlobalXmin.  (This is 
> critical
> +      * for anything that reads heap pages, because HOT may decide to prune
> +      * them even if the process doesn't attempt to modify any tuples.)
> +      */

> +     StartTransactionCommand();
> +     (void) GetTransactionSnapshot();
> +
> +     rel = heap_open(SubscriptionRelationId, AccessShareLock);
> +     scan = heap_beginscan_catalog(rel, 0, NULL);
> +
> +     while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
> +     {
> +             Form_pg_subscription subform = (Form_pg_subscription) 
> GETSTRUCT(tup);
> +             Subscription   *sub;
> +             MemoryContext   oldcxt;
> +
> +             /*
> +              * Allocate our results in the caller's context, not the
> +              * transaction's. We do this inside the loop, and restore the 
> original
> +              * context at the end, so that leaky things like heap_getnext() 
> are
> +              * not called in a potentially long-lived context.
> +              */
> +             oldcxt = MemoryContextSwitchTo(resultcxt);
> +
> +             sub = (Subscription *) palloc(sizeof(Subscription));
> +             sub->oid = HeapTupleGetOid(tup);
> +             sub->dbid = subform->subdbid;
> +             sub->enabled = subform->subenabled;
> +
> +             /* We don't fill fields we are not intereste in. */
> +             sub->name = NULL;
> +             sub->conninfo = NULL;
> +             sub->slotname = NULL;
> +             sub->publications = NIL;
> +
> +             res = lappend(res, sub);
> +             MemoryContextSwitchTo(oldcxt);
> +     }
> +
> +     heap_endscan(scan);
> +     heap_close(rel, AccessShareLock);
> +
> +     CommitTransactionCommand();

Hm. this doesn't seem quite right from a locking pov. What if, in the
middle of this, a new subscription is created?



> +void
> +logicalrep_worker_stop(LogicalRepWorker *worker)
> +{
> +     Assert(LWLockHeldByMe(LogicalRepWorkerLock));
> +
> +     /* Check that the worker is up and what we expect. */
> +     if (!worker->proc)
> +             return;
> +     if (!IsBackendPid(worker->proc->pid))
> +             return;
> +
> +     /* Terminate the worker. */
> +     kill(worker->proc->pid, SIGTERM);
> +
> +     LWLockRelease(LogicalRepLauncherLock);
> +
> +     /* Wait for it to detach. */
> +     for (;;)
> +     {
> +             int     rc = WaitLatch(&MyProc->procLatch,
> +                                                WL_LATCH_SET | WL_TIMEOUT | 
> WL_POSTMASTER_DEATH,
> +                                                1000L);
> +
> +        /* emergency bailout if postmaster has died */
> +        if (rc & WL_POSTMASTER_DEATH)
> +                     proc_exit(1);
> +
> +        ResetLatch(&MyProc->procLatch);
> +
> +             CHECK_FOR_INTERRUPTS();
> +
> +             if (!worker->proc)
> +                     return;
> +     }
> +}

indentation here seems scfrewed.



> +static void
> +xacthook_signal_launcher(XactEvent event, void *arg)
> +{
> +     switch (event)
> +     {
> +             case XACT_EVENT_COMMIT:
> +                     if (xacthook_do_signal_launcher)
> +                             ApplyLauncherWakeup();
> +                     break;
> +             default:
> +                     /* We're not interested in other tx events */
> +                     break;
> +     }
> +}

> +void
> +ApplyLauncherWakeupOnCommit(void)
> +{
> +     if (!xacthook_do_signal_launcher)
> +     {
> +             RegisterXactCallback(xacthook_signal_launcher, NULL);
> +             xacthook_do_signal_launcher = true;
> +     }
> +}

Hm. This seems like it really should be an AtCommit_* routine instead.
This also needs more docs.


Hadn't I previously read about always streaming data to disk first?

> @@ -0,0 +1,674 @@
> +/*-------------------------------------------------------------------------
> + * tablesync.c
> + *      PostgreSQL logical replication
> + *
> + * Copyright (c) 2012-2016, PostgreSQL Global Development Group
> + *
> + * IDENTIFICATION
> + *     src/backend/replication/logical/tablesync.c
> + *
> + * NOTES
> + *     This file contains code for initial table data synchronization for
> + *     logical replication.
> + *
> + *    The initial data synchronization is done separately for each table,
> + *    in separate apply worker that only fetches the initial snapshot data
> + *    from the provider and then synchronizes the position in stream with
> + *    the main apply worker.

Why? I guess that's because it allows to incrementally add tables, with
acceptable overhead.


> + *    The stream position synchronization works in multiple steps.
> + *     - sync finishes copy and sets table state as SYNCWAIT and waits
> + *       for state to change in a loop
> + *     - apply periodically checks unsynced tables for SYNCWAIT, when it
> + *       appears it will compare its position in the stream with the
> + *       SYNCWAIT position and decides to either set it to CATCHUP when
> + *       the apply was infront (and wait for the sync to do the catchup),
> + *       or set the state to SYNCDONE if the sync was infront or in case
> + *       both sync and apply are at the same position it will set it to
> + *       READY and stops tracking it

I'm not quite following here.

> + *     - if the state was set to CATCHUP sync will read the stream and
> + *       apply changes until it catches up to the specified stream
> + *       position and then sets state to READY and signals apply that it
> + *       can stop waiting and exits, if the state was set to something
> + *       else than CATCHUP the sync process will simply end
> + *     - if the state was set to SYNCDONE by apply, the apply will
> + *       continue tracking the table until it reaches the SYNCDONE stream
> + *       position at which point it sets state to READY and stops tracking
> + *
> + *    Example flows look like this:
> + *     - Apply is infront:
> + *         sync:8   -> set SYNCWAIT
> + *        apply:10 -> set CATCHUP
> + *        sync:10  -> set ready
> + *          exit
> + *        apply:10
> + *          stop tracking
> + *          continue rep
> + *    - Sync infront:
> + *        sync:10
> + *          set SYNCWAIT
> + *        apply:8
> + *          set SYNCDONE
> + *        sync:10
> + *          exit
> + *        apply:10
> + *          set READY
> + *          stop tracking
> + *          continue rep

This definitely needs to be expanded a bit. Where are we tracking how
far replication has progressed on individual tables? Are we creating new
slots for syncing? Is there any parallelism in syncing?

> +/*
> + * Exit routine for synchronization worker.
> + */
> +static void
> +finish_sync_worker(char *slotname)
> +{
> +     LogicalRepWorker   *worker;
> +     RepOriginId                     originid;
> +     MemoryContext           oldctx = CurrentMemoryContext;
> +
> +     /*
> +      * Drop the replication slot on remote server.
> +      * We want to continue even in the case that the slot on remote side
> +      * is already gone. This means that we can leave slot on the remote
> +      * side but that can happen for other reasons as well so we can't
> +      * really protect against that.
> +      */
> +     PG_TRY();
> +     {
> +             wrcapi->drop_slot(wrchandle, slotname);
> +     }
> +     PG_CATCH();
> +     {
> +             MemoryContext   ectx;
> +             ErrorData          *edata;
> +
> +             ectx = MemoryContextSwitchTo(oldctx);
> +             /* Save error info */
> +             edata = CopyErrorData();
> +             MemoryContextSwitchTo(ectx);
> +             FlushErrorState();
> +
> +             ereport(WARNING,
> +                             (errmsg("there was problem dropping the 
> replication slot "
> +                                             "\"%s\" on provider", slotname),
> +                              errdetail("The error was: %s", edata->message),
> +                              errhint("You may have to drop it manually")));
> +             FreeErrorData(edata);

ISTM we really should rather return success/failure here, and not throw
an error inside the libpqwalreceiver stuff.  I kind of wonder if we
actually can get rid of this indirection.


> +     /* Find the main apply worker and signal it. */
> +     LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
> +     worker = logicalrep_worker_find(MyLogicalRepWorker->subid, InvalidOid);
> +     if (worker && worker->proc)
> +             SetLatch(&worker->proc->procLatch);
> +     LWLockRelease(LogicalRepWorkerLock);

I'd rather do the SetLatch outside of the critical section.

> +static bool
> +wait_for_sync_status_change(TableState *tstate)
> +{
> +     int             rc;
> +     char    state = tstate->state;
> +
> +     while (!got_SIGTERM)
> +     {
> +             StartTransactionCommand();
> +             tstate->state = 
> GetSubscriptionRelState(MyLogicalRepWorker->subid,
> +                                                                             
>                 tstate->relid,
> +                                                                             
>                 &tstate->lsn,
> +                                                                             
>                 true);
> +             CommitTransactionCommand();
> +
> +             /* Status record was removed. */
> +             if (tstate->state == SUBREL_STATE_UNKNOWN)
> +                     return false;
> +
> +             if (tstate->state != state)
> +                     return true;
> +
> +             rc = WaitLatch(&MyProc->procLatch,
> +                                        WL_LATCH_SET | WL_TIMEOUT | 
> WL_POSTMASTER_DEATH,
> +                                        10000L);
> +
> +             /* emergency bailout if postmaster has died */
> +             if (rc & WL_POSTMASTER_DEATH)
> +                     proc_exit(1);
> +
> +        ResetLatch(&MyProc->procLatch);

broken indentation.


> +/*
> + * Read the state of the tables in the subscription and update our table
> + * state list.
> + */
> +static void
> +reread_sync_state(Oid relid)
> +{
> +     dlist_mutable_iter      iter;
> +     Relation        rel;
> +     HeapTuple       tup;
> +     ScanKeyData     skey[2];
> +     HeapScanDesc    scan;
> +
> +     /* Clean the old list. */
> +     dlist_foreach_modify(iter, &table_states)
> +     {
> +             TableState *tstate = dlist_container(TableState, node, 
> iter.cur);
> +
> +             dlist_delete(iter.cur);
> +             pfree(tstate);
> +     }
> +
> +     /*
> +      * Fetch all the subscription relation states that are not marked as
> +      * ready and push them into our table state tracking list.
> +      */
> +     rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
> +
> +     ScanKeyInit(&skey[0],
> +                             Anum_pg_subscription_rel_subid,
> +                             BTEqualStrategyNumber, F_OIDEQ,
> +                             ObjectIdGetDatum(MyLogicalRepWorker->subid));
> +
> +     if (OidIsValid(relid))
> +     {
> +             ScanKeyInit(&skey[1],
> +                                     Anum_pg_subscription_rel_subrelid,
> +                                     BTEqualStrategyNumber, F_OIDEQ,
> +                                     ObjectIdGetDatum(relid));
> +     }
> +     else
> +     {
> +             ScanKeyInit(&skey[1],
> +                                     Anum_pg_subscription_rel_substate,
> +                                     BTEqualStrategyNumber, F_CHARNE,
> +                                     CharGetDatum(SUBREL_STATE_READY));
> +     }
> +
> +     scan = heap_beginscan_catalog(rel, 2, skey);

Hm. So this is a seqscan. Shouldn't we make this use an index (depending
on which branch is taken above)?


> +     while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
> +     {
> +             Form_pg_subscription_rel        subrel;
> +             TableState         *tstate;
> +             MemoryContext   oldctx;
> +
> +             subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
> +
> +             /* Allocate the tracking info in a permament memory context. */

s/permament/permanent/

> +/*
> + * Handle table synchronization cooperation from the synchroniation
> + * worker.
> + */
> +static void
> +process_syncing_tables_sync(char *slotname, XLogRecPtr end_lsn)
> +{
> +     TableState *tstate;
> +     TimeLineID      tli;
> +
> +     Assert(!IsTransactionState());
> +
> +     /*
> +      * Synchronization workers don't keep track of all synchronization
> +      * tables, they only care about their table.
> +      */
> +     if (!table_states_valid)
> +     {
> +             StartTransactionCommand();
> +             reread_sync_state(MyLogicalRepWorker->relid);
> +             CommitTransactionCommand();
> +     }
> +
> +     /* Somebody removed table underneath this worker, nothing more to do. */
> +     if (dlist_is_empty(&table_states))
> +     {
> +             wrcapi->endstreaming(wrchandle, &tli);
> +             finish_sync_worker(slotname);
> +     }
> +
> +     /* Check if we are done with catchup now. */
> +     tstate = dlist_container(TableState, node, 
> dlist_head_node(&table_states));
> +     if (tstate->state == SUBREL_STATE_CATCHUP)
> +     {
> +             Assert(tstate->lsn != InvalidXLogRecPtr);
> +
> +             if (tstate->lsn == end_lsn)
> +             {
> +                     tstate->state = SUBREL_STATE_READY;
> +                     tstate->lsn = InvalidXLogRecPtr;
> +                     /* Update state of the synchronization. */
> +                     StartTransactionCommand();
> +                     SetSubscriptionRelState(MyLogicalRepWorker->subid,
> +                                                                     
> tstate->relid, tstate->state,
> +                                                                     
> tstate->lsn);
> +                     CommitTransactionCommand();
> +
> +                     wrcapi->endstreaming(wrchandle, &tli);
> +                     finish_sync_worker(slotname);
> +             }
> +             return;
> +     }
> +}

The return inside the if is a bit weird. Makes one think it might be a
loop or such.


> +/*
> + * Handle table synchronization cooperation from the apply worker.
> + */
> +static void
> +process_syncing_tables_apply(char *slotname, XLogRecPtr end_lsn)
> +{
> +     dlist_mutable_iter      iter;
> +
> +     Assert(!IsTransactionState());
> +
> +     if (!table_states_valid)
> +     {
> +             StartTransactionCommand();
> +             reread_sync_state(InvalidOid);
> +             CommitTransactionCommand();
> +     }

So this pattern is repeated a bunch of times, maybe we can encapsulate
that somewhat? Maybe like ensure_sync_state_valid() or such?


> +     dlist_foreach_modify(iter, &table_states)
> +     {
> +             TableState *tstate = dlist_container(TableState, node, 
> iter.cur);
> +             bool            start_worker;
> +             LogicalRepWorker   *worker;
> +
> +             /*
> +              * When the synchronization process is at the cachup phase we 
> need

s/cachup/catchup/


> +              * to ensure that we are not behind it (it's going to wait at 
> this
> +              * point for the change of state). Once we are infront or at 
> the same
> +              * position as the synchronization proccess we can signal it to
> +              * finish the catchup.
> +              */
> +             if (tstate->state == SUBREL_STATE_SYNCWAIT)
> +             {
> +                     if (end_lsn > tstate->lsn)
> +                     {
> +                             /*
> +                              * Apply is infront, tell sync to catchup. and 
> wait until
> +                              * it does.
> +                              */
> +                             tstate->state = SUBREL_STATE_CATCHUP;
> +                             tstate->lsn = end_lsn;
> +                             StartTransactionCommand();
> +                             
> SetSubscriptionRelState(MyLogicalRepWorker->subid,
> +                                                                             
> tstate->relid, tstate->state,
> +                                                                             
> tstate->lsn);
> +                             CommitTransactionCommand();
> +
> +                             /* Signal the worker as it may be waiting for 
> us. */
> +                             LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
> +                             worker = 
> logicalrep_worker_find(MyLogicalRepWorker->subid,
> +                                                                             
>                 tstate->relid);
> +                             if (worker && worker->proc)
> +                                     SetLatch(&worker->proc->procLatch);
> +                             LWLockRelease(LogicalRepWorkerLock);

Different parts of this file use different lock level to set the
latch. Why?


> +                             if (wait_for_sync_status_change(tstate))
> +                                     Assert(tstate->state == 
> SUBREL_STATE_READY);
> +                     }
> +                     else
> +                     {
> +                             /*
> +                              * Apply is either behind in which case sync 
> worker is done
> +                              * but apply needs to keep tracking the table 
> until it
> +                              * catches up to where sync finished.
> +                              * Or apply and sync are at the same position 
> in which case
> +                              * table can be switched to standard 
> replication mode
> +                              * immediately.
> +                              */
> +                             if (end_lsn < tstate->lsn)
> +                                     tstate->state = SUBREL_STATE_SYNCDONE;
> +                             else
> +                                     tstate->state = SUBREL_STATE_READY;
> +

What I'm failing to understand is how this can be done under
concurrency. You probably thought about this, but it should really be
explained somewhere.


> +                             StartTransactionCommand();
> +                             
> SetSubscriptionRelState(MyLogicalRepWorker->subid,
> +                                                                             
> tstate->relid, tstate->state,
> +                                                                             
> tstate->lsn);
> +                             CommitTransactionCommand();
> +
> +                             /* Signal the worker as it may be waiting for 
> us. */
> +                             LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
> +                             worker = 
> logicalrep_worker_find(MyLogicalRepWorker->subid,
> +                                                                             
>                 tstate->relid);
> +                             if (worker && worker->proc)
> +                                     SetLatch(&worker->proc->procLatch);
> +                             LWLockRelease(LogicalRepWorkerLock);

Oh, and again, please set latches outside of the lock.


> +             else if (tstate->state == SUBREL_STATE_SYNCDONE &&
> +                              end_lsn >= tstate->lsn)
> +             {
> +                     /*
> +                      * Apply catched up to the position where table sync 
> finished,
> +                      * mark the table as ready for normal replication.
> +                      */

Sentence needs to be rephrased a bit.

> +             /*
> +              * In case table is supposed to be synchronizing but the
> +              * synchronization worker is not running, start it.
> +              * Limit the number of launched workers here to one (for now).
> +              */

Hm. That seems problematic for online upgrade type cases, we might never
be catch up that way...


> +/*
> + * Start syncing the table in the sync worker.
> + */
> +char *
> +LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
> +{
> +     StringInfoData  s;
> +     TableState              tstate;
> +     MemoryContext   oldctx;
> +     char               *slotname;
> +
> +     /* Check the state of the table synchronization. */
> +     StartTransactionCommand();
> +     tstate.relid = MyLogicalRepWorker->relid;
> +     tstate.state = GetSubscriptionRelState(MySubscription->oid, 
> tstate.relid,
> +                                                                             
>    &tstate.lsn, false);
> +
> +     /*
> +      * Build unique slot name.
> +      * TODO: protect against too long slot name.
> +      */
> +     oldctx = MemoryContextSwitchTo(CacheMemoryContext);
> +     initStringInfo(&s);
> +     appendStringInfo(&s, "%s_sync_%s", MySubscription->slotname,
> +                                      get_rel_name(tstate.relid));
> +     slotname = s.data;

Is this memory freed somewhere?


> +                             /*
> +                              * We want to do the table data sync in single
> +                              * transaction so do not close the transaction 
> opened
> +                              * above.
> +                              * There will be no BEGIN or COMMIT messages 
> coming via
> +                              * logical replication while the copy table 
> command is
> +                              * running so start the transaction here.
> +                              * Note the memory context for data handling 
> will still
> +                              * be done using ensure_transaction called by 
> the insert
> +                              * handler.
> +                              */
> +                             StartTransactionCommand();
> +
> +                             /*
> +                              * Don't allow parallel access other than 
> SELECT while
> +                              * the initial contents are being copied.
> +                              */
> +                             rel = heap_open(tstate.relid, ExclusiveLock);

Why do we want to allow access at all?



> @@ -87,6 +92,8 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
>       cb->commit_cb = pgoutput_commit_txn;
>       cb->filter_by_origin_cb = pgoutput_origin_filter;
>       cb->shutdown_cb = pgoutput_shutdown;
> +     cb->tuple_cb = pgoutput_tuple;
> +     cb->list_tables_cb = pgoutput_list_tables;
>  }

What are these new, and undocumented callbacks actually doing? And why
is this integrated into logical decoding?


>  /*
> + * Handle LIST_TABLES command.
> + */
> +static void
> +SendTableList(ListTablesCmd *cmd)
> +{

Ugh.


I really dislike this kind of command. I think we should instead change
things around, allowing to issue normal SQL via the replication
command. We'll have to error out for running sql for non-database
connected replication connections, but that seems fine.


Andres


-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to