hi, > I wrote: >> y...@mwd.biglobe.ne.jp (YAMAMOTO Takashi) writes: >>> after systable_getnext_ordered returned NULL, is it ok to call it again? > >> I wouldn't rely on it working. > >>> i'm wondering because inv_truncate seems to do it and expecting NULL. > >> Hmm, that may well be a bug. Have you tested it? > > I looked at this a bit more closely, and basically the answer is that > inv_truncate is accidentally failing to fail. What will actually happen > if systable_getnext_ordered is called another time, at least when using > a btree index, is that it will start the same requested scan over again, > ie deliver all the tuples it did the first time (which is none, in this > case). That's an implementation artifact, but since the behavior is > undefined in the first place, it's not wrong. > > Now, if inv_truncate's initial call on systable_getnext_ordered returns > NULL (ie, the truncation point is past the current EOF page), it will > fall through to the "Write a brand new page" code, which will create and > insert a partial page at the truncation point. It then goes to the > delete-all-remaining-pages loop. Because that starts a fresh scan with > the very same scan key conditions, you might expect that it would find > and delete the page it just inserted --- causing the apparent EOF of the > blob to be wrong afterwards. It accidentally fails to do that because > the new tuple postdates the snapshot it's scanning with. So the loop > terminates having found no matching tuples, and all is well. > > So this code is confusing, inefficient (performing a useless search of > the index), only works because of an obscure consideration not explained > in the comments, and sets a bad precedent for people to follow. I'm > going to go change it to explicitly not do the final loop if the initial > search failed. It's not a bug, exactly, but it's sure lousy coding. > Thanks for pointing it out.
Thanks for the quick investigation and fix. The attached patch avoids unnecessary detoasting and the creation of unnecessary EOF marker pages where possible. Does it make sense? YAMAMOTO Takashi > > regards, tom lane > > -- > Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) > To make changes to your subscription: > http://www.postgresql.org/mailpref/pgsql-hackers
diff --git a/src/backend/storage/large_object/inv_api.c b/src/backend/storage/large_object/inv_api.c index 01e3492..60a9c69 100644 --- a/src/backend/storage/large_object/inv_api.c +++ b/src/backend/storage/large_object/inv_api.c @@ -178,10 +178,14 @@ myLargeObjectExists(Oid loid, Snapshot snapshot) static int32 getbytealen(bytea *data) { - Assert(!VARATT_IS_EXTENDED(data)); - if (VARSIZE(data) < VARHDRSZ) - elog(ERROR, "invalid VARSIZE(data)"); - return (VARSIZE(data) - VARHDRSZ); + Size size; + + size = toast_raw_datum_size(PointerGetDatum(data)); + if (size < VARHDRSZ) + elog(ERROR, "invalid size"); + if (size > VARHDRSZ + LOBLKSIZE) + elog(ERROR, "too large page"); + return (size - VARHDRSZ); } @@ -359,22 +363,12 @@ inv_getsize(LargeObjectDesc *obj_desc) { Form_pg_largeobject data; bytea *datafield; - bool pfreeit; if (HeapTupleHasNulls(tuple)) /* paranoia */ elog(ERROR, "null field found in pg_largeobject"); data = (Form_pg_largeobject) GETSTRUCT(tuple); datafield = &(data->data); /* see note at top of file */ - pfreeit = false; - if (VARATT_IS_EXTENDED(datafield)) - { - datafield = (bytea *) - heap_tuple_untoast_attr((struct varlena *) datafield); - pfreeit = true; - } lastbyte = data->pageno * LOBLKSIZE + getbytealen(datafield); - if (pfreeit) - pfree(datafield); } systable_endscan_ordered(sd); @@ -724,8 +718,9 @@ inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes) void inv_truncate(LargeObjectDesc *obj_desc, int len) { - int32 pageno = (int32) (len / LOBLKSIZE); - int off; + const int32 pageno = (int32) (len / LOBLKSIZE); + int32 reqpageno; + const int off = len % LOBLKSIZE; /* offset in the page */ ScanKeyData skey[2]; SysScanDesc sd; HeapTuple oldtuple; @@ -741,6 +736,7 @@ inv_truncate(LargeObjectDesc *obj_desc, int len) Datum values[Natts_pg_largeobject]; bool nulls[Natts_pg_largeobject]; bool replace[Natts_pg_largeobject]; + bool prevpagefull; CatalogIndexState indstate; Assert(PointerIsValid(obj_desc)); @@ -770,10 +766,20 @@ 
inv_truncate(LargeObjectDesc *obj_desc, int len) BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(obj_desc->id)); + /* + * Request the previous page as well, if we are truncating to the non-0 + * multiple of LOBLKSIZE, so that we can check if a 0-sized EOF marker page + * is necessary or not. + */ + if (off == 0 && pageno > 0) + reqpageno = pageno - 1; + else + reqpageno = pageno; + ScanKeyInit(&skey[1], Anum_pg_largeobject_pageno, BTGreaterEqualStrategyNumber, F_INT4GE, - Int32GetDatum(pageno)); + Int32GetDatum(reqpageno)); sd = systable_beginscan_ordered(lo_heap_r, lo_index_r, obj_desc->snapshot, 2, skey); @@ -781,6 +787,9 @@ inv_truncate(LargeObjectDesc *obj_desc, int len) /* * If possible, get the page the truncation point is in. The truncation * point may be beyond the end of the LO or in a hole. + * + * This can be the previous page in case we are truncating to the non-0 + * multiple of LOBLKSIZE. */ olddata = NULL; if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL) @@ -788,57 +797,97 @@ inv_truncate(LargeObjectDesc *obj_desc, int len) if (HeapTupleHasNulls(oldtuple)) /* paranoia */ elog(ERROR, "null field found in pg_largeobject"); olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple); - Assert(olddata->pageno >= pageno); + Assert(olddata->pageno >= reqpageno); } + /* + * Check if the previous page is full. + * In that case, we don't need to create the 0-sized EOF-marker page. + * + * It's safe to consider the page before pageno 0 full for our purpose. + */ + prevpagefull = pageno == 0; + if (olddata != NULL && olddata->pageno + 1 == pageno) + { + bytea *datafield = &(olddata->data); /* see note at top of + * file */ + Assert(pageno > 0); + Assert(off == 0); + if (getbytealen(datafield) == LOBLKSIZE) + { + prevpagefull = true; + } + /* + * Get the next page. 
+ */ + olddata = NULL; + if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL) + { + if (HeapTupleHasNulls(oldtuple)) /* paranoia */ + elog(ERROR, "null field found in pg_largeobject"); + olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple); + Assert(olddata->pageno >= pageno); + } + } + Assert(olddata == NULL || olddata->pageno >= pageno); /* - * If we found the page of the truncation point we need to truncate the - * data in it. Otherwise if we're in a hole, we need to create a page to - * mark the end of data. + * If we are truncating to a page boundary (off == 0) and the previous + * page is full, remove all tuples beyond the truncation point. + * Otherwise, if we found the page of the truncation point we need to + * truncate the data in it. Otherwise if we're in a hole, we need to + * create a page to mark the end of data. */ - if (olddata != NULL && olddata->pageno == pageno) + if (off == 0 && prevpagefull) + { + if (olddata != NULL) { + Assert(olddata->pageno >= pageno); + simple_heap_delete(lo_heap_r, &oldtuple->t_self); + } + } + else if (olddata != NULL && olddata->pageno == pageno) { - /* First, load old data into workbuf */ bytea *datafield = &(olddata->data); /* see note at top of * file */ bool pfreeit = false; int pagelen; - if (VARATT_IS_EXTENDED(datafield)) - { - datafield = (bytea *) - heap_tuple_untoast_attr((struct varlena *) datafield); - pfreeit = true; - } - pagelen = getbytealen(datafield); - Assert(pagelen <= LOBLKSIZE); - memcpy(workb, VARDATA(datafield), pagelen); - if (pfreeit) - pfree(datafield); + if (getbytealen(datafield) != off) { + /* First, load old data into workbuf */ + if (VARATT_IS_EXTENDED(datafield)) + { + datafield = (bytea *) + heap_tuple_untoast_attr((struct varlena *) datafield); + pfreeit = true; + } + pagelen = getbytealen(datafield); + Assert(pagelen <= LOBLKSIZE); + memcpy(workb, VARDATA(datafield), pagelen); + if (pfreeit) + pfree(datafield); - /* - * Fill any hole - */ - off = len % LOBLKSIZE; 
- if (off > pagelen) - MemSet(workb + pagelen, 0, off - pagelen); + /* + * Fill any hole + */ + if (off > pagelen) + MemSet(workb + pagelen, 0, off - pagelen); - /* compute length of new page */ - SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ); + /* compute length of new page */ + SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ); - /* - * Form and insert updated tuple - */ - memset(values, 0, sizeof(values)); - memset(nulls, false, sizeof(nulls)); - memset(replace, false, sizeof(replace)); - values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf); - replace[Anum_pg_largeobject_data - 1] = true; - newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r), - values, nulls, replace); - simple_heap_update(lo_heap_r, &newtup->t_self, newtup); - CatalogIndexInsert(indstate, newtup); - heap_freetuple(newtup); + /* + * Form and insert updated tuple + */ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replace, false, sizeof(replace)); + values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf); + replace[Anum_pg_largeobject_data - 1] = true; + newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r), + values, nulls, replace); + simple_heap_update(lo_heap_r, &newtup->t_self, newtup); + CatalogIndexInsert(indstate, newtup); + heap_freetuple(newtup); + } } else { @@ -858,7 +907,6 @@ inv_truncate(LargeObjectDesc *obj_desc, int len) * * Fill the hole up to the truncation point */ - off = len % LOBLKSIZE; if (off > 0) MemSet(workb, 0, off);
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers