We only need the combined mset query when we care about sort
order.  When writing to --output destinations intended for MUA
consumption, sort order is irrelevant since MUAs are expected to
offer their own sorting, so we can run the query against each
external in parallel.
This prepares us for docid-sort-based saved search support.
It will also be faster than the combined mset query for users
with many externals, since current Xapian performs poorly with
many shards (the same reason -extindex exists).
---
lib/PublicInbox/LeiXSearch.pm | 63 ++++++++++++++++++++---------------
1 file changed, 37 insertions(+), 26 deletions(-)
diff --git a/lib/PublicInbox/LeiXSearch.pm b/lib/PublicInbox/LeiXSearch.pm
index 692d5e54..9d367977 100644
--- a/lib/PublicInbox/LeiXSearch.pm
+++ b/lib/PublicInbox/LeiXSearch.pm
@@ -111,7 +111,7 @@ sub _mset_more ($$) {
}
# $startq will EOF when do_augment is done augmenting and allow
-# query_mset and query_thread_mset to proceed.
+# query_combined_mset and query_thread_mset to proceed.
sub wait_startq ($) {
my ($lei) = @_;
my $startq = delete $lei->{startq} or return;
@@ -144,9 +144,9 @@ sub mset_progress {
}
}
-sub query_thread_mset { # for --threads
+sub query_one_mset { # for --threads and l2m w/o sort
my ($self, $ibxish) = @_;
- local $0 = "$0 query_thread_mset";
+ local $0 = "$0 query_one_mset";
my $lei = $self->{lei};
my ($srch, $over) = ($ibxish->search, $ibxish->over);
my $desc = $ibxish->{inboxdir} // $ibxish->{topdir};
@@ -155,41 +155,51 @@ sub query_thread_mset { # for --threads
my $mset;
my $each_smsg = $lei->{ovv}->ovv_each_smsg_cb($lei);
my $can_kw = !!$ibxish->can('msg_keywords');
- my $fl = $lei->{opt}->{threads} > 1 ? 1 : undef;
+ my $threads = $lei->{opt}->{threads} // 0;
+ my $fl = $threads > 1 ? 1 : undef;
do {
$mset = $srch->mset($mo->{qstr}, $mo);
mset_progress($lei, $desc, $mset->size,
$mset->get_matches_estimated);
wait_startq($lei); # wait for keyword updates
my $ids = $srch->mset_to_artnums($mset, $mo);
- my $ctx = { ids => $ids };
my $i = 0;
- my %n2item = map { ($ids->[$i++], $_) } $mset->items;
- while ($over->expand_thread($ctx)) {
- for my $n (@{$ctx->{xids}}) {
- my $smsg = $over->get_art($n) or next;
- my $mitem = delete $n2item{$smsg->{num}};
- next if $smsg->{bytes} == 0;
- if ($mitem) {
- if ($can_kw) {
+ if ($threads) {
+ my $ctx = { ids => $ids };
+ my %n2item = map { ($ids->[$i++], $_) } $mset->items;
+ while ($over->expand_thread($ctx)) {
+ for my $n (@{$ctx->{xids}}) {
+ my $smsg = $over->get_art($n) or next;
+ my $mitem = delete $n2item{$n};
+ next if $smsg->{bytes} == 0;
+ if ($mitem && $can_kw) {
mitem_kw($smsg, $mitem, $fl);
- } elsif ($fl) {
+ } elsif ($mitem && $fl) {
# call ->xsmsg_vmd, later
$smsg->{lei_q_tt_flagged} = 1;
}
+ $each_smsg->($smsg, $mitem);
}
+ @{$ctx->{xids}} = ();
+ }
+ } else {
+ my @items = $mset->items;
+ for my $n (@$ids) {
+ my $mitem = $items[$i++];
+ my $smsg = $over->get_art($n) or next;
+ next if $smsg->{bytes} == 0;
+ mitem_kw($smsg, $mitem, $fl) if $can_kw;
$each_smsg->($smsg, $mitem);
}
- @{$ctx->{xids}} = ();
}
} while (_mset_more($mset, $mo));
- undef $each_smsg; # drops @io for l2m->{each_smsg_done}
+ undef $each_smsg; # may commit
$lei->{ovv}->ovv_atexit_child($lei);
}
-sub query_mset { # non-parallel for non-"--threads" users
+sub query_combined_mset { # non-parallel for non-"--threads" users
my ($self) = @_;
- local $0 = "$0 query_mset";
+ local $0 = "$0 query_combined_mset";
my $lei = $self->{lei};
my $mo = { %{$lei->{mset_opt}} };
my $mset;
@@ -207,7 +217,7 @@ sub query_mset { # non-parallel for non-"--threads" users
$each_smsg->($smsg, $mitem);
}
} while (_mset_more($mset, $mo));
- undef $each_smsg; # drops @io for l2m->{each_smsg_done}
+ undef $each_smsg; # may commit
$lei->{ovv}->ovv_atexit_child($lei);
}
@@ -379,14 +389,14 @@ sub concurrency {
$nl + $nr;
}
-sub start_query { # always runs in main (lei-daemon) process
- my ($self) = @_;
- if ($self->{threads}) {
+sub start_query ($;$) { # always runs in main (lei-daemon) process
+ my ($self, $l2m) = @_;
+ if ($self->{opt_threads} || ($l2m && !$self->{opt_sort})) {
for my $ibxish (locals($self)) {
- $self->wq_io_do('query_thread_mset', [], $ibxish);
+ $self->wq_io_do('query_one_mset', [], $ibxish);
}
} elsif (locals($self)) {
- $self->wq_io_do('query_mset', []);
+ $self->wq_io_do('query_combined_mset', []);
}
my $i = 0;
my $q = [];
@@ -402,7 +412,7 @@ sub start_query { # always runs in main (lei-daemon) process
sub incr_start_query { # called whenever an l2m shard starts do_post_auth
my ($self, $l2m) = @_;
return if ++$self->{nr_start_query} != $l2m->{-wq_nr_workers};
- start_query($self);
+ start_query($self, $l2m);
}
sub ipc_atfork_child {
@@ -448,7 +458,8 @@ sub do_query {
my $op_c = delete $lei->{pkt_op_c};
delete $lei->{pkt_op_p};
@$end = ();
- $self->{threads} = $lei->{opt}->{threads};
+ $self->{opt_threads} = $lei->{opt}->{threads};
+ $self->{opt_sort} = $lei->{opt}->{'sort'};
if ($l2m) {
$l2m->net_merge_complete unless $lei->{auth};
} else {
--
unsubscribe: one-click, see List-Unsubscribe header
archive: https://public-inbox.org/meta/