On Mon, Mar 8, 2021 at 7:17 AM Peter Smith <smithpb2...@gmail.com> wrote: > > Please find attached the latest patch set v52* > A few comments:
+logicalrep_read_begin_prepare(StringInfo in, LogicalRepBeginPrepareData *begin_data) +{ + /* read fields */ + begin_data->final_lsn = pq_getmsgint64(in); + if (begin_data->final_lsn == InvalidXLogRecPtr) + elog(ERROR, "final_lsn not set in begin message"); + begin_data->end_lsn = pq_getmsgint64(in); + if (begin_data->end_lsn == InvalidXLogRecPtr) + elog(ERROR, "end_lsn not set in begin message"); + begin_data->committime = pq_getmsgint64(in); + begin_data->xid = pq_getmsgint(in, 4); + + /* read gid (copy it into a pre-allocated buffer) */ + strcpy(begin_data->gid, pq_getmsgstring(in)); +} In logicalrep_read_begin_prepare we validate final_lsn and end_lsn, but this validation is not done in logicalrep_read_commit_prepared or logicalrep_read_rollback_prepared. Should we make them consistent? @@ -170,5 +237,4 @@ extern void logicalrep_write_stream_abort(StringInfo out, TransactionId xid, TransactionId subxid); extern void logicalrep_read_stream_abort(StringInfo in, TransactionId *xid, TransactionId *subxid); - #endif /* LOGICAL_PROTO_H */ This change (removing the blank line before #endif) is not required. @@ -242,15 +244,16 @@ create_replication_slot: $$ = (Node *) cmd; } /* CREATE_REPLICATION_SLOT slot TEMPORARY LOGICAL plugin */ - | K_CREATE_REPLICATION_SLOT IDENT opt_temporary K_LOGICAL IDENT create_slot_opt_list + | K_CREATE_REPLICATION_SLOT IDENT opt_temporary opt_two_phase K_LOGICAL IDENT create_slot_opt_list { CreateReplicationSlotCmd *cmd; cmd = makeNode(CreateReplicationSlotCmd); cmd->kind = REPLICATION_KIND_LOGICAL; cmd->slotname = $2; cmd->temporary = $3; - cmd->plugin = $5; - cmd->options = $6; + cmd->two_phase = $4; + cmd->plugin = $6; + cmd->options = $7; $$ = (Node *) cmd; } Should we document the new two_phase option in the following section: CREATE_REPLICATION_SLOT slot_name [ TEMPORARY ] { PHYSICAL [ RESERVE_WAL ] | LOGICAL output_plugin [ EXPORT_SNAPSHOT | NOEXPORT_SNAPSHOT | USE_SNAPSHOT ] } Create a physical or logical replication slot. See Section 27.2.6 for more about replication slots. 
+ while (AnyTablesyncInProgress()) + { + process_syncing_tables(begin_data.final_lsn); + + /* This latch is to prevent 100% CPU looping. */ + (void) WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE); + ResetLatch(MyLatch); + } Should we call CHECK_FOR_INTERRUPTS() inside the while loop? + if (begin_data.final_lsn < BiggestTablesyncLSN()) + { + char psfpath[MAXPGPATH]; + + /* + * Create the spoolfile. + */ + prepare_spoolfile_name(psfpath, sizeof(psfpath), + MyLogicalRepWorker->subid, begin_data.gid); + prepare_spoolfile_create(psfpath); The "Create the spoolfile." comment can be a single-line comment. + if (!found) + { + elog(DEBUG1, "Not found file \"%s\". Create it.", path); + psf_cur.vfd = PathNameOpenFile(path, O_RDWR | O_CREAT | O_TRUNC | PG_BINARY); + if (psf_cur.vfd < 0) + { + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not create file \"%s\": %m", path))); + } + memcpy(psf_cur.name, path, sizeof(psf_cur.name)); + psf_cur.cur_offset = 0; + hentry->allow_delete = true; + } + else + { + /* + * Open the file and seek to the beginning because we always want to + * create/overwrite this file. + */ + elog(DEBUG1, "Found file \"%s\". Overwrite it.", path); + psf_cur.vfd = PathNameOpenFile(path, O_RDWR | O_CREAT | O_TRUNC | PG_BINARY); + if (psf_cur.vfd < 0) + { + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", path))); + } + memcpy(psf_cur.name, path, sizeof(psf_cur.name)); + psf_cur.cur_offset = 0; + hentry->allow_delete = true; + } Apart from the elog and errmsg message texts, the code is the same in both the if and else branches, so we can move the common code outside the if/else. 
LOGICAL_REP_MSG_TYPE = 'Y', + LOGICAL_REP_MSG_BEGIN_PREPARE = 'b', + LOGICAL_REP_MSG_PREPARE = 'P', + LOGICAL_REP_MSG_COMMIT_PREPARED = 'K', + LOGICAL_REP_MSG_ROLLBACK_PREPARED = 'r', LOGICAL_REP_MSG_STREAM_START = 'S', LOGICAL_REP_MSG_STREAM_END = 'E', LOGICAL_REP_MSG_STREAM_COMMIT = 'c', - LOGICAL_REP_MSG_STREAM_ABORT = 'A' + LOGICAL_REP_MSG_STREAM_ABORT = 'A', + LOGICAL_REP_MSG_STREAM_PREPARE = 'p' } LogicalRepMsgType; As more features are added, we will need more and more message types, and finding meaningful characters for them may become difficult. Should we start using numeric values for the new message types being added? Regards, Vignesh