On Fri, Nov 5, 2021 at 1:42 PM osumi.takami...@fujitsu.com <osumi.takami...@fujitsu.com> wrote: > > On Thursday, November 4, 2021 9:54 AM Greg Nancarrow <gregn4...@gmail.com> > wrote: > > On Tue, Nov 2, 2021 at 12:18 AM osumi.takami...@fujitsu.com > > <osumi.takami...@fujitsu.com> wrote: > > > > > > On Thursday, October 28, 2021 11:19 PM I wrote: > > > > I've created a new patch that extends pg_stat_subscription_workers > > > > to include other transaction statistics. > > > > > > > > Note that this patch depends on v18 patch-set in [1]... > > > Rebased based on the v19 in [1]. > > > Also, fixed documentation a little and made tests tidy. > > > FYI, the newly included TAP test(027_worker_xact_stats.pl) is stable > > > because I checked that 100 times of its execution in a tight loop all > > > passed. > > > > > > > I have done some basic testing of the patch and have some initial review > > comments: > Thanks for your review ! > > > (1) I think this patch needs to be in "git format-patch" format, with a > > proper > > commit message that describes the purpose of the patch and the functionality > > it adds, and any high-level design points (something like the overview > > given in > > the initial post, and accounting for the subsequent discussion points and > > updated functionality). > Fixed. > > > (2) doc/src/sgml/monitoring.sgml > > There are some grammatical issues in the current description. I suggest > > changing it to something like: > > BEFORE: > > + <entry>At least one row per subscription, showing about > > transaction statistics and error summary that > > AFTER: > > + <entry>At least one row per subscription, showing transaction > > statistics and information about errors that > Fixed. > > > (2) doc/src/sgml/monitoring.sgml > > The current description seems a little confusing. > > Per subscription, it shows the transaction statistics and any last error > > info from > > tablesync/apply workers? If this is the case, I'd suggest the following > > change: > > > > BEFORE: > > + one row per subscription for transaction statistics and summary of the > > last > > + error reported by workers applying logical replication changes and > > workers > > + handling the initial data copy of the subscribed tables. > > AFTER: > > + one row per subscription, showing corresponding transaction statistics > > and > > + information about the last error reported by workers applying > > logical replication > > + changes or by workers handling the initial data copy of the > > subscribed tables. > Fixed. > > > (3) xact_commit > > I think that the "xact_commit" column should be named "xact_commit_count" > > or "xact_commits". > > Similarly, I think "xact_error" should be named "xact_error_count" or > > "xact_errors", and "xact_aborts" should be named "xact_abort_count" or > > "xact_aborts". > I prefered *_count. Renamed. > > > (4) xact_commit_bytes > > > > + Amount of transactions data successfully applied in this > > subscription. > > + Consumed memory for xact_commit is displayed. > > > > I find this description a bit confusing. "Consumed memory for xact_commit" > > seems different to "transactions data". > > Could the description be something like: Amount of data (in bytes) > > successfully > > applied in this subscription, across "xact_commit_count" > > transactions. > Fixed. > > > (5) > > I'd suggest some minor rewording for the following: > > > > BEFORE: > > + Number of transactions failed to be applied and caught by table > > + sync worker or main apply worker in this subscription. > > AFTER: > > + Number of transactions that failed to be applied by the table > > + sync worker or main apply worker in this subscription. > Fixed. > > > (6) xact_error_bytes > > Again, it's a little confusing referring to "consumed memory" here. > > How about rewording this, something like: > > > > BEFORE: > > + Amount of transactions data unsuccessfully applied in this > > subscription. > > + Consumed memory that past failed transaction used is displayed. > > AFTER: > > + Amount of data (in bytes) unsuccessfully applied in this > > subscription by the last failed transaction. > xact_error_bytes (and other bytes columns as well) is cumulative > so when a new error happens, the size of this new bytes would be > added to the same. So here we shouldn't mention just the last error. > I simply applied your previous comments of 'xact_commit_bytes' > to 'xact_error_bytes' description. > > > (7) > > The additional information provided for "xact_abort_bytes" needs some > > rewording, something like: > > > > BEFORE: > > + Increase <literal>logical_decoding_work_mem</literal> on the > > publisher > > + so that it exceeds the size of whole streamed transaction > > + to suppress unnecessary consumed network bandwidth in addition to > > change > > + in memory of the subscriber, if unexpected amount of streamed > > transactions > > + are aborted. > > AFTER: > > + In order to suppress unnecessary consumed network bandwidth, > > increase > > + <literal>logical_decoding_work_mem</literal> on the publisher so > > that it > > + exceeds the size of the whole streamed transaction, and > > additionally increase > > + the available subscriber memory, if an unexpected amount of > > streamed transactions > > + are aborted. > > I'm not sure about the last part. > > additionally increase the available subscriber memory, > Which GUC parameter did you mean by this ? > Could we point out and enalrge the memory size only for > subscriber's apply processing intentionally ? > I incorporated (7) except for this last part. > Will revise according to your reply. > > I also added the explanation about > xact_abort_bytes itself to align with other bytes columns. > > > (8) > > Suggested update: > > > > BEFORE: > > + * Tell the collector that worker transaction has finished without > > problem. > > AFTER: > > + * Tell the collector that the worker transaction has successfully > > completed. > Fixed. > > > (9) src/backend/postmaster/pgstat.c > > I think that the GID copying is unnecessarily copying the whole GID buffer > > or > > using an additional strlen(). > > It should be changed to use strlcpy() to match other code: > > > > BEFORE: > > + /* get the gid for this two phase operation */ if (command == > > + LOGICAL_REP_MSG_PREPARE || > > + command == LOGICAL_REP_MSG_STREAM_PREPARE) > > + memcpy(msg.m_gid, prepare_data->gid, GIDSIZE); else if (command == > > + LOGICAL_REP_MSG_COMMIT_PREPARED) > > + memcpy(msg.m_gid, commit_data->gid, GIDSIZE); else /* rollback > > + prepared */ > > + memcpy(msg.m_gid, rollback_data->gid, GIDSIZE); > > AFTER: > > + /* get the gid for this two phase operation */ if (command == > > + LOGICAL_REP_MSG_PREPARE || > > + command == LOGICAL_REP_MSG_STREAM_PREPARE) > > + strlcpy(msg.m_gid, prepare_data->gid, sizeof(msg.m_gid)); else if > > + (command == LOGICAL_REP_MSG_COMMIT_PREPARED) > > + strlcpy(msg.m_gid, commit_data->gid, sizeof(msg.m_gid)); else /* > > + rollback prepared */ > > + strlcpy(msg.m_gid, rollback_data->gid, sizeof(msg.m_gid)); > Fixed. > > > > > BEFORE: > > + strlcpy(prepared_txn->gid, msg->m_gid, strlen(msg->m_gid) + 1); > > AFTER: > > + strlcpy(prepared_txn->gid, msg->m_gid, sizeof(prepared_txn->gid)); > > > > BEFORE: > > + memcpy(key.gid, msg->m_gid, strlen(msg->m_gid)); > > AFTER: > > + strlcpy(key.gid, msg->m_gid, sizeof(key.gid)); > > > > BEFORE: > > + memcpy(key.gid, gid, strlen(gid)); > > AFTER: > > + strlcpy(key.gid, gid, sizeof(key.gid)); > Fixed. > > > (10) src/backend/replication/logical/worker.c > > Some suggested rewording: > > > > BEFORE: > > + * size of streaming transaction resources because it have used the > > AFTER: > > + * size of streaming transaction resources because it has used the > Fixed. > > > BEFORE: > > + * tradeoff should not be good. Also, add multiple values > > + * at once in order to reduce the number of this function call. > > AFTER: > > + * tradeoff would not be good. Also, add multiple values > > + * at once in order to reduce the number of calls to this function. > Fixed. > > > (11) update_apply_change_size() > > Shouldn't this function be declared static? > Fixed. > > > (12) stream_write_change() > > > > + streamed_entry->xact_size = streamed_entry->xact_size + total_len; > > /* update */ > > > > could be simply written as: > > > > + streamed_entry->xact_size += total_len; /* update */ > Fixed. > > Lastly, I removed one unnecessary test that > checked publisher's stats in the TAP tests. > Also I introduced ApplyTxnExtraData structure to > remove void* argument of update_apply_change_size > that might worsen the readability of codes > in the previous version.
Thanks for the updated patch. Few comments: 1) You could remove LogicalRepPreparedTxnData, LogicalRepCommitPreparedTxnData & LogicalRepRollbackPreparedTxnData and change it to char *gid to reduce the function parameter and simiplify the assignment: + */ +void +pgstat_report_subworker_twophase_xact(Oid subid, LogicalRepMsgType command, + PgStat_Counter xact_size, + LogicalRepPreparedTxnData *prepare_data, + LogicalRepCommitPreparedTxnData *commit_data, + LogicalRepRollbackPreparedTxnData *rollback_data) 2) Shouldn't this change be part of skip xid patch? - TupleDescInitEntry(tupdesc, (AttrNumber) 4, "command", + TupleDescInitEntry(tupdesc, (AttrNumber) 10, "last_error_command", TEXTOID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 5, "xid", + TupleDescInitEntry(tupdesc, (AttrNumber) 11, "last_error_xid", XIDOID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 6, "error_count", + TupleDescInitEntry(tupdesc, (AttrNumber) 12, "last_error_count", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 7, "error_message", + TupleDescInitEntry(tupdesc, (AttrNumber) 13, "last_error_message", TEXTOID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 8, "last_error_time", + TupleDescInitEntry(tupdesc, (AttrNumber) 14, "last_error_time", 3) This newly added structures should be added to typedefs.list: ApplyTxnExtraData XactSizeEntry PgStat_MsgSubWorkerXactEnd PgStat_MsgSubWorkerTwophaseXact PgStat_StatSubWorkerPreparedXact PgStat_StatSubWorkerPreparedXactSize 4) We are not sending the transaction size in case of table sync, is this intentional, if so I felt we should document this in pg_stat_subscription_workers + /* Report the success of table sync. */ + pgstat_report_subworker_xact_end(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + 0 /* no logical message type */, + 0 /* xact size */); + 5) pg_stat_subscription_workers has a lot of columns, if we can reduce the column size the readability will improve, like xact_commit_count to commit_count, xact_commit_bytes to commit_bytes, etc + w.xact_commit_count, + w.xact_commit_bytes, + w.xact_error_count, + w.xact_error_bytes, + w.xact_abort_count, + w.xact_abort_bytes, Regards, Vignesh