PG Gurus,

I have a table like this:

CREATE TABLE filemods (
  guid                  BIGINT NOT NULL UNIQUE,
  filepath_guid         BIGINT NOT NULL,
  createtime            TIMESTAMP WITH TIME ZONE DEFAULT NULL,
  writetime             TIMESTAMP WITH TIME ZONE DEFAULT NULL,
  deletetime            TIMESTAMP WITH TIME ZONE DEFAULT NULL,
);

One "event" might have (1, '2012-01-25 11:00:00', NULL, NULL) and
another event might have (1, NULL, '2012-01-25 11:05:00', NULL) and the
end result should be:
(1, '2012-01-25 11:00:00', '2012-01-25 11:05:00', NULL).


I'm trying to modify pg_bulkload to "merge" two rows together.  The
changes I have made seem to be working, although I would like input on
what I am doing that is unsafe or terribly wrong.  You can be brutal.

I've seen incredible write speed with using pg_bulkload.  If I can just
get it to "consolidate" our rows based on the unique key it will remove
a lot of complexity in our software.

Also, I'm not entirely sure this mailing list is the correct one, but
with all the internals you all know, I'm hoping you can help point out
nasty flaws in my algorithm.  This is the first legitimate attempt I
have made at modifying PG source, so I'm not real familiar with the
proper way of loading pages and tuples and updating heaps and all that
pg core stuff.

Here's the modifications to pg_btree.c (from pg_bulkload HEAD):

http://pastebin.com/U23CapvR

I also attached the patch.

Thank you!!

Ben


-- 
Benjamin Johnson
http://getcarbonblack.com/ | @getcarbonblack
cell: 312.933.3612
diff -r 3f065ec72ab8 pgbulkload/lib/pg_btree.c
--- a/pgbulkload/lib/pg_btree.c Fri Jan 20 09:26:20 2012 -0600
+++ b/pgbulkload/lib/pg_btree.c Wed Jan 25 13:37:43 2012 -0600
@@ -398,6 +398,8 @@
        BTReaderTerm(&reader);
 }
 
+void merge_tuples(Relation heap, IndexTuple itup_dst, IndexTuple itup_src);
+
 /*
  * _bt_mergeload - Merge two streams of index tuples into new index files.
  */
@@ -462,7 +464,6 @@
                                }
                                else
                                {
-                                       // TODO -- BSJ
                                        if (on_duplicate == 
ON_DUPLICATE_KEEP_NEW)
                                        {
                                                self->dup_old++;
@@ -470,7 +471,21 @@
                                                        
RelationGetRelationName(wstate->index));
                                                itup2 = 
BTReaderGetNextItem(btspool2);
                                        }
-                                       else 
+                                       else if (on_duplicate == 
ON_DUPLICATE_MERGE)
+                                       {
+                                               self->dup_old++;
+
+                                               // merge from itup into itup2 
where itup's col[i] is not null
+                                               // but itup2's col[i] IS null
+                                               merge_tuples(heapRel, itup2, 
itup);                             
+
+                                               ItemPointerCopy(&t_tid2, 
&itup2->t_tid);
+                                               self->dup_new++;
+                                               remove_duplicate(self, heapRel, 
itup,
+                                                       
RelationGetRelationName(wstate->index));
+                                               itup = 
BTSpoolGetNextItem(btspool, itup, &should_free);
+                                       }
+                                       else
                                        {
                                                ItemPointerCopy(&t_tid2, 
&itup2->t_tid);
                                                self->dup_new++;
@@ -950,6 +965,113 @@
                self->dup_old + self->dup_new, relname);
 }
 
+// returns Buffer after locking it (BUFFER_LOCK_SHARE then BUFFER_LOCK_UNLOCK)
+Buffer load_buffer(Relation heap, IndexTuple itup, HeapTupleData *tuple /*OUT 
*/, ItemId *itemid /*OUT */ )
+{
+       BlockNumber             blknum;
+       BlockNumber             offnum;
+       Buffer                  buffer;
+       Page                    page;
+
+       blknum = ItemPointerGetBlockNumber(&itup->t_tid);
+       offnum = ItemPointerGetOffsetNumber(&itup->t_tid);
+       buffer = ReadBuffer(heap, blknum);
+
+       LockBuffer(buffer, BUFFER_LOCK_SHARE);
+       page = BufferGetPage(buffer);
+       *itemid = PageGetItemId(page, offnum);
+       tuple->t_data = ItemIdIsNormal(*itemid)
+               ? (HeapTupleHeader) PageGetItem(page, *itemid)
+               : NULL;
+       LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+       return buffer;
+}
+
+void load_tuple(Relation               heap, 
+                               HeapTuple               tuple, 
+                               IndexTuple              itup, 
+                               ItemId                  itemid,
+                               TupleDesc *             tupdesc         /* OUT 
*/,                              
+                               int *                   ncolumns        /* OUT 
*/, 
+                               Datum **                values          /* OUT 
*/, 
+                               bool **                 nulls           /* OUT 
*/)
+{
+       *tupdesc = RelationGetDescr(heap);
+
+       tuple->t_len = ItemIdGetLength(itemid);
+       tuple->t_self = itup->t_tid;
+
+       *ncolumns = (*tupdesc)->natts;
+       *values = (Datum *) palloc(*ncolumns * sizeof(Datum));
+       *nulls = (bool *) palloc(*ncolumns * sizeof(bool));
+
+       /* Break down the tuple into fields */
+       heap_deform_tuple(tuple, *tupdesc, *values, *nulls);
+}
+
+void merge_tuples(Relation heap, IndexTuple itup_dst, IndexTuple itup_src)
+{
+       HeapTupleData   tuple_src;
+       HeapTupleData   tuple_dst;
+       Buffer                  buffer_src;
+       Buffer                  buffer_dst;
+       ItemId                  itemid_src;
+       ItemId                  itemid_dst;
+
+       // load buffers
+       buffer_src = load_buffer(heap, itup_src, &tuple_src, &itemid_src);
+       buffer_dst = load_buffer(heap, itup_dst, &tuple_dst, &itemid_dst);
+
+       if (tuple_src.t_data != NULL)
+       {
+               int                     ncolumns_src, ncolumns_dst;
+               int                     i;
+               Datum           *values_src = NULL, *values_dst = NULL;
+               TupleDesc       tupdesc_dst, tupdesc_src;
+               bool            *nulls_src = NULL, *nulls_dst = NULL;           
                
+               bool            * do_replace = NULL;
+               bool            tuple_updated = false;
+               
+               // load source
+               load_tuple(heap, &tuple_src, itup_src, itemid_src, 
&tupdesc_src, &ncolumns_src, &values_src, &nulls_src);
+               
+               // load destination
+               load_tuple(heap, &tuple_dst, itup_dst, itemid_dst, 
&tupdesc_dst, &ncolumns_dst, &values_dst, &nulls_dst);
+
+               do_replace = (bool *) palloc(ncolumns_dst * sizeof(bool));
+
+               for (i = 0; i < ncolumns_dst && i < ncolumns_src; ++i)
+               {               
+                       do_replace[i] = false;
+
+                       if (nulls_dst[i] && !nulls_src[i])
+                       {
+                               values_dst[i] = values_src[i];
+                               nulls_dst[i] = nulls_src[i];
+                               do_replace[i] = true;
+
+                               // update new row
+                               tuple_updated = true;
+                       }
+               }
+               
+               if (tuple_updated)
+               {
+                       HeapTuple new_tuple = heap_modify_tuple(&tuple_dst, 
tupdesc_dst, values_dst, nulls_dst, do_replace);                    
+                       simple_heap_update(heap, &(tuple_dst.t_self), 
new_tuple);
+               }
+
+               pfree(do_replace);
+               pfree(values_src);
+               pfree(nulls_src);
+               pfree(values_dst);
+               pfree(nulls_dst);
+       }
+
+       ReleaseBuffer(buffer_src);
+       ReleaseBuffer(buffer_dst);
+}
+
 char *
 tuple_to_cstring(TupleDesc tupdesc, HeapTuple tuple)
 {
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to