(continuing, uh, a bit happier)

On 2016-09-09 00:59:26 +0200, Petr Jelinek wrote:
> +/*
> + * Relcache invalidation callback for our relation map cache.
> + */
> +static void
> +logicalreprelmap_invalidate_cb(Datum arg, Oid reloid)
> +{
> +    LogicalRepRelMapEntry *entry;
> +
> +    /* Just to be sure. */
> +    if (LogicalRepRelMap == NULL)
> +        return;
> +
> +    if (reloid != InvalidOid)
> +    {
> +        HASH_SEQ_STATUS status;
> +
> +        hash_seq_init(&status, LogicalRepRelMap);
> +
> +        /* TODO, use inverse lookup hastable? */

*hashtable

> +        while ((entry = (LogicalRepRelMapEntry *) hash_seq_search(&status)) != NULL)
> +        {
> +            if (entry->reloid == reloid)
> +                entry->reloid = InvalidOid;

can't we break here?

> +/*
> + * Initialize the relation map cache.
> + */
> +static void
> +remoterelmap_init(void)
> +{
> +    HASHCTL     ctl;
> +
> +    /* Make sure we've initialized CacheMemoryContext. */
> +    if (CacheMemoryContext == NULL)
> +        CreateCacheMemoryContext();
> +
> +    /* Initialize the hash table. */
> +    MemSet(&ctl, 0, sizeof(ctl));
> +    ctl.keysize = sizeof(uint32);
> +    ctl.entrysize = sizeof(LogicalRepRelMapEntry);
> +    ctl.hcxt = CacheMemoryContext;

Wonder if this (and similar code earlier) should try to do everything in
a sub-context of CacheMemoryContext instead. That'd make some issues
easier to track down.

> +/*
> + * Open the local relation associated with the remote one.
> + */
> +static LogicalRepRelMapEntry *
> +logicalreprel_open(uint32 remoteid, LOCKMODE lockmode)
> +{
> +    LogicalRepRelMapEntry *entry;
> +    bool        found;
> +
> +    if (LogicalRepRelMap == NULL)
> +        remoterelmap_init();
> +
> +    /* Search for existing entry. */
> +    entry = hash_search(LogicalRepRelMap, (void *) &remoteid,
> +                        HASH_FIND, &found);
> +
> +    if (!found)
> +        elog(FATAL, "cache lookup failed for remote relation %u",
> +             remoteid);
> +
> +    /* Need to update the local cache? */
> +    if (!OidIsValid(entry->reloid))
> +    {
> +        Oid         nspid;
> +        Oid         relid;
> +        int         i;
> +        TupleDesc   desc;
> +        LogicalRepRelation *remoterel;
> +
> +        remoterel = &entry->remoterel;
> +
> +        nspid = LookupExplicitNamespace(remoterel->nspname, false);
> +        if (!OidIsValid(nspid))
> +            ereport(FATAL,
> +                    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> +                     errmsg("the logical replication target %s not found",
> +                            quote_qualified_identifier(remoterel->nspname, remoterel->relname))));
> +        relid = get_relname_relid(remoterel->relname, nspid);
> +        if (!OidIsValid(relid))
> +            ereport(FATAL,
> +                    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
> +                     errmsg("the logical replication target %s not found",
> +                            quote_qualified_identifier(remoterel->nspname,
> +                                                       remoterel->relname))));
> +
> +        entry->rel = heap_open(relid, lockmode);

This seems rather racy. I think this really instead needs something akin
to RangeVarGetRelidExtended().
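E.g. building a RangeVar and letting RangeVarGetRelidExtended() do the
lookup would take the lock as part of the name lookup, so a concurrent
DROP/ALTER can't invalidate the lookup before the lock is held. A rough,
untested sketch (error handling left out):

    RangeVar   *rv;
    Oid         relid;

    /* Look up and lock the local relation in one step. */
    rv = makeRangeVar(remoterel->nspname, remoterel->relname, -1);
    relid = RangeVarGetRelidExtended(rv, lockmode,
                                     false,     /* missing_ok */
                                     false,     /* nowait */
                                     NULL, NULL);

    entry->rel = heap_open(relid, NoLock);      /* already locked above */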
> +/*
> + * Executor state preparation for evaluation of constraint expressions,
> + * indexes and triggers.
> + *
> + * This is based on similar code in copy.c
> + */
> +static EState *
> +create_estate_for_relation(LogicalRepRelMapEntry *rel)
> +{
> +    EState     *estate;
> +    ResultRelInfo *resultRelInfo;
> +    RangeTblEntry *rte;
> +
> +    estate = CreateExecutorState();
> +
> +    rte = makeNode(RangeTblEntry);
> +    rte->rtekind = RTE_RELATION;
> +    rte->relid = RelationGetRelid(rel->rel);
> +    rte->relkind = rel->rel->rd_rel->relkind;
> +    estate->es_range_table = list_make1(rte);
> +
> +    resultRelInfo = makeNode(ResultRelInfo);
> +    InitResultRelInfo(resultRelInfo, rel->rel, 1, 0);
> +
> +    estate->es_result_relations = resultRelInfo;
> +    estate->es_num_result_relations = 1;
> +    estate->es_result_relation_info = resultRelInfo;
> +
> +    /* Triggers might need a slot */
> +    if (resultRelInfo->ri_TrigDesc)
> +        estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
> +
> +    return estate;
> +}

Ugh, we do this for every single change? That's pretty darn heavy.

> +/*
> + * Check if the local attribute is present in relation definition used
> + * by upstream and hence updated by the replication.
> + */
> +static bool
> +physatt_in_attmap(LogicalRepRelMapEntry *rel, int attid)
> +{
> +    AttrNumber  i;
> +
> +    /* Fast path for tables that are same on upstream and downstream. */
> +    if (attid < rel->remoterel.natts && rel->attmap[attid] == attid)
> +        return true;
> +
> +    /* Try to find the attribute in the map. */
> +    for (i = 0; i < rel->remoterel.natts; i++)
> +        if (rel->attmap[i] == attid)
> +            return true;
> +
> +    return false;
> +}

Shouldn't we rather try to keep an attribute map that always can map
remote attribute numbers to local ones? That doesn't seem hard on a
first blush? But I might be missing something here.

> +/*
> + * Executes default values for columns for which we can't map to remote
> + * relation columns.
> + *
> + * This allows us to support tables which have more columns on the downstream
> + * than on the upsttream.
> + */

Typo: upsttream.

> +static void
> +FillSlotDefaults(LogicalRepRelMapEntry *rel, EState *estate,
> +                 TupleTableSlot *slot)
> +{

Why is this using a different naming scheme?

> +/*
> + * Handle COMMIT message.
> + *
> + * TODO, support tracking of multiple origins
> + */
> +static void
> +handle_commit(StringInfo s)
> +{
> +    XLogRecPtr  commit_lsn;
> +    XLogRecPtr  end_lsn;
> +    TimestampTz commit_time;
> +
> +    logicalrep_read_commit(s, &commit_lsn, &end_lsn, &commit_time);

Perhaps this (and related routines) should rather be

    LogicalRepCommitdata commit_data;
    logicalrep_read_commit(s, &commit_data);

etc? That way the data can transparently be enhanced.

> +    Assert(commit_lsn == replorigin_session_origin_lsn);
> +    Assert(commit_time == replorigin_session_origin_timestamp);
> +
> +    if (IsTransactionState())
> +    {
> +        FlushPosition *flushpos;
> +
> +        CommitTransactionCommand();
> +        MemoryContextSwitchTo(CacheMemoryContext);
> +
> +        /* Track commit lsn */
> +        flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
> +        flushpos->local_end = XactLastCommitEnd;
> +        flushpos->remote_end = end_lsn;
> +
> +        dlist_push_tail(&lsn_mapping, &flushpos->node);
> +        MemoryContextSwitchTo(ApplyContext);

Seems like it should be in a separate function.
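E.g. something like a store_flush_position() helper (name made up), so
handle_commit() itself stays short; this just factors out the quoted
code, untested:

    static void
    store_flush_position(XLogRecPtr remote_lsn)
    {
        FlushPosition *flushpos;

        /* The mapping has to outlive the transaction, keep it in
         * CacheMemoryContext. */
        MemoryContextSwitchTo(CacheMemoryContext);

        /* Track commit lsn */
        flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
        flushpos->local_end = XactLastCommitEnd;
        flushpos->remote_end = remote_lsn;

        dlist_push_tail(&lsn_mapping, &flushpos->node);
        MemoryContextSwitchTo(ApplyContext);
    }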
> + */
> +static void
> +handle_insert(StringInfo s)
> +{
> +    LogicalRepRelMapEntry *rel;
> +    LogicalRepTupleData newtup;
> +    LogicalRepRelId relid;
> +    EState     *estate;
> +    TupleTableSlot *remoteslot;
> +    MemoryContext oldctx;
> +
> +    ensure_transaction();
> +
> +    relid = logicalrep_read_insert(s, &newtup);
> +    rel = logicalreprel_open(relid, RowExclusiveLock);
> +
> +    /* Initialize the executor state. */
> +    estate = create_estate_for_relation(rel);
> +    remoteslot = ExecInitExtraTupleSlot(estate);
> +    ExecSetSlotDescriptor(remoteslot, RelationGetDescr(rel->rel));

This seems incredibly expensive for replicating a lot of rows.

> +    /* Process and store remote tuple in the slot */
> +    oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
> +    SlotStoreCStrings(remoteslot, newtup.values);
> +    FillSlotDefaults(rel, estate, remoteslot);
> +    MemoryContextSwitchTo(oldctx);
> +
> +    PushActiveSnapshot(GetTransactionSnapshot());
> +    ExecOpenIndices(estate->es_result_relation_info, false);
> +
> +    ExecInsert(NULL, /* mtstate is only used for onconflict handling which we don't support atm */
> +               remoteslot,
> +               remoteslot,
> +               NIL,
> +               ONCONFLICT_NONE,
> +               estate,
> +               false);

I have *severe* doubts about just using the (newly) exposed functions
1:1 here.

> +/*
> + * Search the relation 'rel' for tuple using the replication index.
> + *
> + * If a matching tuple is found lock it with lockmode, fill the slot with its
> + * contents and return true, return false is returned otherwise.
> + */
> +static bool
> +tuple_find_by_replidx(Relation rel, LockTupleMode lockmode,
> +                      TupleTableSlot *searchslot, TupleTableSlot *slot)
> +{
> +    HeapTuple   scantuple;
> +    ScanKeyData skey[INDEX_MAX_KEYS];
> +    IndexScanDesc scan;
> +    SnapshotData snap;
> +    TransactionId xwait;
> +    Oid         idxoid;
> +    Relation    idxrel;
> +    bool        found;
> +
> +    /* Open REPLICA IDENTITY index.*/
> +    idxoid = RelationGetReplicaIndex(rel);
> +    if (!OidIsValid(idxoid))
> +    {
> +        elog(ERROR, "could not find configured replica identity for table \"%s\"",
> +             RelationGetRelationName(rel));
> +        return false;
> +    }
> +    idxrel = index_open(idxoid, RowExclusiveLock);
> +
> +    /* Start an index scan. */
> +    InitDirtySnapshot(snap);
> +    scan = index_beginscan(rel, idxrel, &snap,
> +                           RelationGetNumberOfAttributes(idxrel),
> +                           0);
> +
> +    /* Build scan key. */
> +    build_replindex_scan_key(skey, rel, idxrel, searchslot);
> +
> +retry:
> +    found = false;
> +
> +    index_rescan(scan, skey, RelationGetNumberOfAttributes(idxrel), NULL, 0);
> +
> +    /* Try to find the tuple */
> +    if ((scantuple = index_getnext(scan, ForwardScanDirection)) != NULL)
> +    {
> +        found = true;
> +        ExecStoreTuple(scantuple, slot, InvalidBuffer, false);
> +        ExecMaterializeSlot(slot);
> +
> +        xwait = TransactionIdIsValid(snap.xmin) ?
> +            snap.xmin : snap.xmax;
> +
> +        /*
> +         * If the tuple is locked, wait for locking transaction to finish
> +         * and retry.
> +         */
> +        if (TransactionIdIsValid(xwait))
> +        {
> +            XactLockTableWait(xwait, NULL, NULL, XLTW_None);
> +            goto retry;
> +        }
> +    }

Hm. So we potentially find multiple tuples here, and lock all of them,
but then only use one for the update.

> +static List *
> +get_subscription_list(void)
> +{
> +    List       *res = NIL;
> +    Relation    rel;
> +    HeapScanDesc scan;
> +    HeapTuple   tup;
> +    MemoryContext resultcxt;
> +
> +    /* This is the context that we will allocate our output data in */
> +    resultcxt = CurrentMemoryContext;
> +
> +    /*
> +     * Start a transaction so we can access pg_database, and get a snapshot.
> +     * We don't have a use for the snapshot itself, but we're interested in
> +     * the secondary effect that it sets RecentGlobalXmin. (This is critical
> +     * for anything that reads heap pages, because HOT may decide to prune
> +     * them even if the process doesn't attempt to modify any tuples.)
> +     */
> +    StartTransactionCommand();
> +    (void) GetTransactionSnapshot();
> +
> +    rel = heap_open(SubscriptionRelationId, AccessShareLock);
> +    scan = heap_beginscan_catalog(rel, 0, NULL);
> +
> +    while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
> +    {
> +        Form_pg_subscription subform = (Form_pg_subscription) GETSTRUCT(tup);
> +        Subscription *sub;
> +        MemoryContext oldcxt;
> +
> +        /*
> +         * Allocate our results in the caller's context, not the
> +         * transaction's. We do this inside the loop, and restore the original
> +         * context at the end, so that leaky things like heap_getnext() are
> +         * not called in a potentially long-lived context.
> +         */
> +        oldcxt = MemoryContextSwitchTo(resultcxt);
> +
> +        sub = (Subscription *) palloc(sizeof(Subscription));
> +        sub->oid = HeapTupleGetOid(tup);
> +        sub->dbid = subform->subdbid;
> +        sub->enabled = subform->subenabled;
> +
> +        /* We don't fill fields we are not intereste in. */
> +        sub->name = NULL;
> +        sub->conninfo = NULL;
> +        sub->slotname = NULL;
> +        sub->publications = NIL;
> +
> +        res = lappend(res, sub);
> +        MemoryContextSwitchTo(oldcxt);
> +    }
> +
> +    heap_endscan(scan);
> +    heap_close(rel, AccessShareLock);
> +
> +    CommitTransactionCommand();

Hm. This doesn't seem quite right from a locking pov. What if, in the
middle of this, a new subscription is created?

> +void
> +logicalrep_worker_stop(LogicalRepWorker *worker)
> +{
> +    Assert(LWLockHeldByMe(LogicalRepWorkerLock));
> +
> +    /* Check that the worker is up and what we expect. */
> +    if (!worker->proc)
> +        return;
> +    if (!IsBackendPid(worker->proc->pid))
> +        return;
> +
> +    /* Terminate the worker. */
> +    kill(worker->proc->pid, SIGTERM);
> +
> +    LWLockRelease(LogicalRepLauncherLock);
> +
> +    /* Wait for it to detach. */
> +    for (;;)
> +    {
> +        int     rc = WaitLatch(&MyProc->procLatch,
> +                               WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
> +                               1000L);
> +
> +        /* emergency bailout if postmaster has died */
> +        if (rc & WL_POSTMASTER_DEATH)
> +            proc_exit(1);
> +
> +        ResetLatch(&MyProc->procLatch);
> +
> +        CHECK_FOR_INTERRUPTS();
> +
> +        if (!worker->proc)
> +            return;
> +    }
> +}

Indentation here seems screwed up.

> +static void
> +xacthook_signal_launcher(XactEvent event, void *arg)
> +{
> +    switch (event)
> +    {
> +        case XACT_EVENT_COMMIT:
> +            if (xacthook_do_signal_launcher)
> +                ApplyLauncherWakeup();
> +            break;
> +        default:
> +            /* We're not interested in other tx events */
> +            break;
> +    }
> +}
> +void
> +ApplyLauncherWakeupOnCommit(void)
> +{
> +    if (!xacthook_do_signal_launcher)
> +    {
> +        RegisterXactCallback(xacthook_signal_launcher, NULL);
> +        xacthook_do_signal_launcher = true;
> +    }
> +}

Hm. This seems like it really should be an AtCommit_* routine instead.
This also needs more docs. Hadn't I previously read about always
streaming data to disk first?
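On the AtCommit_* point, the shape I have in mind is roughly the
following (names invented), i.e. xact.c calls it explicitly at end of
transaction instead of going through RegisterXactCallback():

    static bool on_commit_launcher_wakeup = false;

    void
    ApplyLauncherWakeupOnCommit(void)
    {
        on_commit_launcher_wakeup = true;
    }

    /* Called explicitly from CommitTransaction()/AbortTransaction(). */
    void
    AtEOXact_ApplyLauncher(bool isCommit)
    {
        if (isCommit && on_commit_launcher_wakeup)
            ApplyLauncherWakeup();
        on_commit_launcher_wakeup = false;
    }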
> @@ -0,0 +1,674 @@
> +/*-------------------------------------------------------------------------
> + * tablesync.c
> + *       PostgreSQL logical replication
> + *
> + * Copyright (c) 2012-2016, PostgreSQL Global Development Group
> + *
> + * IDENTIFICATION
> + *       src/backend/replication/logical/tablesync.c
> + *
> + * NOTES
> + *       This file contains code for initial table data synchronization for
> + *       logical replication.
> + *
> + *       The initial data synchronization is done separately for each table,
> + *       in separate apply worker that only fetches the initial snapshot data
> + *       from the provider and then synchronizes the position in stream with
> + *       the main apply worker.

Why? I guess that's because it allows adding tables incrementally, with
acceptable overhead.

> + *       The stream position synchronization works in multiple steps.
> + *        - sync finishes copy and sets table state as SYNCWAIT and waits
> + *          for state to change in a loop
> + *        - apply periodically checks unsynced tables for SYNCWAIT, when it
> + *          appears it will compare its position in the stream with the
> + *          SYNCWAIT position and decides to either set it to CATCHUP when
> + *          the apply was infront (and wait for the sync to do the catchup),
> + *          or set the state to SYNCDONE if the sync was infront or in case
> + *          both sync and apply are at the same position it will set it to
> + *          READY and stops tracking it

I'm not quite following here.

> + *        - if the state was set to CATCHUP sync will read the stream and
> + *          apply changes until it catches up to the specified stream
> + *          position and then sets state to READY and signals apply that it
> + *          can stop waiting and exits, if the state was set to something
> + *          else than CATCHUP the sync process will simply end
> + *        - if the state was set to SYNCDONE by apply, the apply will
> + *          continue tracking the table until it reaches the SYNCDONE stream
> + *          position at which point it sets state to READY and stops tracking
> + *
> + *       Example flows look like this:
> + *        - Apply is infront:
> + *            sync:8
> + *              -> set SYNCWAIT
> + *            apply:10
> + *              -> set CATCHUP
> + *            sync:10
> + *              -> set ready
> + *              exit
> + *            apply:10
> + *              stop tracking
> + *              continue rep
> + *        - Sync infront:
> + *            sync:10
> + *              set SYNCWAIT
> + *            apply:8
> + *              set SYNCDONE
> + *            sync:10
> + *              exit
> + *            apply:10
> + *              set READY
> + *              stop tracking
> + *              continue rep

This definitely needs to be expanded a bit. Where are we tracking how
far replication has progressed on individual tables? Are we creating
new slots for syncing? Is there any parallelism in syncing?

> +/*
> + * Exit routine for synchronization worker.
> + */
> +static void
> +finish_sync_worker(char *slotname)
> +{
> +    LogicalRepWorker *worker;
> +    RepOriginId originid;
> +    MemoryContext oldctx = CurrentMemoryContext;
> +
> +    /*
> +     * Drop the replication slot on remote server.
> +     * We want to continue even in the case that the slot on remote side
> +     * is already gone. This means that we can leave slot on the remote
> +     * side but that can happen for other reasons as well so we can't
> +     * really protect against that.
> +     */
> +    PG_TRY();
> +    {
> +        wrcapi->drop_slot(wrchandle, slotname);
> +    }
> +    PG_CATCH();
> +    {
> +        MemoryContext ectx;
> +        ErrorData  *edata;
> +
> +        ectx = MemoryContextSwitchTo(oldctx);
> +        /* Save error info */
> +        edata = CopyErrorData();
> +        MemoryContextSwitchTo(ectx);
> +        FlushErrorState();
> +
> +        ereport(WARNING,
> +                (errmsg("there was problem dropping the replication slot "
> +                        "\"%s\" on provider", slotname),
> +                 errdetail("The error was: %s", edata->message),
> +                 errhint("You may have to drop it manually")));
> +        FreeErrorData(edata);

ISTM we really should rather return success/failure here, and not throw
an error inside the libpqwalreceiver stuff. I kind of wonder if we
actually can get rid of this indirection.
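I.e. if drop_slot() just reported success/failure (the bool + error
string signature below is hypothetical), the caller could shrink to
something like:

    char   *err = NULL;

    /*
     * Try to drop the slot on the remote side; a failure is only worth a
     * WARNING, the slot may already be gone for other reasons.
     */
    if (!wrcapi->drop_slot(wrchandle, slotname, &err))
        ereport(WARNING,
                (errmsg("there was a problem dropping the replication slot "
                        "\"%s\" on the provider", slotname),
                 errdetail("The error was: %s", err ? err : "unknown"),
                 errhint("You may have to drop it manually.")));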
> +    /* Find the main apply worker and signal it. */
> +    LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE);
> +    worker = logicalrep_worker_find(MyLogicalRepWorker->subid, InvalidOid);
> +    if (worker && worker->proc)
> +        SetLatch(&worker->proc->procLatch);
> +    LWLockRelease(LogicalRepWorkerLock);

I'd rather do the SetLatch outside of the critical section.

> +static bool
> +wait_for_sync_status_change(TableState *tstate)
> +{
> +    int         rc;
> +    char        state = tstate->state;
> +
> +    while (!got_SIGTERM)
> +    {
> +        StartTransactionCommand();
> +        tstate->state = GetSubscriptionRelState(MyLogicalRepWorker->subid,
> +                                                tstate->relid,
> +                                                &tstate->lsn,
> +                                                true);
> +        CommitTransactionCommand();
> +
> +        /* Status record was removed. */
> +        if (tstate->state == SUBREL_STATE_UNKNOWN)
> +            return false;
> +
> +        if (tstate->state != state)
> +            return true;
> +
> +        rc = WaitLatch(&MyProc->procLatch,
> +                       WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
> +                       10000L);
> +
> +        /* emergency bailout if postmaster has died */
> +        if (rc & WL_POSTMASTER_DEATH)
> +            proc_exit(1);
> +
> +        ResetLatch(&MyProc->procLatch);

broken indentation.

> +/*
> + * Read the state of the tables in the subscription and update our table
> + * state list.
> + */
> +static void
> +reread_sync_state(Oid relid)
> +{
> +    dlist_mutable_iter iter;
> +    Relation    rel;
> +    HeapTuple   tup;
> +    ScanKeyData skey[2];
> +    HeapScanDesc scan;
> +
> +    /* Clean the old list. */
> +    dlist_foreach_modify(iter, &table_states)
> +    {
> +        TableState *tstate = dlist_container(TableState, node, iter.cur);
> +
> +        dlist_delete(iter.cur);
> +        pfree(tstate);
> +    }
> +
> +    /*
> +     * Fetch all the subscription relation states that are not marked as
> +     * ready and push them into our table state tracking list.
> +     */
> +    rel = heap_open(SubscriptionRelRelationId, RowExclusiveLock);
> +
> +    ScanKeyInit(&skey[0],
> +                Anum_pg_subscription_rel_subid,
> +                BTEqualStrategyNumber, F_OIDEQ,
> +                ObjectIdGetDatum(MyLogicalRepWorker->subid));
> +
> +    if (OidIsValid(relid))
> +    {
> +        ScanKeyInit(&skey[1],
> +                    Anum_pg_subscription_rel_subrelid,
> +                    BTEqualStrategyNumber, F_OIDEQ,
> +                    ObjectIdGetDatum(relid));
> +    }
> +    else
> +    {
> +        ScanKeyInit(&skey[1],
> +                    Anum_pg_subscription_rel_substate,
> +                    BTEqualStrategyNumber, F_CHARNE,
> +                    CharGetDatum(SUBREL_STATE_READY));
> +    }
> +
> +    scan = heap_beginscan_catalog(rel, 2, skey);

Hm. So this is a seqscan. Shouldn't we make this use an index (depending
on which branch is taken above)?
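For the relid branch that could e.g. become an index scan (sketch only;
whatever the real unique index on (subid, subrelid) is named, the
constant below is made up, and "scan" would have to be a SysScanDesc):

    scan = systable_beginscan(rel, SubscriptionRelSubidSubrelidIndexId,
                              true, NULL, 2, skey);

    while (HeapTupleIsValid(tup = systable_getnext(scan)))
    {
        /* same loop body as today */
    }

    systable_endscan(scan);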
> +    while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection)))
> +    {
> +        Form_pg_subscription_rel subrel;
> +        TableState *tstate;
> +        MemoryContext oldctx;
> +
> +        subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
> +
> +        /* Allocate the tracking info in a permament memory context. */

s/permament/permanent/

> +/*
> + * Handle table synchronization cooperation from the synchroniation
> + * worker.
> + */
> +static void
> +process_syncing_tables_sync(char *slotname, XLogRecPtr end_lsn)
> +{
> +    TableState *tstate;
> +    TimeLineID  tli;
> +
> +    Assert(!IsTransactionState());
> +
> +    /*
> +     * Synchronization workers don't keep track of all synchronization
> +     * tables, they only care about their table.
> +     */
> +    if (!table_states_valid)
> +    {
> +        StartTransactionCommand();
> +        reread_sync_state(MyLogicalRepWorker->relid);
> +        CommitTransactionCommand();
> +    }
> +
> +    /* Somebody removed table underneath this worker, nothing more to do. */
> +    if (dlist_is_empty(&table_states))
> +    {
> +        wrcapi->endstreaming(wrchandle, &tli);
> +        finish_sync_worker(slotname);
> +    }
> +
> +    /* Check if we are done with catchup now. */
> +    tstate = dlist_container(TableState, node, dlist_head_node(&table_states));
> +    if (tstate->state == SUBREL_STATE_CATCHUP)
> +    {
> +        Assert(tstate->lsn != InvalidXLogRecPtr);
> +
> +        if (tstate->lsn == end_lsn)
> +        {
> +            tstate->state = SUBREL_STATE_READY;
> +            tstate->lsn = InvalidXLogRecPtr;
> +            /* Update state of the synchronization. */
> +            StartTransactionCommand();
> +            SetSubscriptionRelState(MyLogicalRepWorker->subid,
> +                                    tstate->relid, tstate->state,
> +                                    tstate->lsn);
> +            CommitTransactionCommand();
> +
> +            wrcapi->endstreaming(wrchandle, &tli);
> +            finish_sync_worker(slotname);
> +        }
> +        return;
> +    }
> +}

The return inside the if is a bit weird. Makes one think it might be a
loop or such.

> +/*
> + * Handle table synchronization cooperation from the apply worker.
> + */
> +static void
> +process_syncing_tables_apply(char *slotname, XLogRecPtr end_lsn)
> +{
> +    dlist_mutable_iter iter;
> +
> +    Assert(!IsTransactionState());
> +
> +    if (!table_states_valid)
> +    {
> +        StartTransactionCommand();
> +        reread_sync_state(InvalidOid);
> +        CommitTransactionCommand();
> +    }

So this pattern is repeated a bunch of times, maybe we can encapsulate
that somewhat? Maybe like ensure_sync_state_valid() or such? (Sketch
below.)

> +    dlist_foreach_modify(iter, &table_states)
> +    {
> +        TableState *tstate = dlist_container(TableState, node, iter.cur);
> +        bool        start_worker;
> +        LogicalRepWorker *worker;
> +
> +        /*
> +         * When the synchronization process is at the cachup phase we need

s/cachup/catchup/

> +         * to ensure that we are not behind it (it's going to wait at this
> +         * point for the change of state). Once we are infront or at the same
> +         * position as the synchronization proccess we can signal it to
> +         * finish the catchup.
> +         */
> +        if (tstate->state == SUBREL_STATE_SYNCWAIT)
> +        {
> +            if (end_lsn > tstate->lsn)
> +            {
> +                /*
> +                 * Apply is infront, tell sync to catchup. and wait until
> +                 * it does.
> +                 */
> +                tstate->state = SUBREL_STATE_CATCHUP;
> +                tstate->lsn = end_lsn;
> +                StartTransactionCommand();
> +                SetSubscriptionRelState(MyLogicalRepWorker->subid,
> +                                        tstate->relid, tstate->state,
> +                                        tstate->lsn);
> +                CommitTransactionCommand();
> +
> +                /* Signal the worker as it may be waiting for us. */
> +                LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
> +                worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
> +                                                tstate->relid);
> +                if (worker && worker->proc)
> +                    SetLatch(&worker->proc->procLatch);
> +                LWLockRelease(LogicalRepWorkerLock);

Different parts of this file use different lock levels to set the
latch. Why?

> +                if (wait_for_sync_status_change(tstate))
> +                    Assert(tstate->state == SUBREL_STATE_READY);
> +            }
> +            else
> +            {
> +                /*
> +                 * Apply is either behind in which case sync worker is done
> +                 * but apply needs to keep tracking the table until it
> +                 * catches up to where sync finished.
> +                 * Or apply and sync are at the same position in which case
> +                 * table can be switched to standard replication mode
> +                 * immediately.
> +                 */
> +                if (end_lsn < tstate->lsn)
> +                    tstate->state = SUBREL_STATE_SYNCDONE;
> +                else
> +                    tstate->state = SUBREL_STATE_READY;
> +

What I'm failing to understand is how this can be done under
concurrency. You probably thought about this, but it should really be
explained somewhere.
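(Coming back to the encapsulation idea from above, the repeated pattern
could be as simple as the following; the name is invented.)

    /* Make sure our view of the subscription's table states is current. */
    static void
    ensure_sync_state_valid(Oid relid)
    {
        if (!table_states_valid)
        {
            StartTransactionCommand();
            reread_sync_state(relid);
            CommitTransactionCommand();
        }
    }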
> +                StartTransactionCommand();
> +                SetSubscriptionRelState(MyLogicalRepWorker->subid,
> +                                        tstate->relid, tstate->state,
> +                                        tstate->lsn);
> +                CommitTransactionCommand();
> +
> +                /* Signal the worker as it may be waiting for us. */
> +                LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
> +                worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
> +                                                tstate->relid);
> +                if (worker && worker->proc)
> +                    SetLatch(&worker->proc->procLatch);
> +                LWLockRelease(LogicalRepWorkerLock);

Oh, and again, please set latches outside of the lock.

> +        else if (tstate->state == SUBREL_STATE_SYNCDONE &&
> +                 end_lsn >= tstate->lsn)
> +        {
> +            /*
> +             * Apply catched up to the position where table sync finished,
> +             * mark the table as ready for normal replication.
> +             */

Sentence needs to be rephrased a bit.

> +        /*
> +         * In case table is supposed to be synchronizing but the
> +         * synchronization worker is not running, start it.
> +         * Limit the number of launched workers here to one (for now).
> +         */

Hm. That seems problematic for online upgrade type cases, we might never
catch up that way...

> +/*
> + * Start syncing the table in the sync worker.
> + */
> +char *
> +LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
> +{
> +    StringInfoData s;
> +    TableState  tstate;
> +    MemoryContext oldctx;
> +    char       *slotname;
> +
> +    /* Check the state of the table synchronization. */
> +    StartTransactionCommand();
> +    tstate.relid = MyLogicalRepWorker->relid;
> +    tstate.state = GetSubscriptionRelState(MySubscription->oid, tstate.relid,
> +                                           &tstate.lsn, false);
> +
> +    /*
> +     * Build unique slot name.
> +     * TODO: protect against too long slot name.
> +     */
> +    oldctx = MemoryContextSwitchTo(CacheMemoryContext);
> +    initStringInfo(&s);
> +    appendStringInfo(&s, "%s_sync_%s", MySubscription->slotname,
> +                     get_rel_name(tstate.relid));
> +    slotname = s.data;

Is this memory freed somewhere?

> +                /*
> +                 * We want to do the table data sync in single
> +                 * transaction so do not close the transaction opened
> +                 * above.
> +                 * There will be no BEGIN or COMMIT messages coming via
> +                 * logical replication while the copy table command is
> +                 * running so start the transaction here.
> +                 * Note the memory context for data handling will still
> +                 * be done using ensure_transaction called by the insert
> +                 * handler.
> +                 */
> +                StartTransactionCommand();
> +
> +                /*
> +                 * Don't allow parallel access other than SELECT while
> +                 * the initial contents are being copied.
> +                 */
> +                rel = heap_open(tstate.relid, ExclusiveLock);

Why do we want to allow access at all?

> @@ -87,6 +92,8 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
>      cb->commit_cb = pgoutput_commit_txn;
>      cb->filter_by_origin_cb = pgoutput_origin_filter;
>      cb->shutdown_cb = pgoutput_shutdown;
> +    cb->tuple_cb = pgoutput_tuple;
> +    cb->list_tables_cb = pgoutput_list_tables;
>  }

What are these new, undocumented callbacks actually doing? And why is
this integrated into logical decoding?

>  /*
> + * Handle LIST_TABLES command.
> + */
> +static void
> +SendTableList(ListTablesCmd *cmd)
> +{

Ugh. I really dislike this kind of command. I think we should instead
change things around, allowing normal SQL to be issued over the
replication connection. We'll have to error out when SQL is run on
non-database-connected replication connections, but that seems fine.

Andres


-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers