Hi

This is a little bit of an enhanced version of the previous patch. The
worst case overhead is reduced almost to zero. The local resource owner is
created only when routine is executed in non-atomic mode, and when routine
contains a CALL statement.

Regards

Pavel
diff --git a/doc/src/sgml/spi.sgml b/doc/src/sgml/spi.sgml
index f5e0a35da0..8cf66f8c7b 100644
--- a/doc/src/sgml/spi.sgml
+++ b/doc/src/sgml/spi.sgml
@@ -1722,6 +1722,138 @@ int SPI_execute_plan(SPIPlanPtr <parameter>plan</parameter>, Datum * <parameter>
 
 <!-- *********************************************** -->
 
+<refentry id="spi-spi-execute-plan-extended">
+ <indexterm><primary>SPI_execute_plan_extended</primary></indexterm>
+
+ <refmeta>
+  <refentrytitle>SPI_execute_plan_extended</refentrytitle>
+  <manvolnum>3</manvolnum>
+ </refmeta>
+
+ <refnamediv>
+  <refname>SPI_execute_plan_extended</refname>
+  <refpurpose>execute a statement prepared by <function>SPI_prepare</function></refpurpose>
+ </refnamediv>
+
+ <refsynopsisdiv>
+<synopsis>
+int SPI_execute_plan_extended(SPIPlanPtr <parameter>plan</parameter>,
+                              ParamListInfo <parameter>params</parameter>,
+                              bool <parameter>read_only</parameter>,
+                              long <parameter>count</parameter>,
+                              const SPIExecuteOptions * <parameter>options</parameter>)
+</synopsis>
+ </refsynopsisdiv>
+
+ <refsect1>
+  <title>Description</title>
+
+  <para>
+   <function>SPI_execute_plan_extended</function> executes a statement
+   prepared by <function>SPI_prepare</function>.
+   This function is equivalent to <function>SPI_execute_plan_with_paramlist</function>,
+   but allows to pass additional options.
+  </para>
+ </refsect1>
+
+ <refsect1>
+  <title>Arguments</title>
+
+  <variablelist>
+   <varlistentry>
+    <term><literal>SPIPlanPtr <parameter>plan</parameter></literal></term>
+    <listitem>
+     <para>
+      prepared statement (returned by <function>SPI_prepare</function>)
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
+    <term><literal>ParamListInfo <parameter>params</parameter></literal></term>
+    <listitem>
+     <para>
+      data structure containing parameter types and values; NULL if none
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
+    <term><literal>bool <parameter>read_only</parameter></literal></term>
+    <listitem>
+     <para><literal>true</literal> for read-only execution</para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
+    <term><literal>long <parameter>count</parameter></literal></term>
+    <listitem>
+     <para>
+      maximum number of rows to return,
+      or <literal>0</literal> for no limit
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
+    <term><literal>const SPIPExecuteOptions * <parameter>options</parameter></literal></term>
+    <listitem>
+     <para>
+      struct containing optional arguments
+     </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+
+  <para>
+   Callers should always zero out the entire <parameter>options</parameter>
+   struct, then fill whichever fields they want to set.  This ensures forward
+   compatibility of code, since any fields that are added to the struct in
+   future will be defined to behave backwards-compatibly if they are zero.
+   The currently available <parameter>options</parameter> fields are:
+  </para>
+
+  <variablelist>
+   <varlistentry>
+    <term><literal>DestReceiver * <parameter>dest</parameter></literal></term>
+    <listitem>
+     <para>
+      Query target
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
+    <term><literal>ResourceOwner <parameter>owner</parameter></literal></term>
+    <listitem>
+     <para>
+      The resource owner that will be used for plan execution. Now it is used
+      as cached query plan owner only. This resource owner should to have longer
+      life cycle than executed command. Current resource owner is used when
+      this option is NULL.
+     </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+ </refsect1>
+
+ <refsect1>
+  <title>Return Value</title>
+
+  <para>
+   The return value is the same as for <function>SPI_execute_plan</function>.
+  </para>
+
+  <para>
+   <varname>SPI_processed</varname> and
+   <varname>SPI_tuptable</varname> are set as in
+   <function>SPI_execute_plan</function> if successful.
+  </para>
+ </refsect1>
+</refentry>
+
+<!-- *********************************************** -->
+
 <refentry id="spi-spi-execute-plan-with-paramlist">
  <indexterm><primary>SPI_execute_plan_with_paramlist</primary></indexterm>
 
diff --git a/src/backend/commands/prepare.c b/src/backend/commands/prepare.c
index 653ef8e41a..3de2932a03 100644
--- a/src/backend/commands/prepare.c
+++ b/src/backend/commands/prepare.c
@@ -230,7 +230,7 @@ ExecuteQuery(ParseState *pstate,
 									   entry->plansource->query_string);
 
 	/* Replan if needed, and increment plan refcount for portal */
-	cplan = GetCachedPlan(entry->plansource, paramLI, false, NULL);
+	cplan = GetCachedPlan(entry->plansource, paramLI, NULL, NULL);
 	plan_list = cplan->stmt_list;
 
 	/*
@@ -651,7 +651,7 @@ ExplainExecuteQuery(ExecuteStmt *execstmt, IntoClause *into, ExplainState *es,
 	}
 
 	/* Replan if needed, and acquire a transient refcount */
-	cplan = GetCachedPlan(entry->plansource, paramLI, true, queryEnv);
+	cplan = GetCachedPlan(entry->plansource, paramLI, CurrentResourceOwner, queryEnv);
 
 	INSTR_TIME_SET_CURRENT(planduration);
 	INSTR_TIME_SUBTRACT(planduration, planstart);
@@ -687,7 +687,7 @@ ExplainExecuteQuery(ExecuteStmt *execstmt, IntoClause *into, ExplainState *es,
 	if (estate)
 		FreeExecutorState(estate);
 
-	ReleaseCachedPlan(cplan, true);
+	ReleaseCachedPlan(cplan, CurrentResourceOwner);
 }
 
 /*
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index e28d242922..17df852959 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -67,7 +67,7 @@ static void _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan);
 static int	_SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 							  Snapshot snapshot, Snapshot crosscheck_snapshot,
 							  bool read_only, bool fire_triggers, uint64 tcount,
-							  DestReceiver *caller_dest);
+							  DestReceiver *caller_dest, ResourceOwner owner);
 
 static ParamListInfo _SPI_convert_params(int nargs, Oid *argtypes,
 										 Datum *Values, const char *Nulls);
@@ -521,7 +521,7 @@ SPI_execute(const char *src, bool read_only, long tcount)
 
 	res = _SPI_execute_plan(&plan, NULL,
 							InvalidSnapshot, InvalidSnapshot,
-							read_only, true, tcount, NULL);
+							read_only, true, tcount, NULL, NULL);
 
 	_SPI_end_call(true);
 	return res;
@@ -555,7 +555,7 @@ SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
 							_SPI_convert_params(plan->nargs, plan->argtypes,
 												Values, Nulls),
 							InvalidSnapshot, InvalidSnapshot,
-							read_only, true, tcount, NULL);
+							read_only, true, tcount, NULL, NULL);
 
 	_SPI_end_call(true);
 	return res;
@@ -573,21 +573,15 @@ int
 SPI_execute_plan_with_paramlist(SPIPlanPtr plan, ParamListInfo params,
 								bool read_only, long tcount)
 {
-	int			res;
-
-	if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC || tcount < 0)
-		return SPI_ERROR_ARGUMENT;
+	SPIExecuteOptions opts;
 
-	res = _SPI_begin_call(true);
-	if (res < 0)
-		return res;
+	opts.dest = NULL;
+	opts.owner = NULL;
 
-	res = _SPI_execute_plan(plan, params,
-							InvalidSnapshot, InvalidSnapshot,
-							read_only, true, tcount, NULL);
-
-	_SPI_end_call(true);
-	return res;
+	return SPI_execute_plan_extended(plan,
+									 params,
+									 read_only, tcount,
+									 &opts);
 }
 
 /*
@@ -601,6 +595,27 @@ SPI_execute_plan_with_receiver(SPIPlanPtr plan,
 							   ParamListInfo params,
 							   bool read_only, long tcount,
 							   DestReceiver *dest)
+{
+	SPIExecuteOptions opts;
+
+	opts.dest = dest;
+	opts.owner = NULL;
+
+	return SPI_execute_plan_extended(plan,
+									 params,
+									 read_only, tcount,
+									 &opts);
+}
+
+/*
+ * Execute a previously prepared plan with possibility to
+ * specify other options.
+ */
+int
+SPI_execute_plan_extended(SPIPlanPtr plan,
+						  ParamListInfo params,
+						  bool read_only, long tcount,
+						  SPIExecuteOptions *options)
 {
 	int			res;
 
@@ -613,7 +628,8 @@ SPI_execute_plan_with_receiver(SPIPlanPtr plan,
 
 	res = _SPI_execute_plan(plan, params,
 							InvalidSnapshot, InvalidSnapshot,
-							read_only, true, tcount, dest);
+							read_only, true, tcount,
+							options->dest, options->owner);
 
 	_SPI_end_call(true);
 	return res;
@@ -654,7 +670,7 @@ SPI_execute_snapshot(SPIPlanPtr plan,
 							_SPI_convert_params(plan->nargs, plan->argtypes,
 												Values, Nulls),
 							snapshot, crosscheck_snapshot,
-							read_only, fire_triggers, tcount, NULL);
+							read_only, fire_triggers, tcount, NULL, NULL);
 
 	_SPI_end_call(true);
 	return res;
@@ -702,7 +718,7 @@ SPI_execute_with_args(const char *src,
 
 	res = _SPI_execute_plan(&plan, paramLI,
 							InvalidSnapshot, InvalidSnapshot,
-							read_only, true, tcount, NULL);
+							read_only, true, tcount, NULL, NULL);
 
 	_SPI_end_call(true);
 	return res;
@@ -746,7 +762,7 @@ SPI_execute_with_receiver(const char *src,
 
 	res = _SPI_execute_plan(&plan, params,
 							InvalidSnapshot, InvalidSnapshot,
-							read_only, true, tcount, dest);
+							read_only, true, tcount, dest, NULL);
 
 	_SPI_end_call(true);
 	return res;
@@ -1554,7 +1570,7 @@ SPI_cursor_open_internal(const char *name, SPIPlanPtr plan,
 	 */
 
 	/* Replan if needed, and increment plan refcount for portal */
-	cplan = GetCachedPlan(plansource, paramLI, false, _SPI_current->queryEnv);
+	cplan = GetCachedPlan(plansource, paramLI, NULL, _SPI_current->queryEnv);
 	stmt_list = cplan->stmt_list;
 
 	if (!plan->saved)
@@ -1568,7 +1584,7 @@ SPI_cursor_open_internal(const char *name, SPIPlanPtr plan,
 		oldcontext = MemoryContextSwitchTo(portal->portalContext);
 		stmt_list = copyObject(stmt_list);
 		MemoryContextSwitchTo(oldcontext);
-		ReleaseCachedPlan(cplan, false);
+		ReleaseCachedPlan(cplan, NULL);
 		cplan = NULL;			/* portal shouldn't depend on cplan */
 	}
 
@@ -1984,7 +2000,8 @@ SPI_plan_get_cached_plan(SPIPlanPtr plan)
 	error_context_stack = &spierrcontext;
 
 	/* Get the generic plan for the query */
-	cplan = GetCachedPlan(plansource, NULL, plan->saved,
+	cplan = GetCachedPlan(plansource, NULL,
+						  plan->saved ? CurrentResourceOwner : NULL,
 						  _SPI_current->queryEnv);
 	Assert(cplan == plansource->gplan);
 
@@ -2269,12 +2286,15 @@ _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan)
  *		false means any AFTER triggers are postponed to end of outer query
  * tcount: execution tuple-count limit, or 0 for none
  * caller_dest: DestReceiver to receive output, or NULL for normal SPI output
+ * called_owner: ResourceOwner that will be used as cached plan's owner. When
+ *		NULL is passed and resource owner is necessary because plan is cached
+ *		plan, then CurrentResourceOwner is used.
  */
 static int
 _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 				  Snapshot snapshot, Snapshot crosscheck_snapshot,
 				  bool read_only, bool fire_triggers, uint64 tcount,
-				  DestReceiver *caller_dest)
+				  DestReceiver *caller_dest, ResourceOwner caller_owner)
 {
 	int			my_res = 0;
 	uint64		my_processed = 0;
@@ -2284,6 +2304,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 	SPICallbackArg spicallbackarg;
 	ErrorContextCallback spierrcontext;
 	CachedPlan *cplan = NULL;
+	ResourceOwner plan_owner = NULL;
 	ListCell   *lc1;
 
 	/*
@@ -2333,6 +2354,14 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 		}
 	}
 
+	/* Saved plan requires resource owner */
+	if (!plan->saved)
+		plan_owner = NULL;
+	else if (caller_owner)
+		plan_owner = caller_owner;
+	else
+		plan_owner = CurrentResourceOwner;
+
 	foreach(lc1, plan->plancache_list)
 	{
 		CachedPlanSource *plansource = (CachedPlanSource *) lfirst(lc1);
@@ -2388,9 +2417,13 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 
 		/*
 		 * Replan if needed, and increment plan refcount.  If it's a saved
-		 * plan, the refcount must be backed by the CurrentResourceOwner.
+		 * plan, the refcount must be backed by the plan_owner.
 		 */
-		cplan = GetCachedPlan(plansource, paramLI, plan->saved, _SPI_current->queryEnv);
+		cplan = GetCachedPlan(plansource,
+							  paramLI,
+							  plan_owner,
+							  _SPI_current->queryEnv);
+
 		stmt_list = cplan->stmt_list;
 
 		/*
@@ -2586,7 +2619,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
 		}
 
 		/* Done with this plan, so release refcount */
-		ReleaseCachedPlan(cplan, plan->saved);
+		ReleaseCachedPlan(cplan, plan_owner);
 		cplan = NULL;
 
 		/*
@@ -2606,7 +2639,7 @@ fail:
 
 	/* We no longer need the cached plan refcount, if any */
 	if (cplan)
-		ReleaseCachedPlan(cplan, plan->saved);
+		ReleaseCachedPlan(cplan, plan_owner);
 
 	/*
 	 * Pop the error context stack
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 8dab9fd578..cb5a96117f 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -1963,7 +1963,7 @@ exec_bind_message(StringInfo input_message)
 	 * will be generated in MessageContext.  The plan refcount will be
 	 * assigned to the Portal, so it will be released at portal destruction.
 	 */
-	cplan = GetCachedPlan(psrc, params, false, NULL);
+	cplan = GetCachedPlan(psrc, params, NULL, NULL);
 
 	/*
 	 * Now we can define the portal.
diff --git a/src/backend/utils/cache/plancache.c b/src/backend/utils/cache/plancache.c
index cc04b5b4be..01e6b13ea0 100644
--- a/src/backend/utils/cache/plancache.c
+++ b/src/backend/utils/cache/plancache.c
@@ -533,7 +533,7 @@ ReleaseGenericPlan(CachedPlanSource *plansource)
 
 		Assert(plan->magic == CACHEDPLAN_MAGIC);
 		plansource->gplan = NULL;
-		ReleaseCachedPlan(plan, false);
+		ReleaseCachedPlan(plan, NULL);
 	}
 }
 
@@ -1131,15 +1131,16 @@ cached_plan_cost(CachedPlan *plan, bool include_planner)
  *
  * On return, the refcount of the plan has been incremented; a later
  * ReleaseCachedPlan() call is expected.  The refcount has been reported
- * to the CurrentResourceOwner if useResOwner is true (note that that must
- * only be true if it's a "saved" CachedPlanSource).
+ * to the ResourceOwner owner when it is specified (note that that must
+ * only be if it's a "saved" CachedPlanSource).
  *
  * Note: if any replanning activity is required, the caller's memory context
  * is used for that work.
  */
 CachedPlan *
 GetCachedPlan(CachedPlanSource *plansource, ParamListInfo boundParams,
-			  bool useResOwner, QueryEnvironment *queryEnv)
+			  ResourceOwner owner,
+			  QueryEnvironment *queryEnv)
 {
 	CachedPlan *plan = NULL;
 	List	   *qlist;
@@ -1149,7 +1150,7 @@ GetCachedPlan(CachedPlanSource *plansource, ParamListInfo boundParams,
 	Assert(plansource->magic == CACHEDPLANSOURCE_MAGIC);
 	Assert(plansource->is_complete);
 	/* This seems worth a real test, though */
-	if (useResOwner && !plansource->is_saved)
+	if (owner && !plansource->is_saved)
 		elog(ERROR, "cannot apply ResourceOwner to non-saved cached plan");
 
 	/* Make sure the querytree list is valid and we have parse-time locks */
@@ -1228,11 +1229,11 @@ GetCachedPlan(CachedPlanSource *plansource, ParamListInfo boundParams,
 	Assert(plan != NULL);
 
 	/* Flag the plan as in use by caller */
-	if (useResOwner)
-		ResourceOwnerEnlargePlanCacheRefs(CurrentResourceOwner);
+	if (owner)
+		ResourceOwnerEnlargePlanCacheRefs(owner);
 	plan->refcount++;
-	if (useResOwner)
-		ResourceOwnerRememberPlanCacheRef(CurrentResourceOwner, plan);
+	if (owner)
+		ResourceOwnerRememberPlanCacheRef(owner, plan);
 
 	/*
 	 * Saved plans should be under CacheMemoryContext so they will not go away
@@ -1253,21 +1254,20 @@ GetCachedPlan(CachedPlanSource *plansource, ParamListInfo boundParams,
  * ReleaseCachedPlan: release active use of a cached plan.
  *
  * This decrements the reference count, and frees the plan if the count
- * has thereby gone to zero.  If useResOwner is true, it is assumed that
- * the reference count is managed by the CurrentResourceOwner.
+ * has thereby gone to zero.
  *
- * Note: useResOwner = false is used for releasing references that are in
+ * Note: owner = NULL is used for releasing references that are in
  * persistent data structures, such as the parent CachedPlanSource or a
  * Portal.  Transient references should be protected by a resource owner.
  */
 void
-ReleaseCachedPlan(CachedPlan *plan, bool useResOwner)
+ReleaseCachedPlan(CachedPlan *plan, ResourceOwner owner)
 {
 	Assert(plan->magic == CACHEDPLAN_MAGIC);
-	if (useResOwner)
+	if (owner)
 	{
 		Assert(plan->is_saved);
-		ResourceOwnerForgetPlanCacheRef(CurrentResourceOwner, plan);
+		ResourceOwnerForgetPlanCacheRef(owner, plan);
 	}
 	Assert(plan->refcount > 0);
 	plan->refcount--;
diff --git a/src/backend/utils/mmgr/portalmem.c b/src/backend/utils/mmgr/portalmem.c
index 22c6d2ba5b..66e3181815 100644
--- a/src/backend/utils/mmgr/portalmem.c
+++ b/src/backend/utils/mmgr/portalmem.c
@@ -310,7 +310,7 @@ PortalReleaseCachedPlan(Portal portal)
 {
 	if (portal->cplan)
 	{
-		ReleaseCachedPlan(portal->cplan, false);
+		ReleaseCachedPlan(portal->cplan, NULL);
 		portal->cplan = NULL;
 
 		/*
diff --git a/src/backend/utils/resowner/resowner.c b/src/backend/utils/resowner/resowner.c
index 10f15f6a35..a171df573c 100644
--- a/src/backend/utils/resowner/resowner.c
+++ b/src/backend/utils/resowner/resowner.c
@@ -652,7 +652,7 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
 
 			if (isCommit)
 				PrintPlanCacheLeakWarning(res);
-			ReleaseCachedPlan(res, true);
+			ReleaseCachedPlan(res, owner);
 		}
 
 		/* Ditto for tupdesc references */
@@ -703,18 +703,14 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
 void
 ResourceOwnerReleaseAllPlanCacheRefs(ResourceOwner owner)
 {
-	ResourceOwner save;
 	Datum		foundres;
 
-	save = CurrentResourceOwner;
-	CurrentResourceOwner = owner;
 	while (ResourceArrayGetAny(&(owner->planrefarr), &foundres))
 	{
 		CachedPlan *res = (CachedPlan *) DatumGetPointer(foundres);
 
-		ReleaseCachedPlan(res, true);
+		ReleaseCachedPlan(res, owner);
 	}
-	CurrentResourceOwner = save;
 }
 
 /*
diff --git a/src/include/executor/spi.h b/src/include/executor/spi.h
index 9c70603434..dc91cf891e 100644
--- a/src/include/executor/spi.h
+++ b/src/include/executor/spi.h
@@ -42,6 +42,13 @@ typedef struct SPIPrepareOptions
 	int			cursorOptions;
 } SPIPrepareOptions;
 
+/* Optional arguments for SPI_execute_extended */
+typedef struct SPIExecuteOptions
+{
+	DestReceiver *dest;
+	ResourceOwner owner;
+} SPIExecuteOptions;
+
 /* Plans are opaque structs for standard users of SPI */
 typedef struct _SPI_plan *SPIPlanPtr;
 
@@ -103,6 +110,14 @@ extern int	SPI_execute_plan_with_receiver(SPIPlanPtr plan,
 										   ParamListInfo params,
 										   bool read_only, long tcount,
 										   DestReceiver *dest);
+extern int	SPI_execute_plan_extended(SPIPlanPtr plan,
+									  ParamListInfo params,
+									  bool read_only, long tcount,
+									  SPIExecuteOptions *options);
+extern int	SPI_execute_plan_with_resowner(SPIPlanPtr plan,
+										   ParamListInfo params,
+										   bool read_only, long tcount,
+										   ResourceOwner owner);
 extern int	SPI_exec(const char *src, long tcount);
 extern int	SPI_execp(SPIPlanPtr plan, Datum *Values, const char *Nulls,
 					  long tcount);
diff --git a/src/include/utils/plancache.h b/src/include/utils/plancache.h
index 79d96e5ed0..ff09c63a02 100644
--- a/src/include/utils/plancache.h
+++ b/src/include/utils/plancache.h
@@ -219,9 +219,9 @@ extern List *CachedPlanGetTargetList(CachedPlanSource *plansource,
 
 extern CachedPlan *GetCachedPlan(CachedPlanSource *plansource,
 								 ParamListInfo boundParams,
-								 bool useResOwner,
+								 ResourceOwner owner,
 								 QueryEnvironment *queryEnv);
-extern void ReleaseCachedPlan(CachedPlan *plan, bool useResOwner);
+extern void ReleaseCachedPlan(CachedPlan *plan, ResourceOwner owner);
 
 extern bool CachedPlanAllowsSimpleValidityCheck(CachedPlanSource *plansource,
 												CachedPlan *plan,
diff --git a/src/pl/plpgsql/src/pl_comp.c b/src/pl/plpgsql/src/pl_comp.c
index 5336793a93..6517d738d8 100644
--- a/src/pl/plpgsql/src/pl_comp.c
+++ b/src/pl/plpgsql/src/pl_comp.c
@@ -368,6 +368,8 @@ do_compile(FunctionCallInfo fcinfo,
 
 	function->fn_prokind = procStruct->prokind;
 
+	function->require_local_resowner = false;
+
 	function->nstatements = 0;
 
 	/*
diff --git a/src/pl/plpgsql/src/pl_exec.c b/src/pl/plpgsql/src/pl_exec.c
index 3a9349b724..c6c5b07114 100644
--- a/src/pl/plpgsql/src/pl_exec.c
+++ b/src/pl/plpgsql/src/pl_exec.c
@@ -329,8 +329,7 @@ static void plpgsql_estate_setup(PLpgSQL_execstate *estate,
 static void exec_eval_cleanup(PLpgSQL_execstate *estate);
 
 static void exec_prepare_plan(PLpgSQL_execstate *estate,
-							  PLpgSQL_expr *expr, int cursorOptions,
-							  bool keepplan);
+							  PLpgSQL_expr *expr, int cursorOptions);
 static void exec_simple_check_plan(PLpgSQL_execstate *estate, PLpgSQL_expr *expr);
 static void exec_save_simple_expr(PLpgSQL_expr *expr, CachedPlan *cplan);
 static void exec_check_rw_parameter(PLpgSQL_expr *expr);
@@ -446,7 +445,8 @@ static char *format_expr_params(PLpgSQL_execstate *estate,
 								const PLpgSQL_expr *expr);
 static char *format_preparedparamsdata(PLpgSQL_execstate *estate,
 									   ParamListInfo paramLI);
-
+static PLpgSQL_variable *make_callstmt_target(PLpgSQL_execstate *estate,
+											  PLpgSQL_expr *expr);
 
 /* ----------
  * plpgsql_exec_function	Called by the call handler for
@@ -460,12 +460,18 @@ static char *format_preparedparamsdata(PLpgSQL_execstate *estate,
  * shared_simple_eval_resowner.  (When using a private simple_eval_estate,
  * we must also use a private cast hashtable, but that's taken care of
  * within plpgsql_estate_setup.)
+ * local_resowner is used only when function is executed as non atomic
+ * and it is created by and passed by caller. It is used like cached plan
+ * owner for CALL statements instead CurrentResourceOwner, than cannot be
+ * used safely, because it is transaction limited. local_resowner is
+ * assigned with routine execution.
  * ----------
  */
 Datum
 plpgsql_exec_function(PLpgSQL_function *func, FunctionCallInfo fcinfo,
 					  EState *simple_eval_estate,
 					  ResourceOwner simple_eval_resowner,
+					  ResourceOwner local_resowner,
 					  bool atomic)
 {
 	PLpgSQL_execstate estate;
@@ -480,6 +486,10 @@ plpgsql_exec_function(PLpgSQL_function *func, FunctionCallInfo fcinfo,
 						 simple_eval_estate, simple_eval_resowner);
 	estate.atomic = atomic;
 
+	/* nonatomic execution requires valid local_resowner */
+	Assert(atomic || local_resowner);
+	estate.local_resowner = local_resowner;
+
 	/*
 	 * Setup error traceback support for ereport()
 	 */
@@ -2150,231 +2160,67 @@ static int
 exec_stmt_call(PLpgSQL_execstate *estate, PLpgSQL_stmt_call *stmt)
 {
 	PLpgSQL_expr *expr = stmt->expr;
-	SPIPlanPtr	orig_plan = expr->plan;
-	bool		local_plan;
-	PLpgSQL_variable *volatile cur_target = stmt->target;
-	volatile LocalTransactionId before_lxid;
+	LocalTransactionId before_lxid;
 	LocalTransactionId after_lxid;
-	volatile bool pushed_active_snap = false;
+	bool pushed_active_snap = false;
 	volatile int rc;
+	ParamListInfo paramLI;
+	SPIExecuteOptions opts;
 
 	/*
-	 * If not in atomic context, we make a local plan that we'll just use for
-	 * this invocation, and will free at the end.  Otherwise, transaction ends
-	 * would cause errors about plancache leaks.
-	 *
-	 * XXX This would be fixable with some plancache/resowner surgery
-	 * elsewhere, but for now we'll just work around this here.
+	 * Make a plan if we don't have one, or if we need a local one.  Note
+	 * that we'll overwrite expr->plan either way; the PG_TRY block will
+	 * ensure we undo that on the way out, if the plan is local.
 	 */
-	local_plan = !estate->atomic;
-
-	/* PG_TRY to ensure we clear the plan link, if needed, on failure */
-	PG_TRY();
+	if (expr->plan == NULL)
 	{
-		SPIPlanPtr	plan = expr->plan;
-		ParamListInfo paramLI;
+		/* Don't let SPI save the plan if it's going to be local */
+		exec_prepare_plan(estate, expr, 0);
 
 		/*
-		 * Make a plan if we don't have one, or if we need a local one.  Note
-		 * that we'll overwrite expr->plan either way; the PG_TRY block will
-		 * ensure we undo that on the way out, if the plan is local.
+		 * A CALL or DO can never be a simple expression.  (If it could
+		 * be, we'd have to worry about saving/restoring the previous
+		 * values of the related expr fields, not just expr->plan.)
 		 */
-		if (plan == NULL || local_plan)
-		{
-			/* Don't let SPI save the plan if it's going to be local */
-			exec_prepare_plan(estate, expr, 0, !local_plan);
-			plan = expr->plan;
+		Assert(!expr->expr_simple_expr);
 
-			/*
-			 * A CALL or DO can never be a simple expression.  (If it could
-			 * be, we'd have to worry about saving/restoring the previous
-			 * values of the related expr fields, not just expr->plan.)
-			 */
-			Assert(!expr->expr_simple_expr);
-
-			/*
-			 * The procedure call could end transactions, which would upset
-			 * the snapshot management in SPI_execute*, so don't let it do it.
-			 * Instead, we set the snapshots ourselves below.
-			 */
-			plan->no_snapshots = true;
-
-			/*
-			 * Force target to be recalculated whenever the plan changes, in
-			 * case the procedure's argument list has changed.
-			 */
-			stmt->target = NULL;
-			cur_target = NULL;
-		}
+		/*
+		 * The procedure call could end transactions, which would upset
+		 * the snapshot management in SPI_execute*, so don't let it do it.
+		 * Instead, we set the snapshots ourselves below.
+		 */
+		expr->plan->no_snapshots = true;
 
 		/*
 		 * We construct a DTYPE_ROW datum representing the plpgsql variables
 		 * associated with the procedure's output arguments.  Then we can use
 		 * exec_move_row() to do the assignments.
-		 *
-		 * If we're using a local plan, also make a local target; otherwise,
-		 * since the above code will force a new plan each time through, we'd
-		 * repeatedly leak the memory for the target.  (Note: we also leak the
-		 * target when a plan change is forced, but that isn't so likely to
-		 * cause excessive memory leaks.)
 		 */
-		if (stmt->is_call && cur_target == NULL)
-		{
-			Node	   *node;
-			FuncExpr   *funcexpr;
-			HeapTuple	func_tuple;
-			List	   *funcargs;
-			Oid		   *argtypes;
-			char	  **argnames;
-			char	   *argmodes;
-			MemoryContext oldcontext;
-			PLpgSQL_row *row;
-			int			nfields;
-			int			i;
-			ListCell   *lc;
-
-			/* Use stmt_mcontext for any cruft accumulated here */
-			oldcontext = MemoryContextSwitchTo(get_stmt_mcontext(estate));
-
-			/*
-			 * Get the parsed CallStmt, and look up the called procedure
-			 */
-			node = linitial_node(Query,
-								 ((CachedPlanSource *) linitial(plan->plancache_list))->query_list)->utilityStmt;
-			if (node == NULL || !IsA(node, CallStmt))
-				elog(ERROR, "query for CALL statement is not a CallStmt");
-
-			funcexpr = ((CallStmt *) node)->funcexpr;
-
-			func_tuple = SearchSysCache1(PROCOID,
-										 ObjectIdGetDatum(funcexpr->funcid));
-			if (!HeapTupleIsValid(func_tuple))
-				elog(ERROR, "cache lookup failed for function %u",
-					 funcexpr->funcid);
-
-			/*
-			 * Extract function arguments, and expand any named-arg notation
-			 */
-			funcargs = expand_function_arguments(funcexpr->args,
-												 funcexpr->funcresulttype,
-												 func_tuple);
-
-			/*
-			 * Get the argument names and modes, too
-			 */
-			get_func_arg_info(func_tuple, &argtypes, &argnames, &argmodes);
-
-			ReleaseSysCache(func_tuple);
-
-			/*
-			 * Begin constructing row Datum; keep it in fn_cxt if it's to be
-			 * long-lived.
-			 */
-			if (!local_plan)
-				MemoryContextSwitchTo(estate->func->fn_cxt);
-
-			row = (PLpgSQL_row *) palloc0(sizeof(PLpgSQL_row));
-			row->dtype = PLPGSQL_DTYPE_ROW;
-			row->refname = "(unnamed row)";
-			row->lineno = -1;
-			row->varnos = (int *) palloc(sizeof(int) * list_length(funcargs));
-
-			if (!local_plan)
-				MemoryContextSwitchTo(get_stmt_mcontext(estate));
-
-			/*
-			 * Examine procedure's argument list.  Each output arg position
-			 * should be an unadorned plpgsql variable (Datum), which we can
-			 * insert into the row Datum.
-			 */
-			nfields = 0;
-			i = 0;
-			foreach(lc, funcargs)
-			{
-				Node	   *n = lfirst(lc);
-
-				if (argmodes &&
-					(argmodes[i] == PROARGMODE_INOUT ||
-					 argmodes[i] == PROARGMODE_OUT))
-				{
-					if (IsA(n, Param))
-					{
-						Param	   *param = (Param *) n;
-
-						/* paramid is offset by 1 (see make_datum_param()) */
-						row->varnos[nfields++] = param->paramid - 1;
-					}
-					else
-					{
-						/* report error using parameter name, if available */
-						if (argnames && argnames[i] && argnames[i][0])
-							ereport(ERROR,
-									(errcode(ERRCODE_SYNTAX_ERROR),
-									 errmsg("procedure parameter \"%s\" is an output parameter but corresponding argument is not writable",
-											argnames[i])));
-						else
-							ereport(ERROR,
-									(errcode(ERRCODE_SYNTAX_ERROR),
-									 errmsg("procedure parameter %d is an output parameter but corresponding argument is not writable",
-											i + 1)));
-					}
-				}
-				i++;
-			}
-
-			row->nfields = nfields;
-
-			cur_target = (PLpgSQL_variable *) row;
-
-			/* We can save and re-use the target datum, if it's not local */
-			if (!local_plan)
-				stmt->target = cur_target;
-
-			MemoryContextSwitchTo(oldcontext);
-		}
-
-		paramLI = setup_param_list(estate, expr);
-
-		before_lxid = MyProc->lxid;
+		if (stmt->is_call)
+			stmt->target = make_callstmt_target(estate, expr);
+	}
 
-		/*
-		 * Set snapshot only for non-read-only procedures, similar to SPI
-		 * behavior.
-		 */
-		if (!estate->readonly_func)
-		{
-			PushActiveSnapshot(GetTransactionSnapshot());
-			pushed_active_snap = true;
-		}
+	paramLI = setup_param_list(estate, expr);
 
-		rc = SPI_execute_plan_with_paramlist(expr->plan, paramLI,
-											 estate->readonly_func, 0);
-	}
-	PG_CATCH();
-	{
-		/*
-		 * If we are using a local plan, restore the old plan link.
-		 */
-		if (local_plan)
-			expr->plan = orig_plan;
-		PG_RE_THROW();
-	}
-	PG_END_TRY();
+	before_lxid = MyProc->lxid;
 
 	/*
-	 * If we are using a local plan, restore the old plan link; then free the
-	 * local plan to avoid memory leaks.  (Note that the error exit path above
-	 * just clears the link without risking calling SPI_freeplan; we expect
-	 * that xact cleanup will take care of the mess in that case.)
+	 * Set snapshot only for non-read-only procedures, similar to SPI
+	 * behavior.
 	 */
-	if (local_plan)
+	if (!estate->readonly_func)
 	{
-		SPIPlanPtr	plan = expr->plan;
-
-		expr->plan = orig_plan;
-		SPI_freeplan(plan);
+		PushActiveSnapshot(GetTransactionSnapshot());
+		pushed_active_snap = true;
 	}
 
+	opts.dest = NULL;
+	opts.owner = estate->local_resowner;
+
+	rc = SPI_execute_plan_extended(expr->plan, paramLI,
+									   estate->readonly_func, 0,
+									   &opts);
+
 	if (rc < 0)
 		elog(ERROR, "SPI_execute_plan_with_paramlist failed executing query \"%s\": %s",
 			 expr->query, SPI_result_code_string(rc));
@@ -2410,10 +2256,10 @@ exec_stmt_call(PLpgSQL_execstate *estate, PLpgSQL_stmt_call *stmt)
 	{
 		SPITupleTable *tuptab = SPI_tuptable;
 
-		if (!cur_target)
+		if (!stmt->is_call)
 			elog(ERROR, "DO statement returned a row");
 
-		exec_move_row(estate, cur_target, tuptab->vals[0], tuptab->tupdesc);
+		exec_move_row(estate, stmt->target, tuptab->vals[0], tuptab->tupdesc);
 	}
 	else if (SPI_processed > 1)
 		elog(ERROR, "procedure call returned more than one row");
@@ -2424,6 +2270,121 @@ exec_stmt_call(PLpgSQL_execstate *estate, PLpgSQL_stmt_call *stmt)
 	return PLPGSQL_RC_OK;
 }
 
+/*
+ * We construct a DTYPE_ROW datum representing the plpgsql variables
+ * associated with the procedure's output arguments.  Then we can use
+ * exec_move_row() to do the assignments.
+ */
+static PLpgSQL_variable *
+make_callstmt_target(PLpgSQL_execstate *estate, PLpgSQL_expr *expr)
+{
+	Node	   *node;
+	FuncExpr   *funcexpr;
+	HeapTuple	func_tuple;
+	List	   *funcargs;
+	Oid		   *argtypes;
+	char	  **argnames;
+	char	   *argmodes;
+	MemoryContext oldcontext;
+	PLpgSQL_row *row;
+	int			nfields;
+	int			i;
+	ListCell   *lc;
+
+	/* Use eval_mcontext for any cruft accumulated here */
+	oldcontext = MemoryContextSwitchTo(get_eval_mcontext(estate));
+
+	/*
+	 * Get the parsed CallStmt, and look up the called procedure
+	 */
+	node = linitial_node(Query,
+						 ((CachedPlanSource *) linitial(expr->plan->plancache_list))->query_list)->utilityStmt;
+	if (node == NULL || !IsA(node, CallStmt))
+		elog(ERROR, "query for CALL statement is not a CallStmt");
+
+	funcexpr = ((CallStmt *) node)->funcexpr;
+
+	func_tuple = SearchSysCache1(PROCOID,
+								 ObjectIdGetDatum(funcexpr->funcid));
+	if (!HeapTupleIsValid(func_tuple))
+		elog(ERROR, "cache lookup failed for function %u",
+			 funcexpr->funcid);
+
+	/*
+	 * Extract function arguments, and expand any named-arg notation
+	 */
+	funcargs = expand_function_arguments(funcexpr->args,
+										 funcexpr->funcresulttype,
+										 func_tuple);
+
+	/*
+	 * Get the argument names and modes, too
+	 */
+	get_func_arg_info(func_tuple, &argtypes, &argnames, &argmodes);
+
+	ReleaseSysCache(func_tuple);
+
+	/*
+	 * Begin constructing row Datum; keep it in fn_cxt if it's to be
+	 * long-lived.
+	 */
+	MemoryContextSwitchTo(estate->func->fn_cxt);
+
+	row = (PLpgSQL_row *) palloc0(sizeof(PLpgSQL_row));
+	row->dtype = PLPGSQL_DTYPE_ROW;
+	row->refname = "(unnamed row)";
+	row->lineno = -1;
+	row->varnos = (int *) palloc(sizeof(int) * list_length(funcargs));
+
+	MemoryContextSwitchTo(get_eval_mcontext(estate));
+
+	/*
+	 * Examine procedure's argument list.  Each output arg position
+	 * should be an unadorned plpgsql variable (Datum), which we can
+	 * insert into the row Datum.
+	 */
+	nfields = 0;
+	i = 0;
+	foreach(lc, funcargs)
+	{
+		Node	   *n = lfirst(lc);
+
+		if (argmodes &&
+			(argmodes[i] == PROARGMODE_INOUT ||
+			 argmodes[i] == PROARGMODE_OUT))
+		{
+			if (IsA(n, Param))
+			{
+				Param	   *param = (Param *) n;
+
+				/* paramid is offset by 1 (see make_datum_param()) */
+				row->varnos[nfields++] = param->paramid - 1;
+			}
+			else
+			{
+				/* report error using parameter name, if available */
+				if (argnames && argnames[i] && argnames[i][0])
+					ereport(ERROR,
+							(errcode(ERRCODE_SYNTAX_ERROR),
+							 errmsg("procedure parameter \"%s\" is an output parameter but corresponding argument is not writable",
+									argnames[i])));
+				else
+					ereport(ERROR,
+							(errcode(ERRCODE_SYNTAX_ERROR),
+							 errmsg("procedure parameter %d is an output parameter but corresponding argument is not writable",
+									i + 1)));
+			}
+		}
+		i++;
+	}
+
+	row->nfields = nfields;
+
+	MemoryContextSwitchTo(oldcontext);
+
+	return (PLpgSQL_variable *) row;
+}
+
 /* ----------
  * exec_stmt_getdiag					Put internal PG information into
  *										specified variables.
@@ -2960,7 +2921,7 @@ exec_stmt_forc(PLpgSQL_execstate *estate, PLpgSQL_stmt_forc *stmt)
 	Assert(query);
 
 	if (query->plan == NULL)
-		exec_prepare_plan(estate, query, curvar->cursor_options, true);
+		exec_prepare_plan(estate, query, curvar->cursor_options);
 
 	/*
 	 * Set up ParamListInfo for this query
@@ -3612,7 +3573,7 @@ exec_stmt_return_query(PLpgSQL_execstate *estate,
 		 * On the first call for this expression generate the plan.
 		 */
 		if (expr->plan == NULL)
-			exec_prepare_plan(estate, expr, CURSOR_OPT_PARALLEL_OK, true);
+			exec_prepare_plan(estate, expr, CURSOR_OPT_PARALLEL_OK);
 
 		/*
 		 * Set up ParamListInfo to pass to executor
@@ -4091,6 +4052,8 @@ plpgsql_estate_setup(PLpgSQL_execstate *estate,
 	else
 		estate->simple_eval_resowner = shared_simple_eval_resowner;
 
+	estate->local_resowner = NULL;
+
 	/*
 	 * We start with no stmt_mcontext; one will be created only if needed.
 	 * That context will be a direct child of the function's main execution
@@ -4159,8 +4122,7 @@ exec_eval_cleanup(PLpgSQL_execstate *estate)
  */
 static void
 exec_prepare_plan(PLpgSQL_execstate *estate,
-				  PLpgSQL_expr *expr, int cursorOptions,
-				  bool keepplan)
+				  PLpgSQL_expr *expr, int cursorOptions)
 {
 	SPIPlanPtr	plan;
 	SPIPrepareOptions options;
@@ -4183,8 +4145,8 @@ exec_prepare_plan(PLpgSQL_execstate *estate,
 	if (plan == NULL)
 		elog(ERROR, "SPI_prepare_extended failed for \"%s\": %s",
 			 expr->query, SPI_result_code_string(SPI_result));
-	if (keepplan)
-		SPI_keepplan(plan);
+
+	SPI_keepplan(plan);
 	expr->plan = plan;
 
 	/* Check to see if it's a simple expression */
@@ -4222,7 +4184,7 @@ exec_stmt_execsql(PLpgSQL_execstate *estate,
 	{
 		ListCell   *l;
 
-		exec_prepare_plan(estate, expr, CURSOR_OPT_PARALLEL_OK, true);
+		exec_prepare_plan(estate, expr, CURSOR_OPT_PARALLEL_OK);
 		stmt->mod_stmt = false;
 		foreach(l, SPI_plan_get_plan_sources(expr->plan))
 		{
@@ -4681,7 +4643,7 @@ exec_stmt_open(PLpgSQL_execstate *estate, PLpgSQL_stmt_open *stmt)
 		 */
 		query = stmt->query;
 		if (query->plan == NULL)
-			exec_prepare_plan(estate, query, stmt->cursor_options, true);
+			exec_prepare_plan(estate, query, stmt->cursor_options);
 	}
 	else if (stmt->dynquery != NULL)
 	{
@@ -4752,7 +4714,7 @@ exec_stmt_open(PLpgSQL_execstate *estate, PLpgSQL_stmt_open *stmt)
 
 		query = curvar->cursor_explicit_expr;
 		if (query->plan == NULL)
-			exec_prepare_plan(estate, query, curvar->cursor_options, true);
+			exec_prepare_plan(estate, query, curvar->cursor_options);
 	}
 
 	/*
@@ -4989,7 +4951,7 @@ exec_stmt_set(PLpgSQL_execstate *estate, PLpgSQL_stmt_set *stmt)
 
 	if (expr->plan == NULL)
 	{
-		exec_prepare_plan(estate, expr, 0, true);
+		exec_prepare_plan(estate, expr, 0);
 		expr->plan->no_snapshots = true;
 	}
 
@@ -5032,7 +4994,7 @@ exec_assign_expr(PLpgSQL_execstate *estate, PLpgSQL_datum *target,
 		else
 			expr->target_param = -1;	/* should be that already */
 
-		exec_prepare_plan(estate, expr, 0, true);
+		exec_prepare_plan(estate, expr, 0);
 	}
 
 	value = exec_eval_expr(estate, expr, &isnull, &valtype, &valtypmod);
@@ -5040,7 +5002,6 @@ exec_assign_expr(PLpgSQL_execstate *estate, PLpgSQL_datum *target,
 	exec_eval_cleanup(estate);
 }
 
-
 /* ----------
  * exec_assign_c_string		Put a C string into a text variable.
  *
@@ -5697,7 +5658,7 @@ exec_eval_expr(PLpgSQL_execstate *estate,
 	 * If first time through, create a plan for this expression.
 	 */
 	if (expr->plan == NULL)
-		exec_prepare_plan(estate, expr, CURSOR_OPT_PARALLEL_OK, true);
+		exec_prepare_plan(estate, expr, CURSOR_OPT_PARALLEL_OK);
 
 	/*
 	 * If this is a simple expression, bypass SPI and use the executor
@@ -5783,7 +5744,7 @@ exec_run_select(PLpgSQL_execstate *estate,
 	 */
 	if (expr->plan == NULL)
 		exec_prepare_plan(estate, expr,
-						  portalP == NULL ? CURSOR_OPT_PARALLEL_OK : 0, true);
+						  portalP == NULL ? CURSOR_OPT_PARALLEL_OK : 0);
 
 	/*
 	 * Set up ParamListInfo to pass to executor
@@ -6056,11 +6017,7 @@ exec_eval_simple_expr(PLpgSQL_execstate *estate,
 		 */
 		if (expr->expr_simple_plan_lxid == curlxid)
 		{
-			ResourceOwner saveResourceOwner = CurrentResourceOwner;
-
-			CurrentResourceOwner = estate->simple_eval_resowner;
-			ReleaseCachedPlan(expr->expr_simple_plan, true);
-			CurrentResourceOwner = saveResourceOwner;
+			ReleaseCachedPlan(expr->expr_simple_plan, estate->simple_eval_resowner);
 			expr->expr_simple_plan = NULL;
 			expr->expr_simple_plan_lxid = InvalidLocalTransactionId;
 		}
@@ -6094,7 +6051,7 @@ exec_eval_simple_expr(PLpgSQL_execstate *estate,
 		else
 		{
 			/* Release SPI_plan_get_cached_plan's refcount */
-			ReleaseCachedPlan(cplan, true);
+			ReleaseCachedPlan(cplan, CurrentResourceOwner);
 			/* Mark expression as non-simple, and fail */
 			expr->expr_simple_expr = NULL;
 			expr->expr_rw_param = NULL;
@@ -6105,7 +6062,7 @@ exec_eval_simple_expr(PLpgSQL_execstate *estate,
 		 * SPI_plan_get_cached_plan acquired a plan refcount stored in the
 		 * active resowner.  We don't need that anymore, so release it.
 		 */
-		ReleaseCachedPlan(cplan, true);
+		ReleaseCachedPlan(cplan, CurrentResourceOwner);
 
 		/* Extract desired scalar expression from cached plan */
 		exec_save_simple_expr(expr, cplan);
@@ -8023,7 +7980,7 @@ exec_simple_check_plan(PLpgSQL_execstate *estate, PLpgSQL_expr *expr)
 	 * Release the plan refcount obtained by SPI_plan_get_cached_plan.  (This
 	 * refcount is held by the wrong resowner, so we can't just repurpose it.)
 	 */
-	ReleaseCachedPlan(cplan, true);
+	ReleaseCachedPlan(cplan, CurrentResourceOwner);
 }
 
 /*
diff --git a/src/pl/plpgsql/src/pl_gram.y b/src/pl/plpgsql/src/pl_gram.y
index 0b0ff4e5de..13838ee690 100644
--- a/src/pl/plpgsql/src/pl_gram.y
+++ b/src/pl/plpgsql/src/pl_gram.y
@@ -951,6 +951,9 @@ stmt_call		: K_CALL
 						new->expr = read_sql_stmt();
 						new->is_call = true;
 
+						/* This statement can require local resource owner */
+						plpgsql_curr_compile->require_local_resowner = true;
+
 						$$ = (PLpgSQL_stmt *)new;
 
 					}
diff --git a/src/pl/plpgsql/src/pl_handler.c b/src/pl/plpgsql/src/pl_handler.c
index 52e6c69cc5..109ce08468 100644
--- a/src/pl/plpgsql/src/pl_handler.c
+++ b/src/pl/plpgsql/src/pl_handler.c
@@ -225,6 +225,7 @@ plpgsql_call_handler(PG_FUNCTION_ARGS)
 	PLpgSQL_function *func;
 	PLpgSQL_execstate *save_cur_estate;
 	Datum		retval;
+	ResourceOwner local_resowner = NULL;
 	int			rc;
 
 	nonatomic = fcinfo->context &&
@@ -246,6 +247,11 @@ plpgsql_call_handler(PG_FUNCTION_ARGS)
 	/* Mark the function as busy, so it can't be deleted from under us */
 	func->use_count++;
 
+	/* Create local resource owner only for nonatomic mode */
+	if (nonatomic && func->require_local_resowner)
+		local_resowner = ResourceOwnerCreate(NULL,
+											 "PL/pgSQL local resource owner");
+
 	PG_TRY();
 	{
 		/*
@@ -264,6 +270,7 @@ plpgsql_call_handler(PG_FUNCTION_ARGS)
 		else
 			retval = plpgsql_exec_function(func, fcinfo,
 										   NULL, NULL,
+										   local_resowner,
 										   !nonatomic);
 	}
 	PG_FINALLY();
@@ -271,6 +278,12 @@ plpgsql_call_handler(PG_FUNCTION_ARGS)
 		/* Decrement use-count, restore cur_estate */
 		func->use_count--;
 		func->cur_estate = save_cur_estate;
+
+		if (local_resowner)
+		{
+			ResourceOwnerReleaseAllPlanCacheRefs(local_resowner);
+			ResourceOwnerDelete(local_resowner);
+		}
 	}
 	PG_END_TRY();
 
@@ -300,7 +313,9 @@ plpgsql_inline_handler(PG_FUNCTION_ARGS)
 	FmgrInfo	flinfo;
 	EState	   *simple_eval_estate;
 	ResourceOwner simple_eval_resowner;
+	ResourceOwner local_resowner = NULL;
 	Datum		retval;
+	bool		nonatomic;
 	int			rc;
 
 	/*
@@ -338,13 +353,20 @@ plpgsql_inline_handler(PG_FUNCTION_ARGS)
 	simple_eval_resowner =
 		ResourceOwnerCreate(NULL, "PL/pgSQL DO block simple expressions");
 
+	nonatomic = !codeblock->atomic;
+
+	if (nonatomic)
+		local_resowner = ResourceOwnerCreate(NULL,
+											 "PL/pgSQL local resource owner");
+
 	/* And run the function */
 	PG_TRY();
 	{
 		retval = plpgsql_exec_function(func, fake_fcinfo,
 									   simple_eval_estate,
 									   simple_eval_resowner,
-									   codeblock->atomic);
+									   local_resowner,
+									   !nonatomic);
 	}
 	PG_CATCH();
 	{
@@ -371,6 +393,13 @@ plpgsql_inline_handler(PG_FUNCTION_ARGS)
 		ResourceOwnerReleaseAllPlanCacheRefs(simple_eval_resowner);
 		ResourceOwnerDelete(simple_eval_resowner);
 
+		/* Clean local resource owner */
+		if (nonatomic)
+		{
+			ResourceOwnerReleaseAllPlanCacheRefs(local_resowner);
+			ResourceOwnerDelete(local_resowner);
+		}
+
 		/* Function should now have no remaining use-counts ... */
 		func->use_count--;
 		Assert(func->use_count == 0);
@@ -388,6 +417,13 @@ plpgsql_inline_handler(PG_FUNCTION_ARGS)
 	ResourceOwnerReleaseAllPlanCacheRefs(simple_eval_resowner);
 	ResourceOwnerDelete(simple_eval_resowner);
 
+	/* Clean local resource owner */
+	if (!codeblock->atomic)
+	{
+		ResourceOwnerReleaseAllPlanCacheRefs(local_resowner);
+		ResourceOwnerDelete(local_resowner);
+	}
+
 	/* Function should now have no remaining use-counts ... */
 	func->use_count--;
 	Assert(func->use_count == 0);
diff --git a/src/pl/plpgsql/src/plpgsql.h b/src/pl/plpgsql/src/plpgsql.h
index 416fda7f3b..2a1a43d388 100644
--- a/src/pl/plpgsql/src/plpgsql.h
+++ b/src/pl/plpgsql/src/plpgsql.h
@@ -1004,6 +1004,7 @@ typedef struct PLpgSQL_function
 	PLpgSQL_resolve_option resolve_option;
 
 	bool		print_strict_params;
+	bool		require_local_resowner;
 
 	/* extra checks */
 	int			extra_warnings;
@@ -1081,6 +1082,9 @@ typedef struct PLpgSQL_execstate
 	EState	   *simple_eval_estate;
 	ResourceOwner simple_eval_resowner;
 
+	/* routine execution resource owner */
+	ResourceOwner local_resowner;
+
 	/* lookup table to use for executing type casts */
 	HTAB	   *cast_hash;
 	MemoryContext cast_hash_context;
@@ -1265,6 +1269,7 @@ extern Datum plpgsql_exec_function(PLpgSQL_function *func,
 								   FunctionCallInfo fcinfo,
 								   EState *simple_eval_estate,
 								   ResourceOwner simple_eval_resowner,
+								   ResourceOwner local_resowner,
 								   bool atomic);
 extern HeapTuple plpgsql_exec_trigger(PLpgSQL_function *func,
 									  TriggerData *trigdata);

Reply via email to