Hello!

+
+ SpinLockAcquire(&WalPipelineShm->mutex);
+
+ if (WalPipelineShm->initialized)
+ {
+ SpinLockRelease(&WalPipelineShm->mutex);
+ return;  /* Already started */
+ }
+

This doesn't seem to be a good use for a spinlock, as it guards a
longer operation. According to their documentation, spinlocks are
supposed to guard "a few instructions", not long initialization
processes. Since the code already uses a DSM segment, wouldn't it be
easier to use something like GetNamedDSMSegment, which explicitly
supports this use case with an initialization callback?
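
To illustrate what I mean, here is a rough sketch, not a drop-in
replacement. The GetNamedDSMSegment below is only a process-local
stand-in so the snippet compiles outside the tree; its signature follows
what I remember from PostgreSQL 17's dsm_registry.h and may differ in
the tree the patch targets. The struct fields, walpipeline_init_shmem
and walpipeline_attach are made-up names.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/*
 * Stand-in for GetNamedDSMSegment() so the sketch compiles outside the
 * PostgreSQL tree. The real function handles locking and cross-process
 * attachment; this mock only keeps one process-local segment.
 */
static void *mock_segment = NULL;

static void *
GetNamedDSMSegment(const char *name, size_t size,
				   void (*init_callback) (void *ptr), bool *found)
{
	(void) name;
	*found = (mock_segment != NULL);
	if (!*found)
	{
		mock_segment = calloc(1, size);
		assert(mock_segment != NULL);
		if (init_callback)
			init_callback(mock_segment);
	}
	return mock_segment;
}

/* Hypothetical shared state; the field names are assumptions. */
typedef struct WalPipelineShmCtl
{
	bool		initialized;
	int			init_count;		/* only here so the sketch can be checked */
} WalPipelineShmCtl;

/* Runs once, in whichever process creates the segment. */
static void
walpipeline_init_shmem(void *ptr)
{
	WalPipelineShmCtl *shm = (WalPipelineShmCtl *) ptr;

	memset(shm, 0, sizeof(*shm));
	shm->initialized = true;
	shm->init_count++;
}

/* Every process calls this; no hand-rolled "already started" check. */
static WalPipelineShmCtl *
walpipeline_attach(bool *found)
{
	return (WalPipelineShmCtl *)
		GetNamedDSMSegment("wal_pipeline", sizeof(WalPipelineShmCtl),
						   walpipeline_init_shmem, found);
}
```

With this shape the first caller gets found = false and an initialized
struct, later callers get found = true and the same pointer, and no
spinlock is held during initialization.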

Also see the next two more specific comments about errors and spinlocks.

+ case WAL_MSG_ERROR:
+ SpinLockAcquire(&WalPipelineShm->mutex);
+ ereport(ERROR,
+ (errcode(WalPipelineShm->error_code),
+ errmsg("[walpipeline] consumer: received error from the producer: %s",
+ WalPipelineShm->error_message)));
+ SpinLockRelease(&WalPipelineShm->mutex);
+ return NULL;

According to the documentation, spinlocks are not automatically
released on errors, and ereport(ERROR) does not return, so the spinlock
stays held and everything after the call is dead code.

+ SpinLockAcquire(&WalPipelineShm->mutex);
+ elog(LOG, "[walpipeline] producer: exiting: sent=" UINT64_FORMAT " received=" UINT64_FORMAT,
+ WalPipelineShm->records_sent, WalPipelineShm->records_received);
+ SpinLockRelease(&WalPipelineShm->mutex);

A LOG is not an error, but elog can call palloc, which can cause an
out of memory error, and then again we never release the spinlock.
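
For both spinlock cases above, the usual pattern is to copy the fields
into local variables while holding the lock, release it, and only then
report. A minimal sketch follows. The slock_t and SpinLock* definitions
are stand-ins so it compiles outside the tree, and the struct layout,
WAL_PIPELINE_ERRMSG_LEN and walpipeline_snapshot are assumptions on my
side, not taken from the patch.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdint.h>
#include <string.h>

/* Stand-ins for the PostgreSQL spinlock API (storage/spin.h). */
typedef atomic_flag slock_t;

static void
SpinLockAcquire(slock_t *lock)
{
	while (atomic_flag_test_and_set(lock))
		;						/* busy-wait */
}

static void
SpinLockRelease(slock_t *lock)
{
	atomic_flag_clear(lock);
}

#define WAL_PIPELINE_ERRMSG_LEN 256		/* hypothetical */

/* Hypothetical shape of the shared state. */
typedef struct WalPipelineShmCtl
{
	slock_t		mutex;
	int			error_code;
	char		error_message[WAL_PIPELINE_ERRMSG_LEN];
	uint64_t	records_sent;
	uint64_t	records_received;
} WalPipelineShmCtl;

/* Process-local copy of the fields we want to report. */
typedef struct WalPipelineSnapshot
{
	int			error_code;
	char		error_message[WAL_PIPELINE_ERRMSG_LEN];
	uint64_t	records_sent;
	uint64_t	records_received;
} WalPipelineSnapshot;

/* Only plain copies happen under the lock: no elog, no palloc. */
static void
walpipeline_snapshot(WalPipelineShmCtl *shm, WalPipelineSnapshot *out)
{
	assert(shm != NULL && out != NULL);

	SpinLockAcquire(&shm->mutex);
	out->error_code = shm->error_code;
	memcpy(out->error_message, shm->error_message,
		   sizeof(out->error_message));
	out->records_sent = shm->records_sent;
	out->records_received = shm->records_received;
	SpinLockRelease(&shm->mutex);

	/* Defensive: make sure the local copy is NUL-terminated. */
	out->error_message[sizeof(out->error_message) - 1] = '\0';
}
```

The caller would then pass the snapshot's fields to ereport(ERROR, ...)
or elog(LOG, ...) after the lock is already released, so neither an
error exit nor an allocation failure can leave the spinlock held.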

+ if (msglen > WAL_PIPELINE_MAX_MSG_SIZE)
+ {
+ elog(WARNING, "[walpipeline] producer: wal record at %X/%X too large (%zu bytes), skipping",
+ LSN_FORMAT_ARGS(record->ReadRecPtr), msglen);
+ pfree(buffer);
+ return true;
+ }

This doesn't seem like a good idea to me. Won't skipping WAL records
during recovery cause data corruption?

+ shm_mq_handle   *producer_mq_handle;
+ shm_mq_handle   *consumer_mq_handle;

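Aren't these handles process-local? shm_mq_attach allocates the
shm_mq_handle in backend-private memory, so a pointer stored in
WalPipelineShmCtl is only valid in the process that created it.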

+{ name => 'wal_pipeline', type => 'bool', context => 'PGC_SIGHUP', group => 'WAL_RECOVERY',
+  short_desc => 'Use parallel workers to speedup recovery.',
+  variable => 'wal_pipeline_enabled',
+  boot_val => 'false',
+},

Is SIGHUP really useful for this feature? It only runs at startup.

+ elog(FATAL, "[walpipeline] consumer: either pipeline not active, or no record available from pipeline.");
+ return record;

FATAL also stops the code flow, so that return is never executed.

+/* Size of the shared memory queue (can be made configurable) */
+#define WAL_PIPELINE_QUEUE_SIZE  (128 * 1024 * 1024)  /* 8 MB */
+
+/* Maximum size of a single message */
+#define WAL_PIPELINE_MAX_MSG_SIZE  (2 * 1024 * 1024)  /* 1 MB */

The comments about the sizes seem to be off: 128 * 1024 * 1024 is
128 MB, not 8 MB, and 2 * 1024 * 1024 is 2 MB, not 1 MB.

  if (reachedRecoveryTarget)
  {
+ if (wal_pipeline_enabled)
+ WalPipeline_Stop();

What if we didn't reach the recovery target? Shouldn't we stop the
pipeline in that case too?


+ /* Send shutdown message if queue is available */
+ if (consumer_mq_handle)
+ WalPipeline_SendShutdown();
+}

This seems wrong: the caller checks consumer_mq_handle, but
WalPipeline_SendShutdown checks the producer handle internally. What's
the exact contract, and which process should call each of these
functions? From reading the code, I'm not sure this shutdown logic
works as intended.

