Hi,

I'm not really convinced by RegisterStandbyMsgPrefix() et al. There's
not much documentation about what it actually is supposed to
acomplish. Afaics you're basically forced to use
shared_preload_libraries with it right now?  Also, iterating through a
linked list everytime something is logged doesn't seem very satisfying?


On 2016-02-24 18:35:16 +0100, Petr Jelinek wrote:
> +SET synchronous_commit = on;
> +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 
> 'test_decoding');
> + ?column? 
> +----------
> + init
> +(1 row)

> +SELECT 'msg1' FROM pg_logical_send_message(true, 'test', 'msg1');
> + ?column? 
> +----------
> + msg1
> +(1 row)

Hm. Somehow 'sending' a message seems wrong here. Maybe 'emit'?

> +      <row>
> +       <entry id="pg-logical-send-message-text">
> +        <indexterm>
> +         <primary>pg_logical_send_message</primary>
> +        </indexterm>
> +        
> <literal><function>pg_logical_send_message(<parameter>transactional</parameter>
>  <type>bool</type>, <parameter>prefix</parameter> <type>text</type>, 
> <parameter>content</parameter> <type>text</type>)</function></literal>
> +       </entry>
> +       <entry>
> +        void
> +       </entry>
> +       <entry>
> +        Write text logical decoding message. This can be used to pass generic
> +        messages to logical decoding plugins through WAL. The parameter
> +        <parameter>transactional</parameter> specifies if the message should
> +        be part of current transaction or if it should be written and decoded
> +        immediately. The <parameter>prefix</parameter> has to be prefix which
> +        was registered by a plugin. The <parameter>content</parameter> is
> +        content of the message.
> +       </entry>
> +      </row>

It's not decoded immediately, even if emitted non-transactionally.

> +    <sect3 id="logicaldecoding-output-plugin-message">
> +     <title>Generic Message Callback</title>
> +
> +     <para>
> +      The optional <function>message_cb</function> callback is called 
> whenever
> +      a logical decoding message has been decoded.
> +<programlisting>
> +typedef void (*LogicalDecodeMessageCB) (
> +    struct LogicalDecodingContext *,
> +    ReorderBufferTXN *txn,
> +    XLogRecPtr message_lsn,
> +    bool transactional,
> +    const char *prefix,
> +    Size message_size,
> +    const char *message
> +);

We should at least document what txn is set to if not transactional.

> +void
> +logicalmsg_desc(StringInfo buf, XLogReaderState *record)
> +{
> +     char                       *rec = XLogRecGetData(record);
> +     xl_logical_message *xlrec = (xl_logical_message *) rec;
> +
> +     appendStringInfo(buf, "%s message size %zu bytes",
> +                                      xlrec->transactional ? "transactional" 
> : "nontransactional",
> +                                      xlrec->message_size);
> +}

Shouldn't we check
          uint8         info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
          if XLogRecGetInfo(record) == XLOG_LOGICAL_MESSAGE
here?

> +const char *
> +logicalmsg_identify(uint8 info)
> +{
> +     return NULL;
> +}

Huh?


> +void
> +ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, XLogRecPtr 
> lsn,
> +                                               bool transactional, const 
> char *prefix, Size msg_sz,
> +                                               const char *msg)
> +{
> +     ReorderBufferTXN *txn = NULL;
> +
> +     if (transactional)
> +     {
> +             ReorderBufferChange *change;
> +
> +             txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
> +
> +             Assert(xid != InvalidTransactionId);
> +             Assert(txn != NULL);
> +
> +             change = ReorderBufferGetChange(rb);
> +             change->action = REORDER_BUFFER_CHANGE_MESSAGE;
> +             change->data.msg.transactional = true;
> +             change->data.msg.prefix = pstrdup(prefix);
> +             change->data.msg.message_size = msg_sz;
> +             change->data.msg.message = palloc(msg_sz);
> +             memcpy(change->data.msg.message, msg, msg_sz);
> +
> +             ReorderBufferQueueChange(rb, xid, lsn, change);
> +     }
> +     else
> +     {
> +             rb->message(rb, txn, lsn, transactional, prefix, msg_sz, msg);
> +     }
> +}


This approach prohibts catalog access when processing a nontransaction
message as there's no snapshot set up.

> +             case REORDER_BUFFER_CHANGE_MESSAGE:
> +                     {
> +                             char       *data;
> +                             size_t          prefix_size = 
> strlen(change->data.msg.prefix) + 1;
> +
> +                             sz += prefix_size + 
> change->data.msg.message_size;
> +                             ReorderBufferSerializeReserve(rb, sz);
> +
> +                             data = ((char *) rb->outbuf) + 
> sizeof(ReorderBufferDiskChange);
> +                             memcpy(data, change->data.msg.prefix,
> +                                        prefix_size);
> +                             memcpy(data + prefix_size, 
> change->data.msg.message,
> +                                        change->data.msg.message_size);
> +                             break;
> +                     }
>               case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
>                       {
>                               Snapshot        snap;
> @@ -2354,6 +2415,18 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, 
> ReorderBufferTXN *txn,
>                               data += len;
>                       }
>                       break;
> +             case REORDER_BUFFER_CHANGE_MESSAGE:
> +                     {
> +                             Size            message_size = 
> change->data.msg.message_size;
> +                             Size            prefix_size = strlen(data) + 1;
> +
> +                             change->data.msg.prefix = pstrdup(data);
> +                             change->data.msg.message = palloc(message_size);
> +                             memcpy(change->data.msg.message, data + 
> prefix_size,
> +                                        message_size);
> +
> +                             data += prefix_size + message_size;
> +                     }

Please add a test exercising these paths.


Greetings,

Andres Freund


-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to