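Instead of relying on the git object_id hash to pick a
partition, round-robin across the partitions based on the NNTP
article number.  This reduces contention on the partition
pipes, which arose when two (or more) sequential messages
ended up going to the same partition.

While at it, default the number of partitions to the CPU count
(NPROC from the environment, else GNU nproc(1), else 2) instead
of a hard-coded 4.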
---
 lib/PublicInbox/V2Writable.pm | 34 +++++++++++++++++++++-------------
 1 file changed, 21 insertions(+), 13 deletions(-)

diff --git a/lib/PublicInbox/V2Writable.pm b/lib/PublicInbox/V2Writable.pm
index cb74ab1..cf19c76 100644
--- a/lib/PublicInbox/V2Writable.pm
+++ b/lib/PublicInbox/V2Writable.pm
@@ -17,6 +17,9 @@ $Email::MIME::ContentType::STRICT_PARAMS = 0;
 # an estimate of the post-packed size to the raw uncompressed size
 my $PACKING_FACTOR = 0.4;
 
+# assume 2 cores if GNU nproc(1) is not available
+my $NPROC = int($ENV{NPROC} || `nproc 2>/dev/null` || 2);
+
 sub new {
        my ($class, $v2ibx, $creat) = @_;
        my $dir = $v2ibx->{mainrepo} or die "no mainrepo in inbox\n";
@@ -33,7 +36,7 @@ sub new {
                im => undef, #  PublicInbox::Import
                xap_rw => undef, # PublicInbox::V2SearchIdx
                xap_ro => undef,
-               partitions => 4,
+               partitions => $NPROC,
                transact_bytes => 0,
                # limit each repo to 1GB or so
                rotate_bytes => int((1024 * 1024 * 1024) / $PACKING_FACTOR),
@@ -59,11 +62,11 @@ sub add {
        my $oid = $im->{last_object_id};
        my ($len, $msgref) = @{$im->{last_object}};
 
+       $self->idx_init;
+       my $num = $self->{all}->index_mm($mime);
        my $nparts = $self->{partitions};
-       my $part = hex(substr($oid, 0, 8)) % $nparts;
+       my $part = $num % $nparts;
        my $idx = $self->idx_part($part);
-       my $all = $self->{all};
-       my $num = $all->index_mm($mime);
        $idx->index_raw($len, $msgref, $num, $oid);
        my $n = $self->{transact_bytes} += $len;
        if ($n > (PublicInbox::SearchIdx::BATCH_BYTES * $nparts)) {
@@ -75,21 +78,23 @@ sub add {
 
 sub idx_part {
        my ($self, $part) = @_;
-       my $idx = $self->{idx_parts};
-       return $idx->[$part] if $idx; # fast path
+       $self->{idx_parts}->[$part];
+}
 
+sub idx_init {
+       my ($self) = @_;
+       return if $self->{idx_parts};
        # first time initialization:
-       my $all = $self->{all} = 
+       my $all = $self->{all} =
                PublicInbox::SearchIdxThread->new($self->{-inbox});
 
        # need to create all parts before initializing msgmap FD
        my $max = $self->{partitions} - 1;
-       $idx = $self->{idx_parts} = [];
+       my $idx = $self->{idx_parts} = [];
        for my $i (0..$max) {
                push @$idx, PublicInbox::SearchIdxPart->new($self, $i, $all);
        }
        $all->_msgmap_init->{dbh}->begin_work;
-       $idx->[$part];
 }
 
 sub remove {
@@ -127,10 +132,12 @@ sub searchidx_checkpoint {
        # order matters, we can only close {all} after all partitions
        # are done because the partitions also write to {all}
 
-       my $parts = $self->{idx_parts};
-       foreach my $idx (@$parts) {
-               $idx->remote_commit;
-               $idx->remote_close unless $more;
+       if (my $parts = $self->{idx_parts}) {
+               foreach my $idx (@$parts) {
+                       $idx->remote_commit;
+                       $idx->remote_close unless $more;
+               }
+               delete $self->{idx_parts} unless $more;
        }
 
        if (my $all = $self->{all}) {
@@ -140,6 +147,7 @@ sub searchidx_checkpoint {
                }
                $all->remote_commit;
                $all->remote_close unless $more;
+               delete $self->{all} unless $more;
        }
        $self->{transact_bytes} = 0;
 }
-- 
EW

--
unsubscribe: meta+unsubscr...@public-inbox.org
archive: https://public-inbox.org/meta/

Reply via email to