hi,

> I wrote:
>> y...@mwd.biglobe.ne.jp (YAMAMOTO Takashi) writes:
>>> after systable_getnext_ordered returned NULL, is it ok to call it again?
> 
>> I wouldn't rely on it working.
> 
>>> i'm wondering because inv_truncate seems to do it and expecting NULL.
> 
>> Hmm, that may well be a bug.  Have you tested it?
> 
> I looked at this a bit more closely, and basically the answer is that
> inv_truncate is accidentally failing to fail.  What will actually happen
> if systable_getnext_ordered is called another time, at least when using
> a btree index, is that it will start the same requested scan over again,
> ie deliver all the tuples it did the first time (which is none, in this
> case).  That's an implementation artifact, but since the behavior is
> undefined in the first place, it's not wrong.
> 
> Now, if inv_truncate's initial call on systable_getnext_ordered returns
> NULL (ie, the truncation point is past the current EOF page), it will
> fall through to the "Write a brand new page" code, which will create and
> insert a partial page at the truncation point.  It then goes to the
> delete-all-remaining-pages loop.  Because that starts a fresh scan with
> the very same scan key conditions, you might expect that it would find
> and delete the page it just inserted --- causing the apparent EOF of the
> blob to be wrong afterwards.  It accidentally fails to do that because
> the new tuple postdates the snapshot it's scanning with.  So the loop
> terminates having found no matching tuples, and all is well.
> 
> So this code is confusing, inefficient (performing a useless search of
> the index), only works because of an obscure consideration not explained
> in the comments, and sets a bad precedent for people to follow.  I'm
> going to go change it to explicitly not do the final loop if the initial
> search failed.  It's not a bug, exactly, but it's sure lousy coding.
> Thanks for pointing it out.

thanks for the quick investigation and fix.

The attached patch avoids unnecessary detoasting, and avoids creating
zero-length EOF marker pages when they are not needed.  Does it make sense?

YAMAMOTO Takashi

> 
>                       regards, tom lane
> 
> -- 
> Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
> To make changes to your subscription:
> http://www.postgresql.org/mailpref/pgsql-hackers
diff --git a/src/backend/storage/large_object/inv_api.c b/src/backend/storage/large_object/inv_api.c
index 01e3492..60a9c69 100644
--- a/src/backend/storage/large_object/inv_api.c
+++ b/src/backend/storage/large_object/inv_api.c
@@ -178,10 +178,14 @@ myLargeObjectExists(Oid loid, Snapshot snapshot)
 static int32
 getbytealen(bytea *data)
 {
-       Assert(!VARATT_IS_EXTENDED(data));
-       if (VARSIZE(data) < VARHDRSZ)
-               elog(ERROR, "invalid VARSIZE(data)");
-       return (VARSIZE(data) - VARHDRSZ);
+       Size size;
+
+       size = toast_raw_datum_size(PointerGetDatum(data));
+       if (size < VARHDRSZ)
+               elog(ERROR, "invalid size");
+       if (size > VARHDRSZ + LOBLKSIZE)
+               elog(ERROR, "too large page");
+       return (size - VARHDRSZ);
 }
 
 
@@ -359,22 +363,12 @@ inv_getsize(LargeObjectDesc *obj_desc)
        {
                Form_pg_largeobject data;
                bytea      *datafield;
-               bool            pfreeit;
 
                if (HeapTupleHasNulls(tuple))   /* paranoia */
                        elog(ERROR, "null field found in pg_largeobject");
                data = (Form_pg_largeobject) GETSTRUCT(tuple);
                datafield = &(data->data);              /* see note at top of 
file */
-               pfreeit = false;
-               if (VARATT_IS_EXTENDED(datafield))
-               {
-                       datafield = (bytea *)
-                               heap_tuple_untoast_attr((struct varlena *) 
datafield);
-                       pfreeit = true;
-               }
                lastbyte = data->pageno * LOBLKSIZE + getbytealen(datafield);
-               if (pfreeit)
-                       pfree(datafield);
        }
 
        systable_endscan_ordered(sd);
@@ -724,8 +718,9 @@ inv_write(LargeObjectDesc *obj_desc, const char *buf, int 
nbytes)
 void
 inv_truncate(LargeObjectDesc *obj_desc, int len)
 {
-       int32           pageno = (int32) (len / LOBLKSIZE);
-       int                     off;
+       const int32     pageno = (int32) (len / LOBLKSIZE);
+       int32           reqpageno;
+       const int       off = len % LOBLKSIZE;  /* offset in the page */
        ScanKeyData skey[2];
        SysScanDesc sd;
        HeapTuple       oldtuple;
@@ -741,6 +736,7 @@ inv_truncate(LargeObjectDesc *obj_desc, int len)
        Datum           values[Natts_pg_largeobject];
        bool            nulls[Natts_pg_largeobject];
        bool            replace[Natts_pg_largeobject];
+       bool            prevpagefull;
        CatalogIndexState indstate;
 
        Assert(PointerIsValid(obj_desc));
@@ -770,10 +766,20 @@ inv_truncate(LargeObjectDesc *obj_desc, int len)
                                BTEqualStrategyNumber, F_OIDEQ,
                                ObjectIdGetDatum(obj_desc->id));
 
+       /*
+        * Request the previous page as well if we are truncating to a non-zero
+        * multiple of LOBLKSIZE, so that we can check whether a 0-sized EOF
+        * marker page is necessary.
+        */
+       if (off == 0 && pageno > 0)
+               reqpageno = pageno - 1;
+       else
+               reqpageno = pageno;
+
        ScanKeyInit(&skey[1],
                                Anum_pg_largeobject_pageno,
                                BTGreaterEqualStrategyNumber, F_INT4GE,
-                               Int32GetDatum(pageno));
+                               Int32GetDatum(reqpageno));
 
        sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
                                                                        
obj_desc->snapshot, 2, skey);
@@ -781,6 +787,9 @@ inv_truncate(LargeObjectDesc *obj_desc, int len)
        /*
         * If possible, get the page the truncation point is in. The truncation
         * point may be beyond the end of the LO or in a hole.
+        *
+        * This can be the previous page in case we are truncating to the non-0
+        * multiple of LOBLKSIZE.
         */
        olddata = NULL;
        if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != 
NULL)
@@ -788,57 +797,97 @@ inv_truncate(LargeObjectDesc *obj_desc, int len)
                if (HeapTupleHasNulls(oldtuple))                /* paranoia */
                        elog(ERROR, "null field found in pg_largeobject");
                olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
-               Assert(olddata->pageno >= pageno);
+               Assert(olddata->pageno >= reqpageno);
        }
+       /*
+        * Check if the previous page is full.
+        * In that case, we don't need to create the 0-sized EOF-marker page.
+        *
+        * It's safe to consider the page before pageno 0 full for our purpose.
+        */
+       prevpagefull = pageno == 0;
+       if (olddata != NULL && olddata->pageno + 1 == pageno)
+       {
+               bytea      *datafield = &(olddata->data);               /* see 
note at top of
+                                                                               
                                 * file */
+               Assert(pageno > 0);
+               Assert(off == 0);
+               if (getbytealen(datafield) == LOBLKSIZE)
+               {
+                       prevpagefull = true;
+               }
+               /*
+                * Get the next page.
+                */
+               olddata = NULL;
+               if ((oldtuple = systable_getnext_ordered(sd, 
ForwardScanDirection)) != NULL)
+               {
+                       if (HeapTupleHasNulls(oldtuple))                /* 
paranoia */
+                               elog(ERROR, "null field found in 
pg_largeobject");
+                       olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
+                       Assert(olddata->pageno >= pageno);
+               }
+       }
+       Assert(olddata == NULL || olddata->pageno >= pageno);
 
        /*
-        * If we found the page of the truncation point we need to truncate the
-        * data in it.  Otherwise if we're in a hole, we need to create a page 
to
-        * mark the end of data.
+        * If we are truncating to a page boundary (off == 0) and the previous
+        * page is full, remove all tuples beyond the truncation point.
+        * Otherwise, if we found the page of the truncation point we need to
+        * truncate the data in it.      Otherwise if we're in a hole, we need 
to
+        * create a page to mark the end of data.
         */
-       if (olddata != NULL && olddata->pageno == pageno)
+       if (off == 0 && prevpagefull)
+       {
+               if (olddata != NULL) {
+                       Assert(olddata->pageno >= pageno);
+                       simple_heap_delete(lo_heap_r, &oldtuple->t_self);
+               }
+       }
+       else if (olddata != NULL && olddata->pageno == pageno)
        {
-               /* First, load old data into workbuf */
                bytea      *datafield = &(olddata->data);               /* see 
note at top of
                                                                                
                                 * file */
                bool            pfreeit = false;
                int                     pagelen;
 
-               if (VARATT_IS_EXTENDED(datafield))
-               {
-                       datafield = (bytea *)
-                               heap_tuple_untoast_attr((struct varlena *) 
datafield);
-                       pfreeit = true;
-               }
-               pagelen = getbytealen(datafield);
-               Assert(pagelen <= LOBLKSIZE);
-               memcpy(workb, VARDATA(datafield), pagelen);
-               if (pfreeit)
-                       pfree(datafield);
+               if (getbytealen(datafield) != off) {
+                       /* First, load old data into workbuf */
+                       if (VARATT_IS_EXTENDED(datafield))
+                       {
+                               datafield = (bytea *)
+                                       heap_tuple_untoast_attr((struct varlena 
*) datafield);
+                               pfreeit = true;
+                       }
+                       pagelen = getbytealen(datafield);
+                       Assert(pagelen <= LOBLKSIZE);
+                       memcpy(workb, VARDATA(datafield), pagelen);
+                       if (pfreeit)
+                               pfree(datafield);
 
-               /*
-                * Fill any hole
-                */
-               off = len % LOBLKSIZE;
-               if (off > pagelen)
-                       MemSet(workb + pagelen, 0, off - pagelen);
+                       /*
+                        * Fill any hole
+                        */
+                       if (off > pagelen)
+                               MemSet(workb + pagelen, 0, off - pagelen);
 
-               /* compute length of new page */
-               SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
+                       /* compute length of new page */
+                       SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
 
-               /*
-                * Form and insert updated tuple
-                */
-               memset(values, 0, sizeof(values));
-               memset(nulls, false, sizeof(nulls));
-               memset(replace, false, sizeof(replace));
-               values[Anum_pg_largeobject_data - 1] = 
PointerGetDatum(&workbuf);
-               replace[Anum_pg_largeobject_data - 1] = true;
-               newtup = heap_modify_tuple(oldtuple, 
RelationGetDescr(lo_heap_r),
-                                                                  values, 
nulls, replace);
-               simple_heap_update(lo_heap_r, &newtup->t_self, newtup);
-               CatalogIndexInsert(indstate, newtup);
-               heap_freetuple(newtup);
+                       /*
+                        * Form and insert updated tuple
+                        */
+                       memset(values, 0, sizeof(values));
+                       memset(nulls, false, sizeof(nulls));
+                       memset(replace, false, sizeof(replace));
+                       values[Anum_pg_largeobject_data - 1] = 
PointerGetDatum(&workbuf);
+                       replace[Anum_pg_largeobject_data - 1] = true;
+                       newtup = heap_modify_tuple(oldtuple, 
RelationGetDescr(lo_heap_r),
+                                                                          
values, nulls, replace);
+                       simple_heap_update(lo_heap_r, &newtup->t_self, newtup);
+                       CatalogIndexInsert(indstate, newtup);
+                       heap_freetuple(newtup);
+               }
        }
        else
        {
@@ -858,7 +907,6 @@ inv_truncate(LargeObjectDesc *obj_desc, int len)
                 *
                 * Fill the hole up to the truncation point
                 */
-               off = len % LOBLKSIZE;
                if (off > 0)
                        MemSet(workb, 0, off);
 
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to