[GitHub] [lucene-solr] mikemccand commented on a change in pull request #854: Shared PQ Based Early Termination for Concurrent Search

2019-09-11 Thread GitBox
URL: https://github.com/apache/lucene-solr/pull/854#discussion_r323174839
 
 

 ##
 File path: lucene/core/src/java/org/apache/lucene/search/Collector.java
 ##
 @@ -77,4 +77,12 @@
* Indicates what features are required from the scorer.
*/
   ScoreMode scoreMode();
+
+  /**
+   * Indicates that input has ended for the collector. This allows the collector to perform
+   * post processing (if any).
+   */
+  default void postProcess() {
 
 Review comment:
   Could we rename this to something like `finishLeaf`?
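Assuming the hook is meant to run once per leaf (segment), as the name `finishLeaf` suggests, its contract might look like the minimal sketch below. `SimpleLeafCollector`, `SimpleCollector`, `RecordingCollector` and `LeafDriver` are hypothetical stand-ins, not Lucene's real `Collector`/`LeafCollector` types.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for Lucene's Collector and LeafCollector;
// these are not the real Lucene interfaces.
interface SimpleLeafCollector {
  void collect(int doc);

  /** Called once, after the last document of this leaf has been collected. */
  default void finishLeaf() {}
}

interface SimpleCollector {
  SimpleLeafCollector getLeafCollector(int docBase);
}

// Records collected global doc ids and the docBase of every finished leaf.
class RecordingCollector implements SimpleCollector {
  final List<Integer> collected = new ArrayList<>();
  final List<Integer> finishedLeaves = new ArrayList<>();

  @Override
  public SimpleLeafCollector getLeafCollector(int docBase) {
    return new SimpleLeafCollector() {
      @Override
      public void collect(int doc) {
        collected.add(docBase + doc);
      }

      @Override
      public void finishLeaf() {
        finishedLeaves.add(docBase);
      }
    };
  }
}

class LeafDriver {
  // Hypothetical driver: each leaf is given as {docBase, maxDoc}.
  static void search(SimpleCollector collector, int[][] leaves) {
    for (int[] leaf : leaves) {
      SimpleLeafCollector leafCollector = collector.getLeafCollector(leaf[0]);
      for (int doc = 0; doc < leaf[1]; doc++) {
        leafCollector.collect(doc);
      }
      leafCollector.finishLeaf(); // the hook fires exactly once per leaf
    }
  }
}
```

Searching two leaves with this driver calls `finishLeaf` twice, once per leaf, which is the per-leaf semantics the suggested name implies.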


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: dev-unsubscr...@lucene.apache.org
For additional commands, e-mail: dev-h...@lucene.apache.org



[GitHub] [lucene-solr] mikemccand commented on a change in pull request #854: Shared PQ Based Early Termination for Concurrent Search

2019-09-11 Thread GitBox
URL: https://github.com/apache/lucene-solr/pull/854#discussion_r323176113
 
 

 ##
 File path: lucene/core/src/java/org/apache/lucene/search/TopFieldCollector.java
 ##
 @@ -279,7 +297,130 @@ public void collect(int doc) throws IOException {
 
   }
 
-  private static final ScoreDoc[] EMPTY_SCOREDOCS = new ScoreDoc[0];
+  /*
+   * Collects hits into a local queue until the requested number of hits are collected
+   * globally. Post that, a global calibration step is performed
+   */
+   public static class EarlyTerminatingFieldCollector extends TopFieldCollector {
+
+final Sort sort;
+final FieldValueHitQueue queue;
+final EarlyTerminatingFieldCollectorManager earlyTerminatingFieldCollectorManager;
+private final AtomicInteger globalNumberOfHits;
+private boolean addedSelfToGlobalQueue;
+
+//TODO: Refactor this to make an interface only for field collector uses
+public EarlyTerminatingFieldCollector(Sort sort, FieldValueHitQueue queue, int numHits, int totalHitsThreshold,
+  EarlyTerminatingFieldCollectorManager collectorManager, AtomicInteger globalNumberOfHits) {
+  super(queue, numHits, totalHitsThreshold, sort.needsScores());
+  this.sort = sort;
+  this.queue = queue;
+  this.earlyTerminatingFieldCollectorManager = collectorManager;
+  this.globalNumberOfHits = globalNumberOfHits;
+}
+
+@Override
+public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
+  docBase = context.docBase;
+
+  final LeafFieldComparator[] comparators = queue.getComparators(context);
+  final int[] reverseMul = queue.getReverseMul();
+  final Sort indexSort = context.reader().getMetaData().getSort();
+  final boolean canEarlyTerminate = canEarlyTerminate(sort, indexSort);
+
+  return new EarlyTerminatingMultiComparatorLeafCollector(comparators, reverseMul, this) {
+
+boolean collectedAllCompetitiveHits = false;
+
+@Override
+public void setScorer(Scorable scorer) throws IOException {
+  super.setScorer(scorer);
+  updateMinCompetitiveScore(scorer);
+}
+
+@Override
+public void collect(int doc) throws IOException {
+
+  if (globalNumberOfHits.incrementAndGet() > numHits) {
+if (addedSelfToGlobalQueue == false) {
+  Entry returnedEntry = earlyTerminatingFieldCollectorManager.addCollectorToGlobalQueue(earlyTerminatingFieldCollector, docBase);
+
+  if (returnedEntry != null) {
+filterCompetitiveHit(returnedEntry.doc, false, returnedEntry.values);
+
+if (queue.size() > 0) {
+  Entry entry = queue.pop();
+
+  while (entry != null) {
+filterCompetitiveHit(entry.doc, false, entry.values);
+entry = queue.pop();
+  }
+}
+  }
+  addedSelfToGlobalQueue = true;
 
 Review comment:
   I wish there were some way to swap in a different collector implementation once we absorb ourselves into the global queue ... the behavior of this collector is so modal depending on that.
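One way to get that swap, sketched under assumptions: the leaf collector holds a small strategy object and replaces it at the moment it absorbs its local hits into the shared queue, so `collect` no longer re-checks an `addedSelfToGlobalQueue`-style flag on every hit. All names here (`HitSink`, `ModalLeafCollector`) are hypothetical, and plain doc ids stand in for full sort values.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sink standing in for the shared (global) hit queue.
interface HitSink {
  void offer(int doc);
}

// Hypothetical sketch: collect() delegates to the current mode, and the mode is
// swapped exactly once, when the local hits are absorbed into the shared queue.
class ModalLeafCollector {
  private interface Mode {
    void collect(int doc);
  }

  private final AtomicInteger globalHits;
  private final int numHits;
  private final HitSink globalQueue;
  final List<Integer> localQueue = new ArrayList<>();
  private Mode mode;

  ModalLeafCollector(AtomicInteger globalHits, int numHits, HitSink globalQueue) {
    this.globalHits = globalHits;
    this.numHits = numHits;
    this.globalQueue = globalQueue;
    this.mode = this::collectLocally;
  }

  void collect(int doc) {
    mode.collect(doc);
  }

  // Startup mode: buffer hits locally until more than numHits hits exist globally.
  private void collectLocally(int doc) {
    if (globalHits.incrementAndGet() > numHits) {
      for (int buffered : localQueue) {
        globalQueue.offer(buffered);
      }
      localQueue.clear();
      mode = this::collectGlobally; // from now on, no per-hit mode check
      globalQueue.offer(doc);
    } else {
      localQueue.add(doc);
    }
  }

  // Steady-state mode: every hit goes straight to the shared queue.
  private void collectGlobally(int doc) {
    globalHits.incrementAndGet();
    globalQueue.offer(doc);
  }
}
```

The design choice is that the mode test moves from every `collect` call to a single assignment; the cost is one interface dispatch per hit, which the JIT can often handle well when only one mode is active at a time.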





[GitHub] [lucene-solr] mikemccand commented on a change in pull request #854: Shared PQ Based Early Termination for Concurrent Search

2019-09-11 Thread GitBox
URL: https://github.com/apache/lucene-solr/pull/854#discussion_r323175760
 
 

 ##
 File path: lucene/core/src/java/org/apache/lucene/search/TopFieldCollector.java
 ##
 @@ -279,7 +297,130 @@ public void collect(int doc) throws IOException {
 
   }
 
-  private static final ScoreDoc[] EMPTY_SCOREDOCS = new ScoreDoc[0];
+  /*
+   * Collects hits into a local queue until the requested number of hits are collected
+   * globally. Post that, a global calibration step is performed
+   */
+   public static class EarlyTerminatingFieldCollector extends TopFieldCollector {
+
+final Sort sort;
+final FieldValueHitQueue queue;
+final EarlyTerminatingFieldCollectorManager earlyTerminatingFieldCollectorManager;
+private final AtomicInteger globalNumberOfHits;
+private boolean addedSelfToGlobalQueue;
+
+//TODO: Refactor this to make an interface only for field collector uses
+public EarlyTerminatingFieldCollector(Sort sort, FieldValueHitQueue queue, int numHits, int totalHitsThreshold,
+  EarlyTerminatingFieldCollectorManager collectorManager, AtomicInteger globalNumberOfHits) {
+  super(queue, numHits, totalHitsThreshold, sort.needsScores());
+  this.sort = sort;
+  this.queue = queue;
+  this.earlyTerminatingFieldCollectorManager = collectorManager;
+  this.globalNumberOfHits = globalNumberOfHits;
+}
+
+@Override
+public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
+  docBase = context.docBase;
+
+  final LeafFieldComparator[] comparators = queue.getComparators(context);
+  final int[] reverseMul = queue.getReverseMul();
+  final Sort indexSort = context.reader().getMetaData().getSort();
+  final boolean canEarlyTerminate = canEarlyTerminate(sort, indexSort);
+
+  return new EarlyTerminatingMultiComparatorLeafCollector(comparators, reverseMul, this) {
+
+boolean collectedAllCompetitiveHits = false;
+
+@Override
+public void setScorer(Scorable scorer) throws IOException {
+  super.setScorer(scorer);
+  updateMinCompetitiveScore(scorer);
+}
+
+@Override
+public void collect(int doc) throws IOException {
+
+  if (globalNumberOfHits.incrementAndGet() > numHits) {
+if (addedSelfToGlobalQueue == false) {
+  Entry returnedEntry = earlyTerminatingFieldCollectorManager.addCollectorToGlobalQueue(earlyTerminatingFieldCollector, docBase);
+
+  if (returnedEntry != null) {
+filterCompetitiveHit(returnedEntry.doc, false, returnedEntry.values);
+
+if (queue.size() > 0) {
+  Entry entry = queue.pop();
+
+  while (entry != null) {
+filterCompetitiveHit(entry.doc, false, entry.values);
+entry = queue.pop();
+  }
+}
+  }
+  addedSelfToGlobalQueue = true;
+}
+
+filterCompetitiveHit(doc, true, comparator.leafValue(doc));
+  } else {
+// Startup transient: queue hasn't gathered numHits yet
+int slot = totalHits;
+++totalHits;
+
+comparator.copy(slot, doc);
+add(slot, doc, comparator.leafValue(doc));
+  }
+}
+
+private void filterCompetitiveHit(int doc, boolean doEarlyTermination, Object value) throws IOException {
+  if (collectedAllCompetitiveHits || earlyTerminatingFieldCollectorManager.compareAndUpdateBottom(docBase, doc, value) <= 0) {
+// since docs are visited in doc Id order, if compare is 0, it means
+// this document is larger than anything else in the queue, and
+// therefore not competitive.
+if (canEarlyTerminate) {
+  if ((globalNumberOfHits.getAcquire() > totalHitsThreshold) && doEarlyTermination) {
+totalHitsRelation = Relation.GREATER_THAN_OR_EQUAL_TO;
+throw new CollectionTerminatedException();
+  } else {
+collectedAllCompetitiveHits = true;
+  }
+} else if (totalHitsRelation == Relation.EQUAL_TO) {
+  // we just reached totalHitsThreshold, we can start setting the min
+  // competitive score now
+  updateMinCompetitiveScore(scorer);
+}
+return;
+  }
+
+  updateMinCompetitiveScore(scorer);
 
 Review comment:
   Why can we `updateMinCompetitiveScore` here? The global queue may not yet be full at this point?
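The worry is that publishing a minimum competitive score before the shared queue holds `numHits` entries could let a scorer skip hits that still belong in the top hits. A minimal sketch of one guard, assuming a score-sorted shared queue (`SharedTopScores` is hypothetical and ignores the field sort used in the PR): report no threshold until the queue is full.

```java
import java.util.PriorityQueue;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch (not Lucene code): a shared top-k queue of scores that
// only reports a minimum competitive score once it holds k entries.
class SharedTopScores {
  private final int k;
  private final PriorityQueue<Float> heap = new PriorityQueue<>(); // min-heap of kept scores
  private final ReentrantLock lock = new ReentrantLock();

  SharedTopScores(int k) {
    this.k = k;
  }

  void offer(float score) {
    lock.lock();
    try {
      if (heap.size() < k) {
        heap.add(score);
      } else if (score > heap.peek()) {
        heap.poll(); // evict the weakest kept score
        heap.add(score);
      }
    } finally {
      lock.unlock();
    }
  }

  /**
   * Returns the score a new hit must reach, or 0 while the queue is not yet
   * full: until k hits exist globally, every hit is still competitive.
   */
  float minCompetitiveScore() {
    lock.lock();
    try {
      return heap.size() < k ? 0f : heap.peek();
    } finally {
      lock.unlock();
    }
  }
}
```

With such a guard, the leaf collector would only forward the value to the scorer once it is non-zero, i.e. once the shared queue is full.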



[GitHub] [lucene-solr] mikemccand commented on a change in pull request #854: Shared PQ Based Early Termination for Concurrent Search

2019-09-11 Thread GitBox
URL: https://github.com/apache/lucene-solr/pull/854#discussion_r323176515
 
 

 ##
 File path: lucene/core/src/java/org/apache/lucene/search/TopFieldCollector.java
 ##
 @@ -279,7 +297,130 @@ public void collect(int doc) throws IOException {
 
   }
 
-  private static final ScoreDoc[] EMPTY_SCOREDOCS = new ScoreDoc[0];
+  /*
+   * Collects hits into a local queue until the requested number of hits are collected
+   * globally. Post that, a global calibration step is performed
+   */
+   public static class EarlyTerminatingFieldCollector extends TopFieldCollector {
+
+final Sort sort;
+final FieldValueHitQueue queue;
+final EarlyTerminatingFieldCollectorManager earlyTerminatingFieldCollectorManager;
+private final AtomicInteger globalNumberOfHits;
+private boolean addedSelfToGlobalQueue;
+
+//TODO: Refactor this to make an interface only for field collector uses
+public EarlyTerminatingFieldCollector(Sort sort, FieldValueHitQueue queue, int numHits, int totalHitsThreshold,
+  EarlyTerminatingFieldCollectorManager collectorManager, AtomicInteger globalNumberOfHits) {
+  super(queue, numHits, totalHitsThreshold, sort.needsScores());
+  this.sort = sort;
+  this.queue = queue;
+  this.earlyTerminatingFieldCollectorManager = collectorManager;
+  this.globalNumberOfHits = globalNumberOfHits;
+}
+
+@Override
+public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
+  docBase = context.docBase;
+
+  final LeafFieldComparator[] comparators = queue.getComparators(context);
+  final int[] reverseMul = queue.getReverseMul();
+  final Sort indexSort = context.reader().getMetaData().getSort();
+  final boolean canEarlyTerminate = canEarlyTerminate(sort, indexSort);
+
+  return new EarlyTerminatingMultiComparatorLeafCollector(comparators, reverseMul, this) {
+
+boolean collectedAllCompetitiveHits = false;
+
+@Override
+public void setScorer(Scorable scorer) throws IOException {
+  super.setScorer(scorer);
+  updateMinCompetitiveScore(scorer);
+}
+
+@Override
+public void collect(int doc) throws IOException {
+
+  if (globalNumberOfHits.incrementAndGet() > numHits) {
+if (addedSelfToGlobalQueue == false) {
+  Entry returnedEntry = earlyTerminatingFieldCollectorManager.addCollectorToGlobalQueue(earlyTerminatingFieldCollector, docBase);
+
+  if (returnedEntry != null) {
+filterCompetitiveHit(returnedEntry.doc, false, returnedEntry.values);
+
+if (queue.size() > 0) {
+  Entry entry = queue.pop();
+
+  while (entry != null) {
+filterCompetitiveHit(entry.doc, false, entry.values);
+entry = queue.pop();
+  }
+}
+  }
+  addedSelfToGlobalQueue = true;
+}
+
+filterCompetitiveHit(doc, true, comparator.leafValue(doc));
+  } else {
+// Startup transient: queue hasn't gathered numHits yet
+int slot = totalHits;
+++totalHits;
+
+comparator.copy(slot, doc);
+add(slot, doc, comparator.leafValue(doc));
 
 Review comment:
   The fact that we need `leafValue` on every insert will make this threaded implementation more costly ... but that is a good tradeoff for use cases that benefit from multi-threaded search.





[GitHub] [lucene-solr] mikemccand commented on a change in pull request #854: Shared PQ Based Early Termination for Concurrent Search

2019-09-11 Thread GitBox
URL: https://github.com/apache/lucene-solr/pull/854#discussion_r323175031
 
 

 ##
 File path: lucene/core/src/java/org/apache/lucene/search/TopFieldCollector.java
 ##
 @@ -279,7 +297,130 @@ public void collect(int doc) throws IOException {
 
   }
 
-  private static final ScoreDoc[] EMPTY_SCOREDOCS = new ScoreDoc[0];
+  /*
+   * Collects hits into a local queue until the requested number of hits are collected
+   * globally. Post that, a global calibration step is performed
+   */
+   public static class EarlyTerminatingFieldCollector extends TopFieldCollector {
+
+final Sort sort;
+final FieldValueHitQueue queue;
+final EarlyTerminatingFieldCollectorManager earlyTerminatingFieldCollectorManager;
+private final AtomicInteger globalNumberOfHits;
+private boolean addedSelfToGlobalQueue;
+
+//TODO: Refactor this to make an interface only for field collector uses
+public EarlyTerminatingFieldCollector(Sort sort, FieldValueHitQueue queue, int numHits, int totalHitsThreshold,
+  EarlyTerminatingFieldCollectorManager collectorManager, AtomicInteger globalNumberOfHits) {
+  super(queue, numHits, totalHitsThreshold, sort.needsScores());
+  this.sort = sort;
+  this.queue = queue;
+  this.earlyTerminatingFieldCollectorManager = collectorManager;
+  this.globalNumberOfHits = globalNumberOfHits;
+}
+
+@Override
+public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException {
+  docBase = context.docBase;
+
+  final LeafFieldComparator[] comparators = queue.getComparators(context);
+  final int[] reverseMul = queue.getReverseMul();
+  final Sort indexSort = context.reader().getMetaData().getSort();
+  final boolean canEarlyTerminate = canEarlyTerminate(sort, indexSort);
+
+  return new EarlyTerminatingMultiComparatorLeafCollector(comparators, reverseMul, this) {
+
+boolean collectedAllCompetitiveHits = false;
+
+@Override
+public void setScorer(Scorable scorer) throws IOException {
+  super.setScorer(scorer);
+  updateMinCompetitiveScore(scorer);
+}
+
+@Override
+public void collect(int doc) throws IOException {
+
+  if (globalNumberOfHits.incrementAndGet() > numHits) {
+if (addedSelfToGlobalQueue == false) {
+  Entry returnedEntry = earlyTerminatingFieldCollectorManager.addCollectorToGlobalQueue(earlyTerminatingFieldCollector, docBase);
+
+  if (returnedEntry != null) {
+filterCompetitiveHit(returnedEntry.doc, false, returnedEntry.values);
+
+if (queue.size() > 0) {
+  Entry entry = queue.pop();
+
+  while (entry != null) {
+filterCompetitiveHit(entry.doc, false, entry.values);
+entry = queue.pop();
+  }
+}
+  }
+  addedSelfToGlobalQueue = true;
+}
+
+filterCompetitiveHit(doc, true, comparator.leafValue(doc));
+  } else {
+// Startup transient: queue hasn't gathered numHits yet
+int slot = totalHits;
+++totalHits;
+
+comparator.copy(slot, doc);
+add(slot, doc, comparator.leafValue(doc));
+  }
+}
+
+private void filterCompetitiveHit(int doc, boolean doEarlyTermination, Object value) throws IOException {
+  if (collectedAllCompetitiveHits || earlyTerminatingFieldCollectorManager.compareAndUpdateBottom(docBase, doc, value) <= 0) {
+// since docs are visited in doc Id order, if compare is 0, it means
+// this document is larger than anything else in the queue, and
+// therefore not competitive.
+if (canEarlyTerminate) {
+  if ((globalNumberOfHits.getAcquire() > totalHitsThreshold) && doEarlyTermination) {
+totalHitsRelation = Relation.GREATER_THAN_OR_EQUAL_TO;
+throw new CollectionTerminatedException();
+  } else {
+collectedAllCompetitiveHits = true;
+  }
+} else if (totalHitsRelation == Relation.EQUAL_TO) {
+  // we just reached totalHitsThreshold, we can start setting the min
+  // competitive score now
+  updateMinCompetitiveScore(scorer);
+}
+return;
+  }
+
+  updateMinCompetitiveScore(scorer);
+}
+  };
+}
+
+@Override
+public void postProcess() {
+  if (addedSelfToGlobalQueue == false) {
+Entry returnedEntry = earlyTerminatingFieldCollectorManager.addCollectorToGlobalQueue(this, docBase);
+
+if (returnedEntry != null) {
+  if (returnedEntry != null) {
 
 Review comment:
   

[GitHub] [lucene-solr] mikemccand commented on a change in pull request #854: Shared PQ Based Early Termination for Concurrent Search

2019-09-11 Thread GitBox
URL: https://github.com/apache/lucene-solr/pull/854#discussion_r322910021
 
 

 ##
 File path: 
lucene/core/src/java/org/apache/lucene/search/EarlyTerminatingFieldCollectorManager.java
 ##
 @@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.search;
+
+import java.util.Collection;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.lucene.search.TopFieldCollector.EMPTY_SCOREDOCS;
+
+/**
+ * CollectorManager which allows early termination across multiple slices
+ * when the index sort key and the query sort key are the same
+ */
+public class EarlyTerminatingFieldCollectorManager implements CollectorManager {
+  private final Sort sort;
+  private final int numHits;
+  private final int totalHitsThreshold;
+  private final AtomicInteger globalTotalHits;
+  private final ReentrantLock lock;
+  private int numCollectors;
+
+  private final ConcurrentLinkedQueue mergeableCollectors;
+  private FieldValueHitQueue globalHitQueue;
+  private FieldValueHitQueue.Entry bottom;
+  // We do not make this Atomic since it will be sought under a lock
+  private int queueSlotCounter;
+  private final AtomicBoolean mergeStarted;
+  public final AtomicBoolean mergeCompleted;
+
+  public EarlyTerminatingFieldCollectorManager(Sort sort, int numHits, int totalHitsThreshold) {
+this.sort = sort;
+this.numHits = numHits;
+this.totalHitsThreshold = totalHitsThreshold;
+this.globalTotalHits = new AtomicInteger();
+this.lock = new ReentrantLock();
+this.mergeStarted = new AtomicBoolean();
+this.mergeCompleted = new AtomicBoolean();
+this.mergeableCollectors = new ConcurrentLinkedQueue();
+this.globalHitQueue = null;
+  }
+
+  @Override
+  public TopFieldCollector.EarlyTerminatingFieldCollector newCollector() {
+++numCollectors;
+
+return new TopFieldCollector.EarlyTerminatingFieldCollector(sort, FieldValueHitQueue.create(sort.fields, numHits), numHits,
+totalHitsThreshold, this, globalTotalHits);
+  }
+
+  @Override
+  public TopFieldDocs reduce(Collection collectors) {
+
+if (globalHitQueue == null) {
+  final TopFieldDocs[] topDocs = new TopFieldDocs[collectors.size()];
+  int i = 0;
+  for (TopFieldCollector collector : collectors) {
+topDocs[i++] = collector.topDocs();
+  }
+  return TopDocs.merge(sort, 0, numHits, topDocs);
+}
+
+ScoreDoc[] results = populateResults(globalHitQueue.size());
+
+return newTopDocs(results);
+  }
+
+  public int compareAndUpdateBottom(int docBase, int doc, Object value) {
+
+try {
+  lock.lock();
+
+  // If not enough hits are accumulated, add this hit to the global hit queue
+  if (globalHitQueue.size() < numHits) {
+FieldValueHitQueue.Entry newEntry = new FieldValueHitQueue.Entry(queueSlotCounter++, (doc + docBase), value);
+bottom = (FieldValueHitQueue.Entry) globalHitQueue.add(newEntry);
+return 1;
+  }
+
+  FieldComparator[] comparators = globalHitQueue.getComparators();
+  int[] reverseMul = globalHitQueue.getReverseMul();
+  Object bottomValues = bottom.values;
+  Object[] valuesArray;
+  Object[] bottomValuesArray;
+
+  if (comparators.length > 1) {
+assert value instanceof Object[];
+valuesArray = (Object[]) value;
+
+assert bottomValues instanceof Object[];
+bottomValuesArray = (Object[]) bottomValues;
+  } else {
+valuesArray = new Object[1];
+valuesArray[0] = value;
+
+bottomValuesArray = new Object[1];
+bottomValuesArray[0] = bottomValues;
+  }
+
+  int cmp;
 
 Review comment:
   Move the `int cmp` declaration down to where `cmp` is assigned?
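A small sketch of the suggested shape, with `cmp` declared only where it is assigned, inside the per-field loop. Generic `java.util.Comparator`s stand in for Lucene's `FieldComparator`, and `MultiFieldCompare` is a hypothetical name.

```java
import java.util.Comparator;

// Hypothetical sketch: compares two multi-field sort keys field by field.
// Comparators stand in for Lucene's FieldComparator; this is not the PR's code.
final class MultiFieldCompare {
  private MultiFieldCompare() {}

  @SuppressWarnings({"unchecked", "rawtypes"})
  static int compare(Comparator[] comparators, int[] reverseMul, Object[] a, Object[] b) {
    for (int i = 0; i < comparators.length; i++) {
      // Declared at its single point of assignment, scoped to one iteration
      final int cmp = reverseMul[i] * comparators[i].compare(a[i], b[i]);
      if (cmp != 0) {
        return cmp; // first differing field decides
      }
    }
    return 0; // all sort fields tie
  }
}
```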



[GitHub] [lucene-solr] mikemccand commented on a change in pull request #854: Shared PQ Based Early Termination for Concurrent Search

2019-09-11 Thread GitBox
URL: https://github.com/apache/lucene-solr/pull/854#discussion_r322909770
 
 

 ##
 File path: 
lucene/core/src/java/org/apache/lucene/search/EarlyTerminatingFieldCollectorManager.java
 ##
 @@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.search;
+
+import java.util.Collection;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.lucene.search.TopFieldCollector.EMPTY_SCOREDOCS;
+
+/**
+ * CollectorManager which allows early termination across multiple slices
+ * when the index sort key and the query sort key are the same
+ */
+public class EarlyTerminatingFieldCollectorManager implements CollectorManager {
+  private final Sort sort;
+  private final int numHits;
+  private final int totalHitsThreshold;
+  private final AtomicInteger globalTotalHits;
+  private final ReentrantLock lock;
+  private int numCollectors;
+
+  private final ConcurrentLinkedQueue mergeableCollectors;
+  private FieldValueHitQueue globalHitQueue;
+  private FieldValueHitQueue.Entry bottom;
+  // We do not make this Atomic since it will be sought under a lock
+  private int queueSlotCounter;
+  private final AtomicBoolean mergeStarted;
+  public final AtomicBoolean mergeCompleted;
+
+  public EarlyTerminatingFieldCollectorManager(Sort sort, int numHits, int totalHitsThreshold) {
+this.sort = sort;
+this.numHits = numHits;
+this.totalHitsThreshold = totalHitsThreshold;
+this.globalTotalHits = new AtomicInteger();
+this.lock = new ReentrantLock();
+this.mergeStarted = new AtomicBoolean();
+this.mergeCompleted = new AtomicBoolean();
+this.mergeableCollectors = new ConcurrentLinkedQueue();
+this.globalHitQueue = null;
+  }
+
+  @Override
+  public TopFieldCollector.EarlyTerminatingFieldCollector newCollector() {
+++numCollectors;
+
+return new TopFieldCollector.EarlyTerminatingFieldCollector(sort, FieldValueHitQueue.create(sort.fields, numHits), numHits,
+totalHitsThreshold, this, globalTotalHits);
+  }
+
+  @Override
+  public TopFieldDocs reduce(Collection collectors) {
+
+if (globalHitQueue == null) {
+  final TopFieldDocs[] topDocs = new TopFieldDocs[collectors.size()];
+  int i = 0;
+  for (TopFieldCollector collector : collectors) {
+topDocs[i++] = collector.topDocs();
+  }
+  return TopDocs.merge(sort, 0, numHits, topDocs);
+}
+
+ScoreDoc[] results = populateResults(globalHitQueue.size());
+
+return newTopDocs(results);
+  }
+
+  public int compareAndUpdateBottom(int docBase, int doc, Object value) {
+
+try {
+  lock.lock();
+
+  // If not enough hits are accumulated, add this hit to the global hit queue
+  if (globalHitQueue.size() < numHits) {
+FieldValueHitQueue.Entry newEntry = new FieldValueHitQueue.Entry(queueSlotCounter++, (doc + docBase), value);
+bottom = (FieldValueHitQueue.Entry) globalHitQueue.add(newEntry);
+return 1;
+  }
+
+  FieldComparator[] comparators = globalHitQueue.getComparators();
+  int[] reverseMul = globalHitQueue.getReverseMul();
+  Object bottomValues = bottom.values;
+  Object[] valuesArray;
+  Object[] bottomValuesArray;
+
+  if (comparators.length > 1) {
+assert value instanceof Object[];
+valuesArray = (Object[]) value;
+
+assert bottomValues instanceof Object[];
+bottomValuesArray = (Object[]) bottomValues;
+  } else {
+valuesArray = new Object[1];
+valuesArray[0] = value;
+
+bottomValuesArray = new Object[1];
 
 Review comment:
   Hmm, spooky that we must allocate two `Object[]` per collected hit in this case ...
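A sketch of one way to avoid the two per-hit `Object[1]` allocations, assuming (as the diff's `comparators.length > 1` branch implies) that single-field values arrive unwrapped and multi-field values arrive as `Object[]`: compare the single-field case directly instead of wrapping both values. `BottomCompare` is hypothetical, with `java.util.Comparator` standing in for `FieldComparator`.

```java
import java.util.Comparator;

// Hypothetical sketch: handles the one-comparator case without wrapping the
// hit's value and the bottom's value in fresh Object[1] arrays.
final class BottomCompare {
  private BottomCompare() {}

  @SuppressWarnings({"unchecked", "rawtypes"})
  static int compareToBottom(Comparator[] comparators, int[] reverseMul, Object value, Object bottomValue) {
    if (comparators.length == 1) {
      // Single sort field: values are the raw field values, no allocation needed
      return reverseMul[0] * comparators[0].compare(value, bottomValue);
    }
    // Multiple sort fields: values already arrive as Object[] (one entry per field)
    Object[] values = (Object[]) value;
    Object[] bottomValues = (Object[]) bottomValue;
    for (int i = 0; i < comparators.length; i++) {
      final int cmp = reverseMul[i] * comparators[i].compare(values[i], bottomValues[i]);
      if (cmp != 0) {
        return cmp;
      }
    }
    return 0;
  }
}
```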



[GitHub] [lucene-solr] mikemccand commented on a change in pull request #854: Shared PQ Based Early Termination for Concurrent Search

2019-09-11 Thread GitBox
URL: https://github.com/apache/lucene-solr/pull/854#discussion_r322910645
 
 

 ##
 File path: 
lucene/core/src/java/org/apache/lucene/search/EarlyTerminatingFieldCollectorManager.java
 ##
 @@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.lucene.search;
+
+import java.util.Collection;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.lucene.search.TopFieldCollector.EMPTY_SCOREDOCS;
+
+/**
+ * CollectorManager which allows early termination across multiple slices
+ * when the index sort key and the query sort key are the same
+ */
+public class EarlyTerminatingFieldCollectorManager implements CollectorManager {
+  private final Sort sort;
+  private final int numHits;
+  private final int totalHitsThreshold;
+  private final AtomicInteger globalTotalHits;
+  private final ReentrantLock lock;
+  private int numCollectors;
+
+  private final ConcurrentLinkedQueue mergeableCollectors;
+  private FieldValueHitQueue globalHitQueue;
+  private FieldValueHitQueue.Entry bottom;
+  // We do not make this Atomic since it will be sought under a lock
+  private int queueSlotCounter;
+  private final AtomicBoolean mergeStarted;
+  public final AtomicBoolean mergeCompleted;
+
+  public EarlyTerminatingFieldCollectorManager(Sort sort, int numHits, int totalHitsThreshold) {
+this.sort = sort;
+this.numHits = numHits;
+this.totalHitsThreshold = totalHitsThreshold;
+this.globalTotalHits = new AtomicInteger();
+this.lock = new ReentrantLock();
+this.mergeStarted = new AtomicBoolean();
+this.mergeCompleted = new AtomicBoolean();
+this.mergeableCollectors = new ConcurrentLinkedQueue();
+this.globalHitQueue = null;
+  }
+
+  @Override
+  public TopFieldCollector.EarlyTerminatingFieldCollector newCollector() {
+++numCollectors;
+
+return new TopFieldCollector.EarlyTerminatingFieldCollector(sort, FieldValueHitQueue.create(sort.fields, numHits), numHits,
+totalHitsThreshold, this, globalTotalHits);
+  }
+
+  @Override
+  public TopFieldDocs reduce(Collection collectors) {
+
+if (globalHitQueue == null) {
+  final TopFieldDocs[] topDocs = new TopFieldDocs[collectors.size()];
+  int i = 0;
+  for (TopFieldCollector collector : collectors) {
+topDocs[i++] = collector.topDocs();
+  }
+  return TopDocs.merge(sort, 0, numHits, topDocs);
+}
+
+ScoreDoc[] results = populateResults(globalHitQueue.size());
+
+return newTopDocs(results);
+  }
+
+  public int compareAndUpdateBottom(int docBase, int doc, Object value) {
 
 Review comment:
   `private`?  And add javadoc explaining what this returns?
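A minimal sketch of what the suggestion could look like: the helper is `private`, a thin package-level method wraps it, and the javadoc spells out the `int` result (positive when the hit entered the queue, `<= 0` when it was not competitive, mirroring the `<= 0` check in `filterCompetitiveHit`). `SharedBottomQueue` is hypothetical and simplifies sort values to ascending `long`s, with ties broken by global doc id.

```java
import java.util.PriorityQueue;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch, not the PR's EarlyTerminatingFieldCollectorManager.
class SharedBottomQueue {
  private final int numHits;
  private final ReentrantLock lock = new ReentrantLock();
  // Max-heap on (value, globalDoc): the head is the bottom (least competitive) hit
  private final PriorityQueue<long[]> queue =
      new PriorityQueue<>((long[] a, long[] b) -> {
        int c = Long.compare(b[0], a[0]);
        return c != 0 ? c : Long.compare(b[1], a[1]);
      });

  SharedBottomQueue(int numHits) {
    this.numHits = numHits;
  }

  /** Entry point for collectors: returns true if the hit was kept. */
  boolean offer(int docBase, int doc, long value) {
    return compareAndUpdateBottom(docBase, doc, value) > 0;
  }

  /**
   * Compares a hit against the current bottom of the shared queue and, if it
   * is competitive, inserts it (evicting the bottom when the queue is full).
   *
   * @return a positive value if the hit was added to the queue, or a value
   *     {@code <= 0} if it is not competitive and was ignored
   */
  private int compareAndUpdateBottom(int docBase, int doc, long value) {
    final long globalDoc = (long) docBase + doc;
    lock.lock();
    try {
      if (queue.size() < numHits) {
        queue.add(new long[] {value, globalDoc});
        return 1;
      }
      long[] bottom = queue.peek();
      int cmp = Long.compare(value, bottom[0]);
      if (cmp == 0) {
        cmp = Long.compare(globalDoc, bottom[1]); // later doc loses a tie
      }
      if (cmp >= 0) {
        return -1; // not better than the bottom
      }
      queue.poll();
      queue.add(new long[] {value, globalDoc});
      return 1;
    } finally {
      lock.unlock();
    }
  }
}
```

This sketch calls `lock.lock()` before entering the `try` block, the idiom shown in the `ReentrantLock` javadoc, so `unlock()` never runs for a lock that was not acquired; the quoted diff calls `lock.lock()` inside the `try`.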

