PG Gurus, I have a table like this:
CREATE TABLE filemods ( guid BIGINT NOT NULL UNIQUE, filepath_guid BIGINT NOT NULL, createtime TIMESTAMP WITH TIME ZONE DEFAULT NULL, writetime TIMESTAMP WITH TIME ZONE DEFAULT NULL, deletetime TIMESTAMP WITH TIME ZONE DEFAULT NULL, ); One "event" might have (1, '2012-01-25 11:00:00', NULL, NULL) and another event might have (1, NULL, '2012-01-25 11:05:00', NULL) and the end result should be: (1, '2012-01-25 11:00:00', '2012-01-25 11:05:00', NULL). I'm trying to modify pg_bulkload to "merge" two rows together. The changes I have made seem to be working, although I would like input on what I am doing that is unsafe or terribly wrong. You can be brutal. I've seen incredible write speed with using pg_bulkload. If I can just get it to "consolidate" our rows based on the unique key it will remove a lot of complexity in our software. Also, I'm not entirely sure this mailing list is the correct one, but with all the internals you all know, I'm hoping you can help point out nasty flaws in my algorithm. This is the first legitimate attempt I have made at modifying PG source, so I'm not real familiar with the proper way of loading pages and tuples and updating heaps and all that pg core stuff. Here's the modifications to pg_btree.c (from pg_bulkload HEAD): http://pastebin.com/U23CapvR I also attached the patch. Thank you!! Ben -- Benjamin Johnson http://getcarbonblack.com/ | @getcarbonblack cell: 312.933.3612
diff -r 3f065ec72ab8 pgbulkload/lib/pg_btree.c --- a/pgbulkload/lib/pg_btree.c Fri Jan 20 09:26:20 2012 -0600 +++ b/pgbulkload/lib/pg_btree.c Wed Jan 25 13:37:43 2012 -0600 @@ -398,6 +398,8 @@ BTReaderTerm(&reader); } +void merge_tuples(Relation heap, IndexTuple itup_dst, IndexTuple itup_src); + /* * _bt_mergeload - Merge two streams of index tuples into new index files. */ @@ -462,7 +464,6 @@ } else { - // TODO -- BSJ if (on_duplicate == ON_DUPLICATE_KEEP_NEW) { self->dup_old++; @@ -470,7 +471,21 @@ RelationGetRelationName(wstate->index)); itup2 = BTReaderGetNextItem(btspool2); } - else + else if (on_duplicate == ON_DUPLICATE_MERGE) + { + self->dup_old++; + + // merge from itup into itup2 where itup's col[i] is not null + // but itup2's col[i] IS null + merge_tuples(heapRel, itup2, itup); + + ItemPointerCopy(&t_tid2, &itup2->t_tid); + self->dup_new++; + remove_duplicate(self, heapRel, itup, + RelationGetRelationName(wstate->index)); + itup = BTSpoolGetNextItem(btspool, itup, &should_free); + } + else { ItemPointerCopy(&t_tid2, &itup2->t_tid); self->dup_new++; @@ -950,6 +965,113 @@ self->dup_old + self->dup_new, relname); } +// returns Buffer after locking it (BUFFER_LOCK_SHARE then BUFFER_LOCK_UNLOCK) +Buffer load_buffer(Relation heap, IndexTuple itup, HeapTupleData *tuple /*OUT */, ItemId *itemid /*OUT */ ) +{ + BlockNumber blknum; + BlockNumber offnum; + Buffer buffer; + Page page; + + blknum = ItemPointerGetBlockNumber(&itup->t_tid); + offnum = ItemPointerGetOffsetNumber(&itup->t_tid); + buffer = ReadBuffer(heap, blknum); + + LockBuffer(buffer, BUFFER_LOCK_SHARE); + page = BufferGetPage(buffer); + *itemid = PageGetItemId(page, offnum); + tuple->t_data = ItemIdIsNormal(*itemid) + ? (HeapTupleHeader) PageGetItem(page, *itemid) + : NULL; + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + return buffer; +} + +void load_tuple(Relation heap, + HeapTuple tuple, + IndexTuple itup, + ItemId itemid, + TupleDesc * tupdesc /* OUT */, + int * ncolumns /* OUT */, + Datum ** values /* OUT */, + bool ** nulls /* OUT */) +{ + *tupdesc = RelationGetDescr(heap); + + tuple->t_len = ItemIdGetLength(itemid); + tuple->t_self = itup->t_tid; + + *ncolumns = (*tupdesc)->natts; + *values = (Datum *) palloc(*ncolumns * sizeof(Datum)); + *nulls = (bool *) palloc(*ncolumns * sizeof(bool)); + + /* Break down the tuple into fields */ + heap_deform_tuple(tuple, *tupdesc, *values, *nulls); +} + +void merge_tuples(Relation heap, IndexTuple itup_dst, IndexTuple itup_src) +{ + HeapTupleData tuple_src; + HeapTupleData tuple_dst; + Buffer buffer_src; + Buffer buffer_dst; + ItemId itemid_src; + ItemId itemid_dst; + + // load buffers + buffer_src = load_buffer(heap, itup_src, &tuple_src, &itemid_src); + buffer_dst = load_buffer(heap, itup_dst, &tuple_dst, &itemid_dst); + + if (tuple_src.t_data != NULL) + { + int ncolumns_src, ncolumns_dst; + int i; + Datum *values_src = NULL, *values_dst = NULL; + TupleDesc tupdesc_dst, tupdesc_src; + bool *nulls_src = NULL, *nulls_dst = NULL; + bool * do_replace = NULL; + bool tuple_updated = false; + + // load source + load_tuple(heap, &tuple_src, itup_src, itemid_src, &tupdesc_src, &ncolumns_src, &values_src, &nulls_src); + + // load destination + load_tuple(heap, &tuple_dst, itup_dst, itemid_dst, &tupdesc_dst, &ncolumns_dst, &values_dst, &nulls_dst); + + do_replace = (bool *) palloc(ncolumns_dst * sizeof(bool)); + + for (i = 0; i < ncolumns_dst && i < ncolumns_src; ++i) + { + do_replace[i] = false; + + if (nulls_dst[i] && !nulls_src[i]) + { + values_dst[i] = values_src[i]; + nulls_dst[i] = nulls_src[i]; + do_replace[i] = true; + + // update new row + tuple_updated = true; + } + } + + if (tuple_updated) + { + HeapTuple new_tuple = heap_modify_tuple(&tuple_dst, tupdesc_dst, values_dst, nulls_dst, do_replace); + simple_heap_update(heap, &(tuple_dst.t_self), new_tuple); + } + + pfree(do_replace); + pfree(values_src); + pfree(nulls_src); + pfree(values_dst); + pfree(nulls_dst); + } + + ReleaseBuffer(buffer_src); + ReleaseBuffer(buffer_dst); +} + char * tuple_to_cstring(TupleDesc tupdesc, HeapTuple tuple) {
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers