(2010/05/25 12:19), Robert Haas wrote:
> On Mon, May 24, 2010 at 9:27 PM, Stephen Frost<sfr...@snowman.net>  wrote:
>> * KaiGai Kohei (kai...@ak.jp.nec.com) wrote:
>>> We have two options. If the checker function takes a list of
>>> RangeTblEntry,
>>> it will be convenient for ExecCheckRTPerms(), but not for DoCopy(). Conversely,
>>> if the checker function takes the arguments in my patch, it will be convenient
>>> for DoCopy(), but not for ExecCheckRTPerms().
>>>
>>> In my patch, it takes 6 arguments, but we can derive all of them from
>>> the given RangeTblEntry. On the other hand, if DoCopy() has to set up
>>> a pseudo RangeTblEntry to call the checker function, it needs to set
>>> up a similar or slightly larger number of variables anyway.
>>
>> I don't know that it's really all that difficult to set up an RT in
>> DoCopy or RI_Initial_Check().  In my opinion, those are the strange or
>> corner cases- not the Executor code, through which all 'regular' DML is
>> done.  It makes me wonder if COPY shouldn't have been implemented using
>> the Executor instead, but that's, again, a completely separate topic.
>> It wasn't, but it wants to play like it operates in the same kind of way
>> as INSERT, so it needs to pick up the slack.
> 
> I think this approach is definitely worth investigating.  KaiGai, can
> you please work up what the patch would look like if we do it this
> way?

OK, the attached patch reworks it along those lines.

* ExecCheckRTEPerms() now takes a second argument that lets the caller choose
  the behavior on access violation. If the 'abort' argument is true, it raises
  an error using aclcheck_error() or ereport(). Otherwise, it returns
  false immediately without performing the rest of the checks.

* DoCopy() and RI_Initial_Check() were reworked to call ExecCheckRTEPerms()
  with a locally built RangeTblEntry.

Thanks,
-- 
KaiGai Kohei <kai...@ak.jp.nec.com>
*** a/src/backend/commands/copy.c
--- b/src/backend/commands/copy.c
***************
*** 21,26 ****
--- 21,27 ----
  #include <arpa/inet.h>
  
  #include "access/heapam.h"
+ #include "access/sysattr.h"
  #include "access/xact.h"
  #include "catalog/namespace.h"
  #include "catalog/pg_type.h"
***************
*** 37,43 ****
  #include "rewrite/rewriteHandler.h"
  #include "storage/fd.h"
  #include "tcop/tcopprot.h"
- #include "utils/acl.h"
  #include "utils/builtins.h"
  #include "utils/lsyscache.h"
  #include "utils/memutils.h"
--- 38,43 ----
***************
*** 725,733 **** DoCopy(const CopyStmt *stmt, const char *queryString)
  	List	   *force_notnull = NIL;
  	bool		force_quote_all = false;
  	bool		format_specified = false;
- 	AclMode		required_access = (is_from ? ACL_INSERT : ACL_SELECT);
- 	AclMode		relPerms;
- 	AclMode		remainingPerms;
  	ListCell   *option;
  	TupleDesc	tupDesc;
  	int			num_phys_attrs;
--- 725,730 ----
***************
*** 988,993 **** DoCopy(const CopyStmt *stmt, const char *queryString)
--- 985,995 ----
  
  	if (stmt->relation)
  	{
+ 		RangeTblEntry	rte;
+ 		Bitmapset	   *columnsSet = NULL;
+ 		List		   *attnums;
+ 		ListCell	   *cur;
+ 
  		Assert(!stmt->query);
  		cstate->queryDesc = NULL;
  
***************
*** 998,1026 **** DoCopy(const CopyStmt *stmt, const char *queryString)
  		tupDesc = RelationGetDescr(cstate->rel);
  
  		/* Check relation permissions. */
! 		relPerms = pg_class_aclmask(RelationGetRelid(cstate->rel), GetUserId(),
! 									required_access, ACLMASK_ALL);
! 		remainingPerms = required_access & ~relPerms;
! 		if (remainingPerms != 0)
  		{
! 			/* We don't have table permissions, check per-column permissions */
! 			List	   *attnums;
! 			ListCell   *cur;
! 
! 			attnums = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
! 			foreach(cur, attnums)
! 			{
! 				int			attnum = lfirst_int(cur);
  
! 				if (pg_attribute_aclcheck(RelationGetRelid(cstate->rel),
! 										  attnum,
! 										  GetUserId(),
! 										  remainingPerms) != ACLCHECK_OK)
! 					aclcheck_error(ACLCHECK_NO_PRIV, ACL_KIND_CLASS,
! 								   RelationGetRelationName(cstate->rel));
! 			}
  		}
  
  		/* check read-only transaction */
  		if (XactReadOnly && is_from && !cstate->rel->rd_islocaltemp)
  			PreventCommandIfReadOnly("COPY FROM");
--- 1000,1025 ----
  		tupDesc = RelationGetDescr(cstate->rel);
  
  		/* Check relation permissions. */
! 		attnums = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
! 		foreach (cur, attnums)
  		{
! 			int	attnum = lfirst_int(cur) - FirstLowInvalidHeapAttributeNumber;
  
! 			columnsSet = bms_add_member(columnsSet, attnum);
  		}
  
+ 		memset(&rte, 0, sizeof(rte));
+ 		rte.type = T_RangeTblEntry;
+ 		rte.rtekind = RTE_RELATION;
+ 		rte.relid = RelationGetRelid(cstate->rel);
+ 		rte.requiredPerms = (is_from ? ACL_INSERT : ACL_SELECT);
+ 		if (is_from)
+ 			rte.modifiedCols = columnsSet;
+ 		else
+ 			rte.selectedCols = columnsSet;
+ 
+ 		ExecCheckRTEPerms(&rte, true);
+ 
  		/* check read-only transaction */
  		if (XactReadOnly && is_from && !cstate->rel->rd_islocaltemp)
  			PreventCommandIfReadOnly("COPY FROM");
*** a/src/backend/executor/execMain.c
--- b/src/backend/executor/execMain.c
***************
*** 63,68 **** ExecutorStart_hook_type ExecutorStart_hook = NULL;
--- 63,71 ----
  ExecutorRun_hook_type ExecutorRun_hook = NULL;
  ExecutorEnd_hook_type ExecutorEnd_hook = NULL;
  
+ /* Hook for plugins to get control in ExecCheckRTEPerms() */
+ ExecutorCheckPerms_hook_type ExecutorCheckPerms_hook = NULL;
+ 
  /* decls for local routines only used within this module */
  static void InitPlan(QueryDesc *queryDesc, int eflags);
  static void ExecEndPlan(PlanState *planstate, EState *estate);
***************
*** 73,79 **** static void ExecutePlan(EState *estate, PlanState *planstate,
  			ScanDirection direction,
  			DestReceiver *dest);
  static void ExecCheckRTPerms(List *rangeTable);
- static void ExecCheckRTEPerms(RangeTblEntry *rte);
  static void ExecCheckXactReadOnly(PlannedStmt *plannedstmt);
  static void EvalPlanQualStart(EPQState *epqstate, EState *parentestate,
  				  Plan *planTree);
--- 76,81 ----
***************
*** 414,420 **** ExecCheckRTPerms(List *rangeTable)
  
  	foreach(l, rangeTable)
  	{
! 		ExecCheckRTEPerms((RangeTblEntry *) lfirst(l));
  	}
  }
  
--- 416,422 ----
  
  	foreach(l, rangeTable)
  	{
! 		ExecCheckRTEPerms((RangeTblEntry *) lfirst(l), true);
  	}
  }
  
***************
*** 422,429 **** ExecCheckRTPerms(List *rangeTable)
   * ExecCheckRTEPerms
   *		Check access permissions for a single RTE.
   */
! static void
! ExecCheckRTEPerms(RangeTblEntry *rte)
  {
  	AclMode		requiredPerms;
  	AclMode		relPerms;
--- 424,431 ----
   * ExecCheckRTEPerms
   *		Check access permissions for a single RTE.
   */
! bool
! ExecCheckRTEPerms(RangeTblEntry *rte, bool abort)
  {
  	AclMode		requiredPerms;
  	AclMode		relPerms;
***************
*** 432,437 **** ExecCheckRTEPerms(RangeTblEntry *rte)
--- 434,440 ----
  	Oid			userid;
  	Bitmapset  *tmpset;
  	int			col;
+ 	bool		retval = true;
  
  	/*
  	 * Only plain-relation RTEs need to be checked here.  Function RTEs are
***************
*** 439,452 **** ExecCheckRTEPerms(RangeTblEntry *rte)
  	 * Join, subquery, and special RTEs need no checks.
  	 */
  	if (rte->rtekind != RTE_RELATION)
! 		return;
  
  	/*
  	 * No work if requiredPerms is empty.
  	 */
  	requiredPerms = rte->requiredPerms;
  	if (requiredPerms == 0)
! 		return;
  
  	relOid = rte->relid;
  
--- 442,455 ----
  	 * Join, subquery, and special RTEs need no checks.
  	 */
  	if (rte->rtekind != RTE_RELATION)
! 		return true;
  
  	/*
  	 * No work if requiredPerms is empty.
  	 */
  	requiredPerms = rte->requiredPerms;
  	if (requiredPerms == 0)
! 		return true;
  
  	relOid = rte->relid;
  
***************
*** 474,481 **** ExecCheckRTEPerms(RangeTblEntry *rte)
  		 * we can fail straight away.
  		 */
  		if (remainingPerms & ~(ACL_SELECT | ACL_INSERT | ACL_UPDATE))
! 			aclcheck_error(ACLCHECK_NO_PRIV, ACL_KIND_CLASS,
! 						   get_rel_name(relOid));
  
  		/*
  		 * Check to see if we have the needed privileges at column level.
--- 477,488 ----
  		 * we can fail straight away.
  		 */
  		if (remainingPerms & ~(ACL_SELECT | ACL_INSERT | ACL_UPDATE))
! 		{
! 			if (abort)
! 				aclcheck_error(ACLCHECK_NO_PRIV, ACL_KIND_CLASS,
! 							   get_rel_name(relOid));
! 			return false;
! 		}
  
  		/*
  		 * Check to see if we have the needed privileges at column level.
***************
*** 495,502 **** ExecCheckRTEPerms(RangeTblEntry *rte)
  			{
  				if (pg_attribute_aclcheck_all(relOid, userid, ACL_SELECT,
  											  ACLMASK_ANY) != ACLCHECK_OK)
! 					aclcheck_error(ACLCHECK_NO_PRIV, ACL_KIND_CLASS,
! 								   get_rel_name(relOid));
  			}
  
  			tmpset = bms_copy(rte->selectedCols);
--- 502,513 ----
  			{
  				if (pg_attribute_aclcheck_all(relOid, userid, ACL_SELECT,
  											  ACLMASK_ANY) != ACLCHECK_OK)
! 				{
! 					if (abort)
! 						aclcheck_error(ACLCHECK_NO_PRIV, ACL_KIND_CLASS,
! 									   get_rel_name(relOid));
! 					return false;
! 				}
  			}
  
  			tmpset = bms_copy(rte->selectedCols);
***************
*** 509,523 **** ExecCheckRTEPerms(RangeTblEntry *rte)
  					/* Whole-row reference, must have priv on all cols */
  					if (pg_attribute_aclcheck_all(relOid, userid, ACL_SELECT,
  												  ACLMASK_ALL) != ACLCHECK_OK)
! 						aclcheck_error(ACLCHECK_NO_PRIV, ACL_KIND_CLASS,
! 									   get_rel_name(relOid));
  				}
  				else
  				{
  					if (pg_attribute_aclcheck(relOid, col, userid, ACL_SELECT)
  						!= ACLCHECK_OK)
! 						aclcheck_error(ACLCHECK_NO_PRIV, ACL_KIND_CLASS,
! 									   get_rel_name(relOid));
  				}
  			}
  			bms_free(tmpset);
--- 520,542 ----
  					/* Whole-row reference, must have priv on all cols */
  					if (pg_attribute_aclcheck_all(relOid, userid, ACL_SELECT,
  												  ACLMASK_ALL) != ACLCHECK_OK)
! 					{
! 						if (abort)
! 							aclcheck_error(ACLCHECK_NO_PRIV, ACL_KIND_CLASS,
! 										   get_rel_name(relOid));
! 						return false;
! 					}
  				}
  				else
  				{
  					if (pg_attribute_aclcheck(relOid, col, userid, ACL_SELECT)
  						!= ACLCHECK_OK)
! 					{
! 						if (abort)
! 							aclcheck_error(ACLCHECK_NO_PRIV, ACL_KIND_CLASS,
! 										   get_rel_name(relOid));
! 						return false;
! 					}
  				}
  			}
  			bms_free(tmpset);
***************
*** 540,547 **** ExecCheckRTEPerms(RangeTblEntry *rte)
  			{
  				if (pg_attribute_aclcheck_all(relOid, userid, remainingPerms,
  											  ACLMASK_ANY) != ACLCHECK_OK)
! 					aclcheck_error(ACLCHECK_NO_PRIV, ACL_KIND_CLASS,
! 								   get_rel_name(relOid));
  			}
  
  			tmpset = bms_copy(rte->modifiedCols);
--- 559,570 ----
  			{
  				if (pg_attribute_aclcheck_all(relOid, userid, remainingPerms,
  											  ACLMASK_ANY) != ACLCHECK_OK)
! 				{
! 					if (abort)
! 						aclcheck_error(ACLCHECK_NO_PRIV, ACL_KIND_CLASS,
! 									   get_rel_name(relOid));
! 					return false;
! 				}
  			}
  
  			tmpset = bms_copy(rte->modifiedCols);
***************
*** 558,570 **** ExecCheckRTEPerms(RangeTblEntry *rte)
  				{
  					if (pg_attribute_aclcheck(relOid, col, userid, remainingPerms)
  						!= ACLCHECK_OK)
! 						aclcheck_error(ACLCHECK_NO_PRIV, ACL_KIND_CLASS,
! 									   get_rel_name(relOid));
  				}
  			}
  			bms_free(tmpset);
  		}
  	}
  }
  
  /*
--- 581,602 ----
  				{
  					if (pg_attribute_aclcheck(relOid, col, userid, remainingPerms)
  						!= ACLCHECK_OK)
! 					{
! 						if (abort)
! 							aclcheck_error(ACLCHECK_NO_PRIV, ACL_KIND_CLASS,
! 										   get_rel_name(relOid));
! 						return false;
! 					}
  				}
  			}
  			bms_free(tmpset);
  		}
  	}
+ 
+ 	if (ExecutorCheckPerms_hook)
+ 		retval = (*ExecutorCheckPerms_hook)(rte, abort);
+ 
+ 	return retval;
  }
  
  /*
*** a/src/backend/utils/adt/ri_triggers.c
--- b/src/backend/utils/adt/ri_triggers.c
***************
*** 30,45 ****
  
  #include "postgres.h"
  
  #include "access/xact.h"
  #include "catalog/pg_constraint.h"
  #include "catalog/pg_operator.h"
  #include "catalog/pg_type.h"
  #include "commands/trigger.h"
  #include "executor/spi.h"
  #include "parser/parse_coerce.h"
  #include "parser/parse_relation.h"
  #include "miscadmin.h"
- #include "utils/acl.h"
  #include "utils/builtins.h"
  #include "utils/fmgroids.h"
  #include "utils/guc.h"
--- 30,46 ----
  
  #include "postgres.h"
  
+ #include "access/sysattr.h"
  #include "access/xact.h"
  #include "catalog/pg_constraint.h"
  #include "catalog/pg_operator.h"
  #include "catalog/pg_type.h"
  #include "commands/trigger.h"
+ #include "executor/executor.h"
  #include "executor/spi.h"
  #include "parser/parse_coerce.h"
  #include "parser/parse_relation.h"
  #include "miscadmin.h"
  #include "utils/builtins.h"
  #include "utils/fmgroids.h"
  #include "utils/guc.h"
***************
*** 2624,2629 **** RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
--- 2625,2633 ----
  	char		fkrelname[MAX_QUOTED_REL_NAME_LEN];
  	char		pkattname[MAX_QUOTED_NAME_LEN + 3];
  	char		fkattname[MAX_QUOTED_NAME_LEN + 3];
+ 	Bitmapset  *pkColumns = NULL;
+ 	Bitmapset  *fkColumns = NULL;
+ 	RangeTblEntry	rte;
  	const char *sep;
  	int			i;
  	int			old_work_mem;
***************
*** 2635,2649 **** RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
  	 * Check to make sure current user has enough permissions to do the test
  	 * query.  (If not, caller can fall back to the trigger method, which
  	 * works because it changes user IDs on the fly.)
- 	 *
- 	 * XXX are there any other show-stopper conditions to check?
  	 */
! 	if (pg_class_aclcheck(RelationGetRelid(fk_rel), GetUserId(), ACL_SELECT) != ACLCHECK_OK)
! 		return false;
! 	if (pg_class_aclcheck(RelationGetRelid(pk_rel), GetUserId(), ACL_SELECT) != ACLCHECK_OK)
  		return false;
  
! 	ri_FetchConstraintInfo(&riinfo, trigger, fk_rel, false);
  
  	/*----------
  	 * The query string built is:
--- 2639,2669 ----
  	 * Check to make sure current user has enough permissions to do the test
  	 * query.  (If not, caller can fall back to the trigger method, which
  	 * works because it changes user IDs on the fly.)
  	 */
! 	ri_FetchConstraintInfo(&riinfo, trigger, fk_rel, false);
! 
! 	for (i = 0; i < riinfo.nkeys; i++)
! 	{
! 		fkColumns = bms_add_member(fkColumns, riinfo.fk_attnums[i]
! 								   - FirstLowInvalidHeapAttributeNumber);
! 		pkColumns = bms_add_member(pkColumns, riinfo.pk_attnums[i]
! 								   - FirstLowInvalidHeapAttributeNumber);
! 	}
! 
! 	memset(&rte, 0, sizeof(rte));
! 	rte.type = T_RangeTblEntry;
! 	rte.rtekind = RTE_RELATION;
! 	rte.requiredPerms = ACL_SELECT;
! 
! 	rte.relid = RelationGetRelid(fk_rel);
! 	rte.selectedCols = fkColumns;
! 	if (!ExecCheckRTEPerms(&rte, false))
  		return false;
  
! 	rte.relid = RelationGetRelid(pk_rel);
! 	rte.selectedCols = pkColumns;
! 	if (!ExecCheckRTEPerms(&rte, false))
! 		return false;
  
  	/*----------
  	 * The query string built is:
*** a/src/include/executor/executor.h
--- b/src/include/executor/executor.h
***************
*** 74,79 **** extern PGDLLIMPORT ExecutorRun_hook_type ExecutorRun_hook;
--- 74,82 ----
  typedef void (*ExecutorEnd_hook_type) (QueryDesc *queryDesc);
  extern PGDLLIMPORT ExecutorEnd_hook_type ExecutorEnd_hook;
  
+ /* Hook for plugins to get control in ExecCheckRTEPerms() */
+ typedef bool (*ExecutorCheckPerms_hook_type) (RangeTblEntry *, bool);
+ extern PGDLLIMPORT ExecutorCheckPerms_hook_type ExecutorCheckPerms_hook;
  
  /*
   * prototypes from functions in execAmi.c
***************
*** 157,162 **** extern void standard_ExecutorRun(QueryDesc *queryDesc,
--- 160,166 ----
  extern void ExecutorEnd(QueryDesc *queryDesc);
  extern void standard_ExecutorEnd(QueryDesc *queryDesc);
  extern void ExecutorRewind(QueryDesc *queryDesc);
+ extern bool ExecCheckRTEPerms(RangeTblEntry *rte, bool abort);
  extern void InitResultRelInfo(ResultRelInfo *resultRelInfo,
  				  Relation resultRelationDesc,
  				  Index resultRelationIndex,
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to