On Mon, Sep 13, 2021 at 4:30 PM Tom Lane <[email protected]> wrote:
>
> The direct cause of that is that SPI_execute() doesn't permit the called
> query to perform COMMIT/ROLLBACK, which is because most callers would fail
> to cope with that. You can instruct SPI to allow that by replacing the
> SPI_execute() call with something like
>
> SPIExecuteOptions options;
>
> ...
> memset(&options, 0, sizeof(options));
> options.allow_nonatomic = true;
>
> ret = SPI_execute_extended(buf.data, &options);
>
I completely forgot about the SPI execute options... Thanks for the
explanation!!!
> However, that's not enough to make this example work :-(.
> I find that it still fails inside the procedure's COMMIT,
> with
>
> 2021-09-13 15:14:54.775 EDT worker_spi[476310] ERROR: portal snapshots (0) did not account for all active snapshots (1)
> 2021-09-13 15:14:54.775 EDT worker_spi[476310] CONTEXT: PL/pgSQL function schema4.counted_proc() line 1 at COMMIT
> SQL statement "CALL "schema4"."counted_proc"()"
>
> I think what this indicates is that worker_spi_main's cavalier
> management of the active snapshot isn't up to snuff for this
> use-case. The error is coming from ForgetPortalSnapshots, which
> is expecting that all active snapshots are attached to Portals;
> but that one isn't.
>
That is exactly the root cause of all my investigation.
At Timescale we have a scheduler (a background worker) that launches another
background worker to "execute a job". Executing a job means calling a
function [1] or a procedure [2] directly, without going through SPI.
However, a user has now raised an issue about snapshots [3]. When I first
looked at the code I tried to use SPI, but it didn't work as expected.
Even after tweaking worker_spi to execute the procedure without SPI, by
calling ExecuteCallStmt directly (patch attached), we end up with the same
error about active snapshots:
2021-09-13 20:14:36.654 -03 [21483] LOG: worker_spi worker 2 initialized
with schema2.counted
2021-09-13 20:14:36.655 -03 [21484] LOG: worker_spi worker 1 initialized
with schema1.counted
2021-09-13 20:14:36.657 -03 [21483] ERROR: portal snapshots (0) did not
account for all active snapshots (1)
2021-09-13 20:14:36.657 -03 [21483] CONTEXT: PL/pgSQL function
schema2.counted_proc() line 1 at COMMIT
2021-09-13 20:14:36.657 -03 [21484] ERROR: portal snapshots (0) did not
account for all active snapshots (1)
2021-09-13 20:14:36.657 -03 [21484] CONTEXT: PL/pgSQL function
schema1.counted_proc() line 1 at COMMIT
2021-09-13 20:14:36.659 -03 [21476] LOG: background worker "worker_spi"
(PID 21483) exited with exit code 1
2021-09-13 20:14:36.659 -03 [21476] LOG: background worker "worker_spi"
(PID 21484) exited with exit code 1
> Probably the most appropriate fix is to make worker_spi_main
> set up a Portal to run the query inside of. There are other
> bits of code that are not happy if they're not inside a Portal,
> so if you're hoping to run arbitrary SQL this way, sooner or
> later you're going to have to cross that bridge.
>
I started digging into it [4] by creating a Portal from scratch to execute
the function or procedure, and it worked.
We're wondering whether we can avoid going through the parser when using
PortalRun. Is that possible?
Regards,
[1]
https://github.com/timescale/timescaledb/blob/master/tsl/src/bgw_policy/job.c#L726
[2]
https://github.com/timescale/timescaledb/blob/master/tsl/src/bgw_policy/job.c#L741
[3] https://github.com/timescale/timescaledb/issues/3545
[4]
https://github.com/fabriziomello/timescaledb/blob/issue/3545/tsl/src/bgw_policy/job.c#L824
--
Fabrízio de Royes Mello
diff --git a/src/test/modules/worker_spi/worker_spi.c b/src/test/modules/worker_spi/worker_spi.c
index d0acef2652..dace150fa7 100644
--- a/src/test/modules/worker_spi/worker_spi.c
+++ b/src/test/modules/worker_spi/worker_spi.c
@@ -42,6 +42,12 @@
#include "utils/snapmgr.h"
#include "tcop/utility.h"
+#include "nodes/makefuncs.h"
+#include "nodes/nodes.h"
+#include "nodes/pg_list.h"
+#include "parser/parse_func.h"
+#include "commands/defrem.h"
+
PG_MODULE_MAGIC;
PG_FUNCTION_INFO_V1(worker_spi_launch);
@@ -59,6 +65,7 @@ typedef struct worktable
{
const char *schema;
const char *name;
+ const char *proc;
} worktable;
/*
@@ -108,8 +115,20 @@ initialize_worker_spi(worktable *table)
" type text CHECK (type IN ('total', 'delta')), "
" value integer)"
"CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
- "WHERE type = 'total'",
- table->schema, table->name, table->name, table->name);
+ "WHERE type = 'total'; "
+ "CREATE PROCEDURE \"%s\".\"%s\"() AS $$ "
+ "DECLARE "
+ " i INTEGER; "
+ "BEGIN "
+ " FOR i IN 1..10 "
+ " LOOP "
+ " INSERT INTO \"%s\".\"%s\" VALUES ('delta', i); "
+ " COMMIT; "
+ " END LOOP; "
+ "END; "
+ "$$ LANGUAGE plpgsql; ",
+ table->schema, table->name, table->name, table->name,
+ table->schema, table->proc, table->schema, table->name);
/* set statement start time */
SetCurrentStatementStartTimestamp();
@@ -137,11 +156,16 @@ worker_spi_main(Datum main_arg)
worktable *table;
StringInfoData buf;
char name[20];
+ FuncExpr *funcexpr;
+ Oid proc;
+ ObjectWithArgs *object;
+ MemoryContext oldcontext = CurrentMemoryContext;
table = palloc(sizeof(worktable));
sprintf(name, "schema%d", index);
table->schema = pstrdup(name);
table->name = pstrdup("counted");
+ table->proc = pstrdup("counted_proc");
/* Establish signal handlers before unblocking signals. */
pqsignal(SIGHUP, SignalHandlerForConfigReload);
@@ -157,6 +181,27 @@ worker_spi_main(Datum main_arg)
MyBgworkerEntry->bgw_name, table->schema, table->name);
initialize_worker_spi(table);
+ StartTransactionCommand();
+
+ /* build a function expression call */
+ object = makeNode(ObjectWithArgs);
+ object->objname = list_make2(makeString((char *)table->schema),
+ makeString((char *)table->proc));
+ proc = LookupFuncWithArgs(OBJECT_ROUTINE, object, false);
+
+ CommitTransactionCommand();
+
+ MemoryContextSwitchTo(oldcontext);
+
+ funcexpr = makeFuncExpr(proc,
+ VOIDOID,
+ NIL,
+ InvalidOid,
+ InvalidOid,
+ COERCE_EXPLICIT_CALL);
+
+ MemoryContextSwitchTo(oldcontext);
+
/*
* Quote identifiers passed to us. Note that this must be done after
* initialize_worker_spi, because that routine assumes the names are not
@@ -166,22 +211,10 @@ worker_spi_main(Datum main_arg)
*/
table->schema = quote_identifier(table->schema);
table->name = quote_identifier(table->name);
+ table->proc = quote_identifier(table->proc);
initStringInfo(&buf);
- appendStringInfo(&buf,
- "WITH deleted AS (DELETE "
- "FROM %s.%s "
- "WHERE type = 'delta' RETURNING value), "
- "total AS (SELECT coalesce(sum(value), 0) as sum "
- "FROM deleted) "
- "UPDATE %s.%s "
- "SET value = %s.value + total.sum "
- "FROM total WHERE type = 'total' "
- "RETURNING %s.value",
- table->schema, table->name,
- table->schema, table->name,
- table->name,
- table->name);
+ appendStringInfo(&buf, "CALL %s.%s()", table->schema, table->proc);
/*
* Main loop: do this until SIGTERM is received and processed by
@@ -189,7 +222,8 @@ worker_spi_main(Datum main_arg)
*/
for (;;)
{
- int ret;
+ CallStmt *call;
+ DestReceiver *dest;
/*
* Background workers mustn't call usleep() or any direct equivalent:
@@ -232,39 +266,19 @@ worker_spi_main(Datum main_arg)
*/
SetCurrentStatementStartTimestamp();
StartTransactionCommand();
- SPI_connect();
PushActiveSnapshot(GetTransactionSnapshot());
- debug_query_string = buf.data;
pgstat_report_activity(STATE_RUNNING, buf.data);
- /* We can now execute queries via SPI */
- ret = SPI_execute(buf.data, false, 0);
-
- if (ret != SPI_OK_UPDATE_RETURNING)
- elog(FATAL, "cannot select from table %s.%s: error code %d",
- table->schema, table->name, ret);
-
- if (SPI_processed > 0)
- {
- bool isnull;
- int32 val;
-
- val = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
- SPI_tuptable->tupdesc,
- 1, &isnull));
- if (!isnull)
- elog(LOG, "%s: count in %s.%s is now %d",
- MyBgworkerEntry->bgw_name,
- table->schema, table->name, val);
- }
+ call = makeNode(CallStmt);
+ call->funcexpr = funcexpr;
+ dest = CreateDestReceiver(DestNone);
+ ExecuteCallStmt(call, NULL, false, dest);
/*
* And finish our transaction.
*/
- SPI_finish();
PopActiveSnapshot();
CommitTransactionCommand();
- debug_query_string = NULL;
pgstat_report_stat(false);
pgstat_report_activity(STATE_IDLE, NULL);
}