On Tue, Jun 23, 2026 at 4:09 AM Bharath Rupireddy
<[email protected]> wrote:
>
> On Fri, Jun 19, 2026 at 3:34 PM Masahiko Sawada <[email protected]> wrote:
> >
> > Hi all,
> >
> > Commit ac4645c015 allows pgoutput to send logical decoding messages,
> > but it's limited to applications that use the pgoutput plugin -- the
> > built-in logical replication doesn't use it. I'd like to propose
> > introducing a hook to the logical replication message handling so that
> > extensions can plug in their own handling routine. This feature can be
> > used for extensions to implement DDL replication, function
> > replication, or trigger user-specific routines on the subscriber side.
>
> Thanks for working on this!
>
> > I've attached the PoC patch; it adds a hook function, and adds a new
> > 'message' subscription option that allows the user to request the
> > publisher to send logical decoding messages. Therefore, users need to
> > enable the 'message' option and set up the hook function at server
> > startup in order to receive the messages and trigger the hook
> > function.
>
> I understand the intent of the proposal, but I'd like to get the
> bigger picture first.
>
> Do we have any external modules that actually implement DDL
> replication (or any of the listed use-cases) with a similar hook? Or
> any existing discussion? I could be missing something because I
> haven't looked at all the DDL replication related threads.
>
> Another thing I'm curious about - why a hook? Is the plan to implement
> DDL replication as an external module rather than in core? If DDL
> replication eventually gets into core, I'd expect it to be apply-side
> logic executing the decoded DDL messages directly, not something going
> through a hook.
>

I think it is important to have an example extension implementation
to see how the hook could be utilized. One more use of such a hook
could be auditing the DDLs replayed on subscribers. BTW, can we also
consider it as a way to implement basic DDL replication for tables?
The key question is what happens if someday we have in-core DDL
replication. I think extensions could still be used to implement
filtering or transformation of DDL. We can capture DDL in a JSON
format [1] so that it is forward compatible with in-core DDL
replication. Considering that, the extension handlers will look
like:

void
  _PG_init(void)
  {
      /* Publisher side */
      prev_ProcessUtility = ProcessUtility_hook;
      ProcessUtility_hook = ddlrep_ProcessUtility;

.....
      /* Subscriber side */
      prev_message_handler  = logical_message_handler;
      logical_message_handler = ddlrep_message_handler;

static void
  ddlrep_ProcessUtility(PlannedStmt *pstmt,
                         const char *queryString,
                         bool readOnlyTree,
                         ProcessUtilityContext context,
                         ParamListInfo params,
                         QueryEnvironment *queryEnv,
                         DestReceiver *dest,
                         QueryCompletion *qc)
  {
      bool replicate = should_replicate(pstmt);

      /*
       * Execute the DDL first.
       */
      if (prev_ProcessUtility)
          prev_ProcessUtility(pstmt, queryString, readOnlyTree, context,
                              params, queryEnv, dest, qc);
      else
          standard_ProcessUtility(pstmt, queryString, readOnlyTree, context,
                                  params, queryEnv, dest, qc);

      if (replicate)
      {
          const char *tag =
              GetCommandTagName(CreateCommandTag(pstmt->utilityStmt));
          char       *msg = build_ddl_message(tag, queryString);

          /*
           * transactional = true  → message held in ReorderBuffer, emitted
           * to subscribers at COMMIT in WAL order relative to any DML in
           * the same transaction.
           *
           * omit_lsn = false → include the LSN so the subscriber can log
           * exactly which WAL position a given DDL came from.
           */
          LogLogicalMessage("pg_ddl", msg, strlen(msg),
                            true  /* transactional */,
                            false /* omit_lsn */);

...

 static void
  ddlrep_message_handler(const char *prefix,
                          Size sz,
                          const char *message,
                          bool transactional,
                          XLogRecPtr lsn)
  {
      char           *payload;
      char           *ddl;
      char           *search_path;
      StringInfoData  cmd;
      int             spi_rc;

      /*
       * Always pass through to the previous handler first.  This ensures
       * correct behaviour when chained with other extensions.
       */
      if (prev_message_handler)
          prev_message_handler(prefix, sz, message, transactional, lsn);

      if (strcmp(prefix, "pg_ddl") != 0)
          return;

      /* Write code to execute/perform DDL. */
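
To make the chaining and prefix filtering above concrete outside the
server, here is a minimal standalone sketch. It is not the PoC patch's
API: the hook variable, the simplified handler signature (no LSN), and
all function names are hypothetical stand-ins chosen for illustration.

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical, simplified handler signature (the real one also has an LSN). */
typedef void (*message_handler_type) (const char *prefix, size_t sz,
                                      const char *message, bool transactional);

/* Stand-in for the proposed hook variable; NULL means nothing is installed. */
message_handler_type handler_hook = NULL;

/* Previous hook value saved by the DDL extension, as in _PG_init(). */
static message_handler_type prev_handler = NULL;

int audit_seen = 0;             /* messages seen by the audit handler */
int ddl_seen = 0;               /* "pg_ddl" messages seen by the DDL handler */

/* First extension: counts every message regardless of prefix. */
void
audit_handler(const char *prefix, size_t sz,
              const char *message, bool transactional)
{
    (void) prefix;
    (void) sz;
    (void) message;
    (void) transactional;
    audit_seen++;
}

/* Second extension: chains to the previous handler, then filters on prefix. */
void
ddl_handler(const char *prefix, size_t sz,
            const char *message, bool transactional)
{
    if (prev_handler)
        prev_handler(prefix, sz, message, transactional);

    if (strcmp(prefix, "pg_ddl") != 0)
        return;

    ddl_seen++;                 /* a real handler would execute the DDL here */
}

/* Simulates two extensions loading one after the other. */
void
install_handlers(void)
{
    handler_hook = audit_handler;   /* first extension installs itself */
    prev_handler = handler_hook;    /* second extension saves the old value */
    handler_hook = ddl_handler;     /* and then installs itself */
}

/* Simulates the apply side handing one message to the hook. */
void
dispatch_message(const char *prefix, const char *message)
{
    if (handler_hook)
        handler_hook(prefix, strlen(message), message, true);
}
```

With both handlers installed, a "pg_ddl" message reaches both of them,
while a message with any other prefix reaches only the audit handler.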

Once we have a built-in handler, the apply worker registers it at
startup and raises an ERROR if an extension's handler is already
registered:

void
  ApplyWorkerMain(Datum main_arg)
  {
      /* ... existing initialisation ... */

      /*
       * If the subscription requests built-in DDL replication and an
       * extension has also registered a logical message hook, both would
       * process the same "pg_ddl" messages and execute DDL twice.
       * Refuse to start rather than silently corrupt.
       */
      if (MySubscription->ddloption != DDL_OPTION_NONE &&
          logical_message_hook != NULL)
          ereport(ERROR,
                  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                   ....

      /*
       * No conflict — register the built-in DDL handler when the
       * subscription requests DDL replication and no extension owns
       * the hook.
       */
      if (MySubscription->ddloption != DDL_OPTION_NONE)
          logical_message_hook = builtin_ddl_message_handler;
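
The startup check above boils down to the following standalone sketch.
The names here (message_hook, apply_worker_startup_sketch, and the
result enum) are hypothetical; in the server the conflict branch would
be the ereport(ERROR, ...) shown above rather than a return code.

```c
#include <stdbool.h>
#include <stddef.h>

typedef void (*message_hook_type) (const char *prefix, size_t sz,
                                   const char *message);

/* Stand-in for the global hook; NULL means no extension has installed one. */
message_hook_type message_hook = NULL;

typedef enum
{
    STARTUP_OK,
    STARTUP_CONFLICT
} StartupResult;

/* Placeholder for the built-in DDL handler. */
void
builtin_ddl_handler_sketch(const char *prefix, size_t sz, const char *message)
{
    (void) prefix;
    (void) sz;
    (void) message;
}

/*
 * Register the built-in handler only when the subscription asks for DDL
 * replication and nothing else already owns the hook.
 */
StartupResult
apply_worker_startup_sketch(bool ddl_requested)
{
    if (!ddl_requested)
        return STARTUP_OK;      /* leave any extension hook untouched */

    if (message_hook != NULL)
        return STARTUP_CONFLICT;    /* would be ereport(ERROR) in the server */

    message_hook = builtin_ddl_handler_sketch;
    return STARTUP_OK;
}
```

Note that a subscription without the DDL option never touches the hook,
so extension-only setups keep working unchanged.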

[1]
To keep it extensible, the JSON format for WAL could be of the form:

{
    "version": 1,
    "command_tag": "CREATE TABLE",
    "object_type": "table",
    "schema": "public",
    "identity": "public.foo",
    "ddl_text": "CREATE TABLE public.foo (id int PRIMARY KEY)",
    "search_path": "public"
}
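
As a rough illustration of producing such a payload, the standalone
sketch below builds a subset of those fields with the escaping JSON
requires. Everything in it is an assumption for illustration: the Buf
type stands in for StringInfo, build_ddl_message_sketch is a
hypothetical name, and only three of the string fields are emitted.

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical growable buffer; StringInfo would play this role in-server. */
typedef struct
{
    char       *data;
    size_t      len;
    size_t      cap;
} Buf;

/* Append n bytes of s, growing the buffer and keeping it NUL-terminated. */
static void
buf_append(Buf *b, const char *s, size_t n)
{
    if (b->len + n + 1 > b->cap)
    {
        size_t      newcap = b->cap ? b->cap : 64;

        while (newcap < b->len + n + 1)
            newcap *= 2;
        b->data = realloc(b->data, newcap);
        if (b->data == NULL)
            abort();
        b->cap = newcap;
    }
    memcpy(b->data + b->len, s, n);
    b->len += n;
    b->data[b->len] = '\0';
}

/* Append "key": "value", escaping quotes, backslashes and control bytes. */
static void
buf_append_json_field(Buf *b, const char *key, const char *value, int last)
{
    const unsigned char *p;
    char        esc[8];

    buf_append(b, "\"", 1);
    buf_append(b, key, strlen(key));    /* keys are fixed, trusted literals */
    buf_append(b, "\": \"", 4);

    for (p = (const unsigned char *) value; *p; p++)
    {
        if (*p == '"' || *p == '\\')
        {
            esc[0] = '\\';
            esc[1] = (char) *p;
            buf_append(b, esc, 2);
        }
        else if (*p < 0x20)
        {
            int         n = snprintf(esc, sizeof(esc), "\\u%04x",
                                     (unsigned int) *p);

            buf_append(b, esc, (size_t) n);
        }
        else
            buf_append(b, (const char *) p, 1);
    }

    if (last)
        buf_append(b, "\"", 1);
    else
        buf_append(b, "\", ", 3);
}

/* Build the message; the caller frees the returned string. */
char *
build_ddl_message_sketch(const char *command_tag, const char *ddl_text,
                         const char *search_path)
{
    Buf         b = {NULL, 0, 0};
    const char *head = "{\"version\": 1, ";

    buf_append(&b, head, strlen(head));
    buf_append_json_field(&b, "command_tag", command_tag, 0);
    buf_append_json_field(&b, "ddl_text", ddl_text, 0);
    buf_append_json_field(&b, "search_path", search_path, 1);
    buf_append(&b, "}", 1);
    return b.data;
}
```

Keeping "version" as the first key lets a subscriber-side handler reject
or adapt payloads it does not understand before looking at anything else.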


>
> Why not a hook at apply_dispatch to give external modules more freedom
> with the pgoutput plugin?
>

What advantage do you see with that approach?

-- 
With Regards,
Amit Kapila.

