Here are some review comments for the patch v19-0004: ======
1. doc/src/sgml/ref/create_subscription.sgml @@ -244,6 +244,11 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl column in the relation on the subscriber-side should also be the unique column on the publisher-side; 2) there cannot be any non-immutable functions used by the subscriber-side replicated table. + When applying a streaming transaction, if either requirement is not + met, the background worker will exit with an error. + <literal>parallel</literal> mode is disregarded when retrying; + instead the transaction will be applied using <literal>on</literal> + mode. </para> That last sentence starting with lowercase seems odd - that's why I thought saying "The parallel mode..." might be better. IMO "on mode" seems strange too. Hence my previous [1] (#4.3) suggestion for this SUGGESTION The <literal>parallel</literal> mode is disregarded when retrying; instead the transaction will be applied using <literal>streaming = on</literal>. ====== 2. src/backend/replication/logical/worker.c - start_table_sync @@ -3902,20 +3925,28 @@ start_table_sync(XLogRecPtr *origin_startpos, char **myslotname) } PG_CATCH(); { + /* + * Emit the error message, and recover from the error state to an idle + * state + */ + HOLD_INTERRUPTS(); + + EmitErrorReport(); + AbortOutOfAnyTransaction(); + FlushErrorState(); + + RESUME_INTERRUPTS(); + + /* Report the worker failed during table synchronization */ + pgstat_report_subscription_error(MySubscription->oid, false); + + /* Set the retry flag. */ + set_subscription_retry(true); + if (MySubscription->disableonerr) DisableSubscriptionAndExit(); - else - { - /* - * Report the worker failed during table synchronization. Abort - * the current transaction so that the stats message is sent in an - * idle state. - */ - AbortOutOfAnyTransaction(); - pgstat_report_subscription_error(MySubscription->oid, false); - PG_RE_THROW(); - } + proc_exit(0); } But is it correct to set the 'retry' flag even if the MySubscription->disableonerr is true? Won’t that mean even after the user corrects the problem and then re-enabled the subscription it still won't let the streaming=parallel work, because that retry flag is set? Also, Something seems wrong to me here - IIUC the patch changed this code because of the potential risk of an error within the set_subscription_retry function, but now if such an error happens the current code will bypass even getting to DisableSubscriptionAndExit, so the subscription won't have a chance to get disabled as the user might have wanted. ~~~ 3. src/backend/replication/logical/worker.c - start_apply @@ -3940,20 +3971,27 @@ start_apply(XLogRecPtr origin_startpos) } PG_CATCH(); { + /* + * Emit the error message, and recover from the error state to an idle + * state + */ + HOLD_INTERRUPTS(); + + EmitErrorReport(); + AbortOutOfAnyTransaction(); + FlushErrorState(); + + RESUME_INTERRUPTS(); + + /* Report the worker failed while applying changes */ + pgstat_report_subscription_error(MySubscription->oid, + !am_tablesync_worker()); + + /* Set the retry flag. */ + set_subscription_retry(true); + if (MySubscription->disableonerr) DisableSubscriptionAndExit(); - else - { - /* - * Report the worker failed while applying changes. Abort the - * current transaction so that the stats message is sent in an - * idle state. - */ - AbortOutOfAnyTransaction(); - pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker()); - - PG_RE_THROW(); - } } (Same as previous review comment #2) But is it correct to set the 'retry' flag even if the MySubscription->disableonerr is true? Won’t that mean even after the user corrects the problem and then re-enabled the subscription it still won't let the streaming=parallel work, because that retry flag is set? Also, Something seems wrong to me here - IIUC the patch changed this code because of the potential risk of an error within the set_subscription_retry function, but now if such an error happens the current code will bypass even getting to DisableSubscriptionAndExit, so the subscription won't have a chance to get disabled as the user might have wanted. ~~~ 4. src/backend/replication/logical/worker.c - DisableSubscriptionAndExit /* - * After error recovery, disable the subscription in a new transaction - * and exit cleanly. + * Disable the subscription in a new transaction. */ static void DisableSubscriptionAndExit(void) { - /* - * Emit the error message, and recover from the error state to an idle - * state - */ - HOLD_INTERRUPTS(); - - EmitErrorReport(); - AbortOutOfAnyTransaction(); - FlushErrorState(); - - RESUME_INTERRUPTS(); - - /* Report the worker failed during either table synchronization or apply */ - pgstat_report_subscription_error(MyLogicalRepWorker->subid, - !am_tablesync_worker()); - /* Disable the subscription */ StartTransactionCommand(); DisableSubscription(MySubscription->oid); @@ -4231,8 +4252,6 @@ DisableSubscriptionAndExit(void) ereport(LOG, errmsg("logical replication subscription \"%s\" has been disabled due to an error", MySubscription->name)); - - proc_exit(0); } 4a. Hmm, I think it is a bad idea to remove the "exiting" code from the function but still leave the function name the same as before saying "AndExit". 4b. Also, now the patch is unconditionally doing proc_exit(0) in the calling code where previously it would do PG_RE_THROW. So it's a subtle difference from the path the code used to take for worker errors.. ~~~ 5. src/backend/replication/logical/worker.c - set_subscription_retry @@ -4467,3 +4486,63 @@ reset_apply_error_context_info(void) apply_error_callback_arg.remote_attnum = -1; set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr); } + +/* + * Set subretry of pg_subscription catalog. + * + * If retry is true, subscriber is about to exit with an error. Otherwise, it + * means that the transaction was applied successfully. + */ +static void +set_subscription_retry(bool retry) +{ + Relation rel; + HeapTuple tup; + bool started_tx = false; + bool nulls[Natts_pg_subscription]; + bool replaces[Natts_pg_subscription]; + Datum values[Natts_pg_subscription]; + + if (MySubscription->retry == retry || + am_apply_bgworker()) + return; Currently, I think this new 'subretry' field is only used to decide whether a retry can use an apply background worker or not. I think all this logic is *only* used when streaming=parallel. But AFAICT the logic for setting/clearing the retry flag is executed *always* regardless of the streaming mode. So for all the times when the user did not ask for streaming=parallel doesn't this just cause unnecessary overhead for every transaction? ------ [1] https://www.postgresql.org/message-id/OS3PR01MB62758A6AAED27B3A848CEB7A9E8F9%40OS3PR01MB6275.jpnprd01.prod.outlook.com Kind Regards, Peter Smith. Fujitsu Australia