On Mon, Dec 7, 2020 at 7:49 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
> As the slot of apply worker is created before all the tablesync
> workers it should never miss any LSN which tablesync workers would
> have processed. Also, the table sync workers should not process any
> xact if the apply worker has not processed anything. I think tablesync
> currently always processes one transaction (because we call
> process_sync_tables at commit of a txn) even if that is not required
> to be in sync with the apply worker. This should solve both the
> problems (a) visibility of partial transactions (b) allow prepared
> transactions because tablesync worker no longer needs to combine
> multiple transactions data.
>
> I think the other advantages of this would be that it would reduce the
> load (both CPU and I/O) on the publisher-side by allowing to decode
> the data only once instead of for each table sync worker once and
> separately for the apply worker. I think it will use fewer resources
> to finish the work.
Yes, I observed this same behavior.

IIUC the only way for the tablesync worker to go from CATCHUP mode to
SYNCDONE is via the call to process_syncing_tables(). But a side-effect
of this is that when messages arrive during the CATCHUP phase, one tx
will be handled by the tablesync worker before
process_syncing_tables() is ever reached.

I have created and attached a simple patch which allows the tablesync
worker to detect whether there is anything to do *before* it enters the
apply main loop. Calling process_syncing_tables() before the apply main
loop offers a quick way out, so the message handling will not be split
unnecessarily between the workers.

~

The result of the patch is demonstrated by the following tests/logs,
which are also attached.
Note: I added more logging (not in this patch) to make it easier to see
what is going on.

LOGS1. Current code.
Test: 10 x INSERTs done at CATCHUP time.
Result: tablesync worker does 1 x INSERT, then apply worker skips 1 and
does the remaining 9 x INSERTs.

LOGS2. Patched code.
Test: Same 10 x INSERTs done at CATCHUP time.
Result: tablesync can exit early. apply worker handles all 10 x INSERTs.

LOGS3. Patched code.
Test: 2PC PREPARE then COMMIT PREPARED [1] done at CATCHUP time.
psql -d test_pub -c "BEGIN;INSERT INTO test_tab VALUES(1, 'foo');PREPARE TRANSACTION 'test_prepared_tab';"
psql -d test_pub -c "COMMIT PREPARED 'test_prepared_tab';"
Result: The PREPARE and COMMIT PREPARED are both handled by the apply
worker. This avoids the complications which the split otherwise causes.

[1] 2PC prepare test requires v29 patch from
https://www.postgresql.org/message-id/flat/CAMGcDxeqEpWj3fTXwqhSwBdXd2RS9jzwWscO-XbeCfso6ts3%2BQ%40mail.gmail.com

---
Kind Regards,
Peter Smith.
Fujitsu Australia
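For readers who want to see the hand-off in isolation, here is a toy model of the behaviour described above. It is only a sketch, not the attached patch: the function names, the (commit_lsn, end_lsn) tuples, and the LSN passed to the check before the loop (start_lsn) are all assumptions made for illustration.

```python
from typing import List, Tuple

# A transaction is modelled as (commit_lsn, end_lsn), with commit_lsn < end_lsn.
Txn = Tuple[int, int]


def tablesync_catchup(txns: List[Txn], start_lsn: int, catchup_lsn: int,
                      check_before_loop: bool) -> Tuple[List[Txn], int]:
    """Return (transactions applied by tablesync, LSN recorded at SYNCDONE).

    Hypothetical model: the real worker would keep waiting if the stream ran
    out before catchup_lsn was reached; here we simply return.
    """
    applied: List[Txn] = []
    current_lsn = start_lsn
    # Patched behaviour: test the exit condition once before the apply loop.
    if check_before_loop and current_lsn >= catchup_lsn:
        return applied, current_lsn
    for txn in txns:
        applied.append(txn)       # the whole transaction is applied first...
        current_lsn = txn[1]      # ...and the check only happens at its commit
        if current_lsn >= catchup_lsn:
            break
    return applied, current_lsn


def apply_worker(txns: List[Txn], syncdone_lsn: int) -> List[Txn]:
    """The apply worker sees every transaction and skips the covered ones."""
    return [t for t in txns if syncdone_lsn <= t[0]]


# Ten single-INSERT transactions arriving during CATCHUP.
txns = [(100 + 10 * i, 105 + 10 * i) for i in range(10)]

for patched in (False, True):
    done_by_sync, syncdone_lsn = tablesync_catchup(txns, 90, 90, patched)
    done_by_apply = apply_worker(txns, syncdone_lsn)
    print(patched, len(done_by_sync), len(done_by_apply))
# prints "False 1 9" then "True 0 10"
```

With the check only at commit time the model reproduces the 1 + 9 split of LOGS1; with the extra check before the loop it reproduces LOGS2, where the apply worker handles all 10.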
## Not patched
2020-12-07 17:03:45.237 AEDT [26325] LOG: !!>> apply worker: called process_syncing_tables
2020-12-07 17:03:46.237 AEDT [26325] LOG: !!>> apply worker: LogicalRepApplyLoop
2020-12-07 17:03:46.238 AEDT [26325] LOG: !!>> apply worker: called process_syncing_tables
2020-12-07 17:03:47.240 AEDT [26325] LOG: !!>> apply worker: LogicalRepApplyLoop
2020-12-07 17:03:47.240 AEDT [26325] LOG: !!>> apply worker: called process_syncing_tables
2020-12-07 17:03:47.818 AEDT [27247] LOG: logical decoding found consistent point at 0/1624870
2020-12-07 17:03:47.818 AEDT [27247] DETAIL: There are no running transactions.
2020-12-07 17:03:47.818 AEDT [27247] STATEMENT: CREATE_REPLICATION_SLOT "tap_sub_16433_sync_16385" TEMPORARY LOGICAL pgoutput USE_SNAPSHOT
2020-12-07 17:03:47.820 AEDT [26325] LOG: !!>> apply worker: LogicalRepApplyLoop
2020-12-07 17:03:47.821 AEDT [26325] LOG: !!>> apply worker: called process_syncing_tables
2020-12-07 17:03:47.832 AEDT [26333] LOG: !!>> tablesync worker: wait for CATCHUP state notification
2020-12-07 17:03:47.832 AEDT [26325] LOG: !!>> apply worker: LogicalRepApplyLoop
2020-12-07 17:03:47.832 AEDT [26325] LOG: !!>> apply worker: called process_syncing_tables
2020-12-07 16:59:21.501 AEDT [19106] LOG: !!>> apply worker: called process_syncing_tables

## Attached and paused in debugger where the "tablesync" is told to CATCHUP.
### At this point we might execute any number of SQL commands e.g. 10 inserts
psql -d test_pub -c "INSERT INTO test_tab VALUES(1, 'foo');"
psql -d test_pub -c "INSERT INTO test_tab VALUES(2, 'foo');"
psql -d test_pub -c "INSERT INTO test_tab VALUES(3, 'foo');"
psql -d test_pub -c "INSERT INTO test_tab VALUES(4, 'foo');"
psql -d test_pub -c "INSERT INTO test_tab VALUES(5, 'foo');"
psql -d test_pub -c "INSERT INTO test_tab VALUES(6, 'foo');"
psql -d test_pub -c "INSERT INTO test_tab VALUES(7, 'foo');"
psql -d test_pub -c "INSERT INTO test_tab VALUES(8, 'foo');"
psql -d test_pub -c "INSERT INTO test_tab VALUES(9, 'foo');"
psql -d test_pub -c "INSERT INTO test_tab VALUES(10, 'foo');"

## Now continue in the "tablesync" worker
2020-12-07 17:06:25.315 AEDT [26333] LOG: !!>> tablesync worker: LogicalRepApplyLoop
2020-12-07 17:06:52.606 AEDT [26333] LOG: !!>> tablesync worker: apply_dispatch for message kind 'B'
2020-12-07 17:07:12.815 AEDT [26333] LOG: !!>> tablesync worker: apply_dispatch for message kind 'R'
2020-12-07 17:07:29.047 AEDT [26333] LOG: !!>> tablesync worker: apply_dispatch for message kind 'I'
2020-12-07 17:07:33.650 AEDT [26333] LOG: !!>> tablesync worker: should_apply_changes_for_rel: true rel->state == READY: false rel->state == SYNCDONE: false rel->statelsn <= remote_final_lsn: true
2020-12-07 17:07:38.874 AEDT [26333] LOG: !!>> tablesync worker: apply_dispatch for message kind 'C'
2020-12-07 17:07:41.018 AEDT [26333] LOG: !!>> tablesync worker: called process_syncing_tables

## Here the "tablesync" finds that it is in CATCHUP state and current_lsn >= MyLogicalRepWorker->relstate_lsn
## so it sets the state to SYNCDONE and finishes.
## So the "apply worker" handles all the messages of the other 9 inserts
2020-12-07 17:07:42.192 AEDT [26333] LOG: logical replication table synchronization worker for subscription "tap_sub", table "test_tab" has finished
2020-12-07 17:07:42.193 AEDT [26325] LOG: !!>> apply worker: LogicalRepApplyLoop
2020-12-07 17:07:42.193 AEDT [26325] LOG: !!>> apply worker: apply_dispatch for message kind 'B'
2020-12-07 17:07:42.193 AEDT [26325] LOG: !!>> apply worker: apply_dispatch for message kind 'R'
2020-12-07 17:07:42.193 AEDT [26325] LOG: !!>> apply worker: apply_dispatch for message kind 'I'
2020-12-07 17:07:42.194 AEDT [26325] LOG: !!>> apply worker: should_apply_changes_for_rel: false rel->state == READY: false rel->state == SYNCDONE: true rel->statelsn <= remote_final_lsn: false
2020-12-07 17:07:42.194 AEDT [26325] LOG: !!>> apply worker: apply_dispatch for message kind 'C'
2020-12-07 17:07:42.194 AEDT [26325] LOG: !!>> apply worker: called process_syncing_tables
2020-12-07 17:07:42.194 AEDT [26325] LOG: !!>> apply worker: apply_dispatch for message kind 'B'
2020-12-07 17:07:42.194 AEDT [26325] LOG: !!>> apply worker: apply_dispatch for message kind 'I'
2020-12-07 17:07:42.195 AEDT [26325] LOG: !!>> apply worker: should_apply_changes_for_rel: true rel->state == READY: true rel->state == SYNCDONE: false rel->statelsn <= remote_final_lsn: true
2020-12-07 17:07:42.195 AEDT [26325] LOG: !!>> apply worker: apply_dispatch for message kind 'C'
2020-12-07 17:07:42.195 AEDT [26325] LOG: !!>> apply worker: called process_syncing_tables
2020-12-07 17:07:42.195 AEDT [26325] LOG: !!>> apply worker: apply_dispatch for message kind 'B'
2020-12-07 17:07:42.195 AEDT [26325] LOG: !!>> apply worker: apply_dispatch for message kind 'I'
2020-12-07 17:07:42.195 AEDT [26325] LOG: !!>> apply worker: should_apply_changes_for_rel: true rel->state == READY: true rel->state == SYNCDONE: false rel->statelsn <= remote_final_lsn: true
2020-12-07 17:07:42.195 AEDT [26325] LOG: !!>> apply worker: apply_dispatch for message kind 'C'
2020-12-07 17:07:42.195 AEDT [26325] LOG: !!>> apply worker: called process_syncing_tables
2020-12-07 17:07:42.195 AEDT [26325] LOG: !!>> apply worker: apply_dispatch for message kind 'B'
2020-12-07 17:07:42.195 AEDT [26325] LOG: !!>> apply worker: apply_dispatch for message kind 'I'
2020-12-07 17:07:42.195 AEDT [26325] LOG: !!>> apply worker: should_apply_changes_for_rel: true rel->state == READY: true rel->state == SYNCDONE: false rel->statelsn <= remote_final_lsn: true
2020-12-07 17:07:42.195 AEDT [26325] LOG: !!>> apply worker: apply_dispatch for message kind 'C'
2020-12-07 17:07:42.195 AEDT [26325] LOG: !!>> apply worker: called process_syncing_tables
2020-12-07 17:07:42.195 AEDT [26325] LOG: !!>> apply worker: apply_dispatch for message kind 'B'
2020-12-07 17:07:42.195 AEDT [26325] LOG: !!>> apply worker: apply_dispatch for message kind 'I'
2020-12-07 17:07:42.195 AEDT [26325] LOG: !!>> apply worker: should_apply_changes_for_rel: true rel->state == READY: true rel->state == SYNCDONE: false rel->statelsn <= remote_final_lsn: true
2020-12-07 17:07:42.195 AEDT [26325] LOG: !!>> apply worker: apply_dispatch for message kind 'C'
2020-12-07 17:07:42.195 AEDT [26325] LOG: !!>> apply worker: called process_syncing_tables
2020-12-07 17:07:42.195 AEDT [26325] LOG: !!>> apply worker: apply_dispatch for message kind 'B'
2020-12-07 17:07:42.195 AEDT [26325] LOG: !!>> apply worker: apply_dispatch for message kind 'I'
2020-12-07 17:07:42.195 AEDT [26325] LOG: !!>> apply worker: should_apply_changes_for_rel: true rel->state == READY: true rel->state == SYNCDONE: false rel->statelsn <= remote_final_lsn: true
2020-12-07 17:07:42.196 AEDT [26325] LOG: !!>> apply worker: apply_dispatch for message kind 'C'
2020-12-07 17:07:42.196 AEDT [26325] LOG: !!>> apply worker: called process_syncing_tables
2020-12-07 17:07:42.196 AEDT [26325] LOG: !!>> apply worker: apply_dispatch for message kind 'B'
2020-12-07 17:07:42.196 AEDT [26325] LOG: !!>> apply worker: apply_dispatch for message kind 'I'
2020-12-07 17:07:42.196 AEDT [26325] LOG: !!>> apply worker: should_apply_changes_for_rel: true rel->state == READY: true rel->state == SYNCDONE: false rel->statelsn <= remote_final_lsn: true
2020-12-07 17:07:42.196 AEDT [26325] LOG: !!>> apply worker: apply_dispatch for message kind 'C'
2020-12-07 17:07:42.196 AEDT [26325] LOG: !!>> apply worker: called process_syncing_tables
2020-12-07 17:07:42.196 AEDT [26325] LOG: !!>> apply worker: apply_dispatch for message kind 'B'
2020-12-07 17:07:42.196 AEDT [26325] LOG: !!>> apply worker: apply_dispatch for message kind 'I'
2020-12-07 17:07:42.196 AEDT [26325] LOG: !!>> apply worker: should_apply_changes_for_rel: true rel->state == READY: true rel->state == SYNCDONE: false rel->statelsn <= remote_final_lsn: true
2020-12-07 17:07:42.196 AEDT [26325] LOG: !!>> apply worker: apply_dispatch for message kind 'C'
2020-12-07 17:07:42.196 AEDT [26325] LOG: !!>> apply worker: called process_syncing_tables
2020-12-07 17:07:42.196 AEDT [26325] LOG: !!>> apply worker: apply_dispatch for message kind 'B'
2020-12-07 17:07:42.196 AEDT [26325] LOG: !!>> apply worker: apply_dispatch for message kind 'I'
2020-12-07 17:07:42.196 AEDT [26325] LOG: !!>> apply worker: should_apply_changes_for_rel: true rel->state == READY: true rel->state == SYNCDONE: false rel->statelsn <= remote_final_lsn: true
2020-12-07 17:07:42.196 AEDT [26325] LOG: !!>> apply worker: apply_dispatch for message kind 'C'
2020-12-07 17:07:42.196 AEDT [26325] LOG: !!>> apply worker: called process_syncing_tables
2020-12-07 17:07:42.196 AEDT [26325] LOG: !!>> apply worker: apply_dispatch for message kind 'B'
2020-12-07 17:07:42.196 AEDT [26325] LOG: !!>> apply worker: apply_dispatch for message kind 'I'
2020-12-07 17:07:42.196 AEDT [26325] LOG: !!>> apply worker: should_apply_changes_for_rel: true rel->state == READY: true rel->state == SYNCDONE: false rel->statelsn <= remote_final_lsn: true
2020-12-07 17:07:42.196 AEDT [26325] LOG: !!>> apply worker: apply_dispatch for message kind 'C'
2020-12-07 17:07:42.196 AEDT [26325] LOG: !!>> apply worker: called process_syncing_tables
2020-12-07 17:07:42.196 AEDT [26325] LOG: !!>> apply worker: called process_syncing_tables
2020-12-07 17:07:42.197 AEDT [26325] LOG: !!>> apply worker: LogicalRepApplyLoop
2020-12-07 17:07:42.197 AEDT [26325] LOG: !!>> apply worker: called process_syncing_tables
2020-12-07 17:07:42.397 AEDT [26325] LOG: !!>> apply worker: LogicalRepApplyLoop
2020-12-07 17:07:42.397 AEDT [26325] LOG: !!>> apply worker: called process_syncing_tables
2020-12-07 17:07:43.398 AEDT [26325] LOG: !!>> apply worker: LogicalRepApplyLoop
2020-12-07 17:07:43.398 AEDT [26325] LOG: !!>> apply worker: called process_syncing_tables
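
The should_apply_changes_for_rel lines in the log above can be read with a small model of the decision. This is a sketch of the check as I understand it from worker.c around the time of this thread; the Python function and its parameter names are illustrative assumptions, not PostgreSQL code.

```python
def should_apply(is_tablesync_worker: bool, is_own_table: bool,
                 state: str, statelsn: int, remote_final_lsn: int) -> bool:
    """Hypothetical model of the per-relation apply decision.

    A tablesync worker applies changes only for the table it is syncing.
    The apply worker applies changes when the table is READY, or when it is
    SYNCDONE and the transaction commits at or after the recorded statelsn.
    """
    if is_tablesync_worker:
        return is_own_table
    return state == "READY" or (state == "SYNCDONE"
                                and statelsn <= remote_final_lsn)


# First INSERT seen by the apply worker in the unpatched log: the table is
# SYNCDONE but statelsn is past this transaction, so it is skipped.
print(should_apply(False, False, "SYNCDONE", 105, 100))   # False

# Later INSERTs: the table has moved to READY, so they are applied.
print(should_apply(False, False, "READY", 105, 110))      # True

# The tablesync worker itself applies changes for its own table.
print(should_apply(True, True, "CATCHUP", 0, 0))          # True
```

The LSN values are made up; only their ordering matters for the result.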
tablesync-early-exit.patch
## After the tablesync-early-exit patch is applied.
## Testing original PREPARED insert case (which was previously error prone)
2020-12-08 15:01:56.695 AEDT [4924] LOG: !!>> apply worker: called process_syncing_tables
2020-12-08 15:01:57.697 AEDT [4924] LOG: !!>> apply worker: LogicalRepApplyLoop
2020-12-08 15:01:57.697 AEDT [4924] LOG: !!>> apply worker: called process_syncing_tables
2020-12-08 15:01:58.699 AEDT [4924] LOG: !!>> apply worker: LogicalRepApplyLoop
2020-12-08 15:01:58.699 AEDT [4924] LOG: !!>> apply worker: called process_syncing_tables
2020-12-08 15:01:59.699 AEDT [4924] LOG: !!>> apply worker: LogicalRepApplyLoop
2020-12-08 15:01:59.699 AEDT [4924] LOG: !!>> apply worker: called process_syncing_tables
2020-12-08 15:02:00.699 AEDT [4924] LOG: !!>> apply worker: LogicalRepApplyLoop
2020-12-08 15:02:00.699 AEDT [4924] LOG: !!>> apply worker: called process_syncing_tables
2020-12-08 15:02:01.705 AEDT [4924] LOG: !!>> apply worker: LogicalRepApplyLoop
2020-12-08 15:02:01.705 AEDT [4924] LOG: !!>> apply worker: called process_syncing_tables
2020-12-08 15:02:02.637 AEDT [6134] LOG: logical decoding found consistent point at 0/1647FD0
2020-12-08 15:02:02.637 AEDT [6134] DETAIL: There are no running transactions.
2020-12-08 15:02:02.637 AEDT [6134] STATEMENT: CREATE_REPLICATION_SLOT "tap_sub_16458_sync_16385" TEMPORARY LOGICAL pgoutput USE_SNAPSHOT
2020-12-08 15:02:02.648 AEDT [4924] LOG: !!>> apply worker: LogicalRepApplyLoop
2020-12-08 15:02:02.648 AEDT [4924] LOG: !!>> apply worker: called process_syncing_tables
2020-12-08 15:02:02.649 AEDT [4931] LOG: !!>> tablesync worker: wait for CATCHUP state notification
2020-12-08 15:02:02.649 AEDT [4924] LOG: !!>> apply worker: LogicalRepApplyLoop
2020-12-08 15:02:02.649 AEDT [4924] LOG: !!>> apply worker: called process_syncing_tables

## In debugger during the tablesync CATCHUP mode send this:
psql -d test_pub -c "BEGIN;INSERT INTO test_tab VALUES(1, 'foo');PREPARE TRANSACTION 'test_prepared_tab';"
psql -d test_pub -c "COMMIT PREPARED 'test_prepared_tab';"

## Continue the tablesync to see what happens
## tablesync determines upfront there is nothing to do and it exits
2020-12-08 15:04:23.911 AEDT [4931] LOG: !!>> tablesync worker: called process_syncing_tables
2020-12-08 15:05:30.681 AEDT [4931] LOG: logical replication table synchronization worker for subscription "tap_sub", table "test_tab" has finished

## Apply worker then handles the PREPARE and also the COMMIT PREPARED
2020-12-08 15:05:30.682 AEDT [4924] LOG: !!>> apply worker: LogicalRepApplyLoop
2020-12-08 15:05:30.682 AEDT [4924] LOG: !!>> apply worker: apply_dispatch for message kind 'B'
2020-12-08 15:05:30.682 AEDT [4924] LOG: !!>> apply worker: apply_dispatch for message kind 'R'
2020-12-08 15:05:30.682 AEDT [4924] LOG: !!>> apply worker: apply_dispatch for message kind 'I'
2020-12-08 15:05:30.684 AEDT [4924] LOG: !!>> apply worker: should_apply_changes_for_rel: true rel->state == READY: false rel->state == SYNCDONE: true rel->statelsn <= remote_final_lsn: true
2020-12-08 15:05:30.684 AEDT [4924] LOG: !!>> apply worker: apply_dispatch for message kind 'P'
2020-12-08 15:05:30.686 AEDT [4924] LOG: !!>> apply worker: called process_syncing_tables
2020-12-08 15:05:30.686 AEDT [4924] LOG: !!>> apply worker: apply_dispatch for message kind 'P'
2020-12-08 15:05:30.688 AEDT [4924] LOG: !!>> apply worker: called process_syncing_tables
2020-12-08 15:05:30.688 AEDT [4924] LOG: !!>> apply worker: called process_syncing_tables
2020-12-08 15:05:31.690 AEDT [4924] LOG: !!>> apply worker: LogicalRepApplyLoop
2020-12-08 15:05:31.690 AEDT [4924] LOG: !!>> apply worker: called process_syncing_tables
...

## Pub/Sub records are in sync
[postgres@CentOS7-x64 ~]$ psql -d test_pub -c "SELECT count(*) FROM test_tab;"
 count
-------
     1
(1 row)

[postgres@CentOS7-x64 ~]$ psql -d test_sub -p 54321 -c "SELECT count(*) FROM test_tab;"
 count
-------
     1
(1 row)
TEST: After tablesync-early-exit patch code change to avoid tablesync processing...
...
2020-12-08 14:25:12.820 AEDT [8895] LOG: !!>> apply worker: LogicalRepApplyLoop
2020-12-08 14:25:12.820 AEDT [8895] LOG: !!>> apply worker: called process_syncing_tables
2020-12-08 14:25:13.821 AEDT [8895] LOG: !!>> apply worker: LogicalRepApplyLoop
2020-12-08 14:25:13.821 AEDT [8895] LOG: !!>> apply worker: called process_syncing_tables
2020-12-08 14:25:14.125 AEDT [10125] LOG: logical decoding found consistent point at 0/162A760
2020-12-08 14:25:14.125 AEDT [10125] DETAIL: There are no running transactions.
2020-12-08 14:25:14.125 AEDT [10125] STATEMENT: CREATE_REPLICATION_SLOT "tap_sub_16438_sync_16385" TEMPORARY LOGICAL pgoutput USE_SNAPSHOT
2020-12-08 14:25:14.130 AEDT [8895] LOG: !!>> apply worker: LogicalRepApplyLoop
2020-12-08 14:25:14.130 AEDT [8895] LOG: !!>> apply worker: called process_syncing_tables
2020-12-08 14:25:14.144 AEDT [8904] LOG: !!>> tablesync worker: wait for CATCHUP state notification
2020-12-08 14:25:14.144 AEDT [8895] LOG: !!>> apply worker: LogicalRepApplyLoop
2020-12-08 14:25:14.144 AEDT [8895] LOG: !!>> apply worker: called process_syncing_tables

## Attached and paused in debugger where the "tablesync" is told to CATCHUP.
## At this point we might execute any number of SQL commands
## e.g. 10 inserts
psql -d test_pub -c "INSERT INTO test_tab VALUES(1, 'foo');"
psql -d test_pub -c "INSERT INTO test_tab VALUES(2, 'foo');"
psql -d test_pub -c "INSERT INTO test_tab VALUES(3, 'foo');"
psql -d test_pub -c "INSERT INTO test_tab VALUES(4, 'foo');"
psql -d test_pub -c "INSERT INTO test_tab VALUES(5, 'foo');"
psql -d test_pub -c "INSERT INTO test_tab VALUES(6, 'foo');"
psql -d test_pub -c "INSERT INTO test_tab VALUES(7, 'foo');"
psql -d test_pub -c "INSERT INTO test_tab VALUES(8, 'foo');"
psql -d test_pub -c "INSERT INTO test_tab VALUES(9, 'foo');"
psql -d test_pub -c "INSERT INTO test_tab VALUES(10, 'foo');"

## Now continue in the "tablesync" worker
## But because of this patch the "tablesync" will exit before doing anything in the LogicalRepApplyLoop
2020-12-08 14:26:43.339 AEDT [8904] LOG: !!>> tablesync worker: called process_syncing_tables
2020-12-08 14:26:43.339 AEDT [8904] LOG: logical replication table synchronization worker for subscription "tap_sub", table "test_tab" has finished

## So the "apply" worker now will process all 10x INSERTs
2020-12-08 14:26:43.340 AEDT [8895] LOG: !!>> apply worker: LogicalRepApplyLoop
2020-12-08 14:26:43.340 AEDT [8895] LOG: !!>> apply worker: apply_dispatch for message kind 'B'
2020-12-08 14:26:43.340 AEDT [8895] LOG: !!>> apply worker: apply_dispatch for message kind 'R'
2020-12-08 14:26:43.340 AEDT [8895] LOG: !!>> apply worker: apply_dispatch for message kind 'I'
2020-12-08 14:26:43.341 AEDT [8895] LOG: !!>> apply worker: should_apply_changes_for_rel: true rel->state == READY: false rel->state == SYNCDONE: true rel->statelsn <= remote_final_lsn: true
2020-12-08 14:26:43.342 AEDT [8895] LOG: !!>> apply worker: apply_dispatch for message kind 'C'
2020-12-08 14:26:43.342 AEDT [8895] LOG: !!>> apply worker: called process_syncing_tables
X 10
...
## Then apply worker just waiting for next incoming message like normal
2020-12-08 14:26:43.344 AEDT [8895] LOG: !!>> apply worker: called process_syncing_tables
2020-12-08 14:26:43.344 AEDT [8895] LOG: !!>> apply worker: LogicalRepApplyLoop
2020-12-08 14:26:43.344 AEDT [8895] LOG: !!>> apply worker: called process_syncing_tables
2020-12-08 14:26:43.545 AEDT [8895] LOG: !!>> apply worker: LogicalRepApplyLoop
2020-12-08 14:26:43.545 AEDT [8895] LOG: !!>> apply worker: called process_syncing_tables
2020-12-08 14:26:43.748 AEDT [8895] LOG: !!>> apply worker: LogicalRepApplyLoop
...