On 2016-03-22 14:03:06 +0100, Petr Jelinek wrote: > On 22/03/16 12:47, Andres Freund wrote: > >On 2016-03-21 18:10:55 +0100, Petr Jelinek wrote: > > > >>+ > >>+ <sect3 id="logicaldecoding-output-plugin-message"> > >>+ <title>Generic Message Callback</title> > >>+ > >>+ <para> > >>+ The optional <function>message_cb</function> callback is called > >>whenever > >>+ a logical decoding message has been decoded. > >>+<programlisting> > >>+typedef void (*LogicalDecodeMessageCB) ( > >>+ struct LogicalDecodingContext *, > >>+ ReorderBufferTXN *txn, > >>+ XLogRecPtr message_lsn, > >>+ const char *prefix, > >>+ Size message_size, > >>+ const char *message > >>+); > > > >I see you removed the transactional parameter. I'm doubtful that that's > >a good idea: It seems like it'd be rather helpful to pass the > >transaction for a nontransaction message that's emitted while an xid was > >assigned? > > > > Hmm but won't that give the output plugin even transactions that were later > aborted? That seems quite different behavior from how the txn parameter > works everywhere else.
Seems harmless to me if called out. > > > >>+/* > >>+ * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer(). > >>+ */ > >>+static void > >>+DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) > >>+{ > >>+ SnapBuild *builder = ctx->snapshot_builder; > >>+ XLogReaderState *r = buf->record; > >>+ uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK; > >>+ xl_logical_message *message; > >>+ > >>+ if (info != XLOG_LOGICAL_MESSAGE) > >>+ elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", > >>info); > >>+ > >>+ message = (xl_logical_message *) XLogRecGetData(r); > >>+ > >>+ if (message->transactional) > >>+ { > >>+ if (!SnapBuildProcessChange(builder, XLogRecGetXid(r), > >>buf->origptr)) > >>+ return; > >>+ > >>+ ReorderBufferQueueMessage(ctx->reorder, XLogRecGetXid(r), > >>+ buf->endptr, > >>+ > >>message->message, /* first part of message is prefix */ > >>+ > >>message->message_size, > >>+ > >>message->message + message->prefix_size); > >>+ } > >>+ else if (SnapBuildCurrentState(builder) == SNAPBUILD_CONSISTENT && > >>+ !SnapBuildXactNeedsSkip(builder, buf->origptr)) > >>+ { > >>+ volatile Snapshot snapshot_now; > >>+ ReorderBuffer *rb = ctx->reorder; > >>+ > >>+ /* setup snapshot to allow catalog access */ > >>+ snapshot_now = SnapBuildGetOrBuildSnapshot(builder, > >>XLogRecGetXid(r)); > >>+ SetupHistoricSnapshot(snapshot_now, NULL); > >>+ rb->message(rb, NULL, buf->origptr, message->message, > >>+ message->message_size, > >>+ message->message + > >>message->prefix_size); > >>+ TeardownHistoricSnapshot(false); > >>+ } > >>+} > > > >A number of things: > >1) The SnapBuildProcessChange needs to be toplevel, not just for > > transactional messages - we can't yet necessarily build a snapshot. > > Nope, the snapshot state is checked in the else if. > > >2) I'm inclined to move even the non-transactional stuff to reorderbuffer. > > Well, it's not doing anything with reorderbuffer but sure it can be done > (didn't do it in the attached though). I think there'll be some interactions if we ever do some parts in parallel and such. I'd rather keep decode.c to only do the lowest level stuff. Greetings, Andres Freund -- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers