Hi all,
I'm trying to execute a PROCEDURE (with COMMIT inside) called from a
background worker using SPI, but I always get the error below:
2021-09-13 09:36:43.568 -03 [23845] LOG: worker_spi worker 2 initialized
with schema2.counted
2021-09-13 09:36:43.568 -03 [23846] LOG: worker_spi worker 1 initialized
with schema1.counted
2021-09-13 09:36:43.571 -03 [23846] ERROR: invalid transaction termination
2021-09-13 09:36:43.571 -03 [23846] CONTEXT: PL/pgSQL function
schema1.counted_proc() line 1 at COMMIT
SQL statement "CALL "schema1"."counted_proc"()"
2021-09-13 09:36:43.571 -03 [23846] STATEMENT: CALL
"schema1"."counted_proc"()
2021-09-13 09:36:43.571 -03 [23845] ERROR: invalid transaction termination
2021-09-13 09:36:43.571 -03 [23845] CONTEXT: PL/pgSQL function
schema2.counted_proc() line 1 at COMMIT
SQL statement "CALL "schema2"."counted_proc"()"
2021-09-13 09:36:43.571 -03 [23845] STATEMENT: CALL
"schema2"."counted_proc"()
2021-09-13 09:36:43.571 -03 [23838] LOG: background worker "worker_spi"
(PID 23845) exited with exit code 1
2021-09-13 09:36:43.571 -03 [23838] LOG: background worker "worker_spi"
(PID 23846) exited with exit code 1
I modified the worker_spi example (attached) slightly to execute a simple
procedure. Even when using SPI_connect_ext(SPI_OPT_NONATOMIC), I still get
the error "invalid transaction termination".
Is there something wrong with the attached example, or am I missing
something?
Regards,
--
Fabrízio de Royes Mello
diff --git a/src/test/modules/worker_spi/worker_spi.c b/src/test/modules/worker_spi/worker_spi.c
index d0acef2652..0b9e4e9335 100644
--- a/src/test/modules/worker_spi/worker_spi.c
+++ b/src/test/modules/worker_spi/worker_spi.c
@@ -108,8 +108,20 @@ initialize_worker_spi(worktable *table)
" type text CHECK (type IN ('total', 'delta')), "
" value integer)"
"CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
- "WHERE type = 'total'",
- table->schema, table->name, table->name, table->name);
+ "WHERE type = 'total'; "
+ "CREATE PROCEDURE \"%s\".\"%s_proc\"() AS $$ "
+ "DECLARE "
+ " i INTEGER; "
+ "BEGIN "
+ " FOR i IN 1..10 "
+ " LOOP "
+ " INSERT INTO \"%s\".\"%s\" VALUES ('delta', i); "
+ " COMMIT; "
+ " END LOOP; "
+ "END; "
+ "$$ LANGUAGE plpgsql; ",
+ table->schema, table->name, table->name, table->name,
+ table->schema, table->name, table->schema, table->name);
/* set statement start time */
SetCurrentStatementStartTimestamp();
@@ -168,20 +180,8 @@ worker_spi_main(Datum main_arg)
table->name = quote_identifier(table->name);
initStringInfo(&buf);
- appendStringInfo(&buf,
- "WITH deleted AS (DELETE "
- "FROM %s.%s "
- "WHERE type = 'delta' RETURNING value), "
- "total AS (SELECT coalesce(sum(value), 0) as sum "
- "FROM deleted) "
- "UPDATE %s.%s "
- "SET value = %s.value + total.sum "
- "FROM total WHERE type = 'total' "
- "RETURNING %s.value",
- table->schema, table->name,
- table->schema, table->name,
- table->name,
- table->name);
+
+ appendStringInfo(&buf, "CALL \"%s\".\"%s_proc\"()", table->schema, table->name);
/*
* Main loop: do this until SIGTERM is received and processed by
@@ -232,7 +232,7 @@ worker_spi_main(Datum main_arg)
*/
SetCurrentStatementStartTimestamp();
StartTransactionCommand();
- SPI_connect();
+ SPI_connect_ext(SPI_OPT_NONATOMIC);
PushActiveSnapshot(GetTransactionSnapshot());
debug_query_string = buf.data;
pgstat_report_activity(STATE_RUNNING, buf.data);
@@ -240,30 +240,15 @@ worker_spi_main(Datum main_arg)
/* We can now execute queries via SPI */
ret = SPI_execute(buf.data, false, 0);
- if (ret != SPI_OK_UPDATE_RETURNING)
- elog(FATAL, "cannot select from table %s.%s: error code %d",
- table->schema, table->name, ret);
-
- if (SPI_processed > 0)
- {
- bool isnull;
- int32 val;
-
- val = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
- SPI_tuptable->tupdesc,
- 1, &isnull));
- if (!isnull)
- elog(LOG, "%s: count in %s.%s is now %d",
- MyBgworkerEntry->bgw_name,
- table->schema, table->name, val);
- }
+ if (ret != SPI_OK_UTILITY)
+ elog(FATAL, "failed to call procedure");
/*
* And finish our transaction.
*/
- SPI_finish();
PopActiveSnapshot();
CommitTransactionCommand();
+ SPI_finish();
debug_query_string = NULL;
pgstat_report_stat(false);
pgstat_report_activity(STATE_IDLE, NULL);