On 2016-03-22 14:03:06 +0100, Petr Jelinek wrote:
> On 22/03/16 12:47, Andres Freund wrote:
> >On 2016-03-21 18:10:55 +0100, Petr Jelinek wrote:
> >
> >>+
> >>+    <sect3 id="logicaldecoding-output-plugin-message">
> >>+     <title>Generic Message Callback</title>
> >>+
> >>+     <para>
> >>+      The optional <function>message_cb</function> callback is called whenever
> >>+      a logical decoding message has been decoded.
> >>+<programlisting>
> >>+typedef void (*LogicalDecodeMessageCB) (
> >>+    struct LogicalDecodingContext *,
> >>+    ReorderBufferTXN *txn,
> >>+    XLogRecPtr message_lsn,
> >>+    const char *prefix,
> >>+    Size message_size,
> >>+    const char *message
> >>+);
> >
> >I see you removed the transactional parameter. I'm doubtful that's
> >a good idea: it seems like it'd be rather helpful to pass the
> >transaction for a non-transactional message that's emitted while an xid
> >was assigned?
> >
> 
> Hmm, but won't that give the output plugin even transactions that were later
> aborted? That seems quite different from how the txn parameter behaves
> everywhere else.

Seems harmless to me, as long as the documentation calls it out.
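
A rough sketch of what "called out" could mean on the plugin side, assuming
the callback receives both a transactional flag and a txn pointer. DemoTxn
and demo_describe_message() are hypothetical stand-ins for illustration, not
part of the patch or of the real ReorderBufferTXN API:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

/* Hypothetical stand-in for ReorderBufferTXN; only the xid is modelled. */
typedef struct DemoTxn
{
    unsigned int xid;
} DemoTxn;

/*
 * Hypothetical message callback body.  For a non-transactional message the
 * txn (if any) may still abort later, so it is used for labelling only and
 * never treated as proof that the transaction committed.
 *
 * Returns the number of characters snprintf() wanted to write.
 */
static int
demo_describe_message(char *out, size_t outlen, const DemoTxn *txn,
                      bool transactional, const char *prefix)
{
    assert(out != NULL && prefix != NULL);

    if (transactional && txn != NULL)
        return snprintf(out, outlen, "%s: part of xid %u",
                        prefix, txn->xid);

    if (txn != NULL)
        return snprintf(out, outlen, "%s: emitted under xid %u (may abort)",
                        prefix, txn->xid);

    return snprintf(out, outlen, "%s: no xid assigned", prefix);
}
```

The point is only that a plugin can tell the three cases apart; what it does
with an xid that may later abort is up to the plugin.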


> >
> >>+/*
> >>+ * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
> >>+ */
> >>+static void
> >>+DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
> >>+{
> >>+   SnapBuild  *builder = ctx->snapshot_builder;
> >>+   XLogReaderState *r = buf->record;
> >>+   uint8           info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
> >>+   xl_logical_message *message;
> >>+
> >>+   if (info != XLOG_LOGICAL_MESSAGE)
> >>+           elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
> >>+
> >>+   message = (xl_logical_message *) XLogRecGetData(r);
> >>+
> >>+   if (message->transactional)
> >>+   {
> >>+           if (!SnapBuildProcessChange(builder, XLogRecGetXid(r), buf->origptr))
> >>+                   return;
> >>+
> >>+           ReorderBufferQueueMessage(ctx->reorder, XLogRecGetXid(r),
> >>+                                     buf->endptr,
> >>+                                     message->message, /* first part of message is prefix */
> >>+                                     message->message_size,
> >>+                                     message->message + message->prefix_size);
> >>+   }
> >>+   else if (SnapBuildCurrentState(builder) == SNAPBUILD_CONSISTENT &&
> >>+                    !SnapBuildXactNeedsSkip(builder, buf->origptr))
> >>+   {
> >>+           volatile Snapshot       snapshot_now;
> >>+           ReorderBuffer      *rb = ctx->reorder;
> >>+
> >>+           /* setup snapshot to allow catalog access */
> >>+           snapshot_now = SnapBuildGetOrBuildSnapshot(builder, XLogRecGetXid(r));
> >>+           SetupHistoricSnapshot(snapshot_now, NULL);
> >>+           rb->message(rb, NULL, buf->origptr, message->message,
> >>+                                   message->message_size,
> >>+                                   message->message + message->prefix_size);
> >>+           TeardownHistoricSnapshot(false);
> >>+   }
> >>+}
> >
> >A number of things:
> >1) The SnapBuildProcessChange needs to be toplevel, not just for
> >    transactional messages - we can't yet necessarily build a snapshot.
> 
> Nope, the snapshot state is checked in the else if.
> 
> >2) I'm inclined to move even the non-transactional stuff to reorderbuffer.
> 
> Well, it's not doing anything with reorderbuffer but sure it can be done
> (didn't do it in the attached though).

I think there'll be some interactions if we ever do some parts in
parallel and such.  I'd rather keep decode.c to only do the lowest level
stuff.
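
For reference, the buffer layout the quoted DecodeLogicalMsgOp() relies on
(prefix first, payload at message + prefix_size) can be sketched in
standalone C. DemoLogicalMessage and the demo_* helpers are hypothetical
stand-ins, not the patch's xl_logical_message, and it is an assumption here
that prefix_size counts the trailing NUL:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for xl_logical_message; the real struct may differ. */
typedef struct DemoLogicalMessage
{
    bool        transactional;  /* decoded with its transaction, or at once */
    size_t      prefix_size;    /* prefix length, trailing NUL included (assumed) */
    size_t      message_size;   /* payload length in bytes */
    char        message[64];    /* NUL-terminated prefix, then the payload */
} DemoLogicalMessage;

/* Pack prefix and payload back to back; returns false if they do not fit. */
static bool
demo_message_fill(DemoLogicalMessage *msg, bool transactional,
                  const char *prefix, const char *payload,
                  size_t payload_size)
{
    size_t      prefix_size;

    assert(msg != NULL && prefix != NULL && payload != NULL);

    prefix_size = strlen(prefix) + 1;
    if (prefix_size > sizeof(msg->message) ||
        payload_size > sizeof(msg->message) - prefix_size)
        return false;

    msg->transactional = transactional;
    msg->prefix_size = prefix_size;
    msg->message_size = payload_size;
    memcpy(msg->message, prefix, prefix_size);
    memcpy(msg->message + prefix_size, payload, payload_size);
    return true;
}

/* The prefix is simply the start of the buffer. */
static const char *
demo_message_prefix(const DemoLogicalMessage *msg)
{
    return msg->message;
}

/* The payload starts right after the prefix and is not NUL-terminated. */
static const char *
demo_message_payload(const DemoLogicalMessage *msg)
{
    return msg->message + msg->prefix_size;
}
```

Both decode paths hand the callback the same two pointers, which is one more
reason to keep that pointer arithmetic in a single place.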

Greetings,

Andres Freund


-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers