nfsantos commented on code in PR #2031: URL: https://github.com/apache/jackrabbit-oak/pull/2031#discussion_r1957784231
########## oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/writer/IndexWriterPool.java: ########## @@ -0,0 +1,407 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.jackrabbit.oak.plugins.index.lucene.writer; + +import org.apache.jackrabbit.guava.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser; +import org.apache.jackrabbit.oak.plugins.index.ConfigHelper; +import org.apache.jackrabbit.oak.plugins.index.FormattingUtils; +import org.apache.lucene.index.IndexableField; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; +import 
java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + + +public class IndexWriterPool { + private final static Logger LOG = LoggerFactory.getLogger(IndexWriterPool.class); + + public static final String OAK_INDEXER_PARALLEL_WRITER_MAX_BATCH_SIZE = "oak.indexer.parallelWriter.maxBatchSize"; + public static final int DEFAULT_OAK_INDEXER_PARALLEL_WRITER_MAX_BATCH_SIZE = 256; + + public static final String OAK_INDEXER_PARALLEL_WRITER_QUEUE_SIZE = "oak.indexer.parallelWriter.queueSize"; + public static final int DEFAULT_OAK_INDEXER_PARALLEL_WRITER_QUEUE_SIZE = 64; + + public static final String OAK_INDEXER_PARALLEL_WRITER_NUMBER_THREADS = "oak.indexer.parallelWriter.numberThreads"; + public static final int DEFAULT_OAK_INDEXER_PARALLEL_WRITER_NUMBER_THREADS = 4; + + private final int maxBatchSize = ConfigHelper.getSystemPropertyAsInt(OAK_INDEXER_PARALLEL_WRITER_MAX_BATCH_SIZE, DEFAULT_OAK_INDEXER_PARALLEL_WRITER_MAX_BATCH_SIZE); + private final int queueSize = ConfigHelper.getSystemPropertyAsInt(OAK_INDEXER_PARALLEL_WRITER_QUEUE_SIZE, DEFAULT_OAK_INDEXER_PARALLEL_WRITER_QUEUE_SIZE); + private final int numberOfThreads = ConfigHelper.getSystemPropertyAsInt(OAK_INDEXER_PARALLEL_WRITER_NUMBER_THREADS, DEFAULT_OAK_INDEXER_PARALLEL_WRITER_NUMBER_THREADS); + + // The batch of operations that will be sent to the workers. + // Batching individual operations reduces the overhead of synchronization and context switching. 
+ private final ArrayList<Operation> batch = new ArrayList<>(maxBatchSize); + // Shared queue between producer and workers + private final BlockingQueue<OperationBatch> queue = new ArrayBlockingQueue<>(queueSize); + private final List<Worker> workers; + private final List<Future<?>> workerFutures; + private final ExecutorService writersPool; + // Used to schedule a task that periodically prints statistics + private final ScheduledExecutorService monitorTaskExecutor; + private final AtomicBoolean closed = new AtomicBoolean(false); + + // Used to keep track of the sequence number of the batches that are currently being processed. + // This is used to wait until all operations for a writer are processed before closing it. + private final Object pendingBatchesLock = new Object(); + private final HashSet<Long> pendingBatches = new HashSet<>(); + private long batchSequenceNumber = 0; + + // Statistics + private final long startTimeNanos = System.nanoTime(); + private long updateCount = 0; + private long deleteCount = 0; + private long totalEnqueueTimeNanos = 0; + + private static class OperationBatch { + final long sequenceNumber; + final Operation[] operations; + + public OperationBatch(long sequenceNumber, Operation[] operations) { + Objects.requireNonNull(operations); + this.sequenceNumber = sequenceNumber; + this.operations = operations; + } + } + + private static abstract class Operation { + final LuceneIndexWriter delegate; + + public Operation(LuceneIndexWriter delegate) { + this.delegate = delegate; + } + + abstract void execute() throws IOException; + } + + private static class UpdateOperation extends Operation { + private final String path; + private final Iterable<? extends IndexableField> doc; + + UpdateOperation(LuceneIndexWriter delegate, String path, Iterable<? 
extends IndexableField> doc) { + super(delegate); + this.path = path; + this.doc = doc; + } + + @Override + public void execute() throws IOException { + delegate.updateDocument(path, doc); + } + } + + private static class DeleteOperation extends Operation { + private final String path; + + DeleteOperation(LuceneIndexWriter delegate, String path) { + super(delegate); + this.path = path; + } + + @Override + public void execute() throws IOException { + delegate.deleteDocuments(path); + } + } + + private static class CloseResult { + // Either result or error is non-null. The two constructors enforce this invariant. + final Boolean result; + final Throwable error; + + CloseResult(boolean result) { + this.result = result; + this.error = null; + } + + CloseResult(Throwable error) { + this.result = null; + this.error = error; + } + + @Override + public String toString() { + return "CloseResult{" + + "result=" + result + + ", error=" + error + + '}'; + } + } + + private static class CloseWriterOperation extends Operation { + private final long timestamp; + private final SynchronousQueue<CloseResult> sync; + + /** + * The close operation should be synchronous and applied only after all the write operations for this writer + * are processed. + * + * @param sync A synchronous queue used to wait for the result of the close operation. 
+ */ + CloseWriterOperation(LuceneIndexWriter delegate, long timestamp, SynchronousQueue<CloseResult> sync) { + super(delegate); + this.timestamp = timestamp; + this.sync = sync; + } + + @Override + public void execute() { + try { + try { + boolean closeResult = delegate.close(timestamp); + sync.put(new CloseResult(closeResult)); + } catch (IOException e) { + sync.put(new CloseResult(e)); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + // Sentinel used to terminate worker threads + final static OperationBatch SHUTDOWN = new OperationBatch(-1, new Operation[0]); + + private class Worker implements Runnable { + private final long startTime = System.nanoTime(); + private final int id; + private long opCount = 0; + private long batchesProcessed = 0; + private long totalBusyTime = 0; + + public Worker(int id) { + this.id = id; + } + + @Override + public void run() { + String oldName = Thread.currentThread().getName(); + Thread.currentThread().setName("index-writer-" + id); + try { + LOG.info("[{}] Worker started", id); + while (true) { + OperationBatch op = queue.take(); + if (op == SHUTDOWN) { + queue.add(SHUTDOWN); + LOG.info("[{}] Shutting down worker", id); + printStatistics(); + return; + } + long start = System.nanoTime(); + for (Operation operation : op.operations) { + operation.execute(); + opCount++; + } + batchesProcessed++; + long durationNanos = System.nanoTime() - start; + totalBusyTime += durationNanos; + long durationMillis = durationNanos / 1_000_000; + if (durationMillis > 1000) { + LOG.info("[{}] Processing batch {} of size {} took {} millis.", + id, op.sequenceNumber, op.operations.length, durationMillis); + } + synchronized (pendingBatchesLock) { + pendingBatches.remove(op.sequenceNumber); + pendingBatchesLock.notifyAll(); + } + } + } catch (InterruptedException | IOException e) { Review Comment: Yes, the message was not correct for the case of an IOException. 
I removed the `IOException` from the handler of the `InterruptedException`, leaving only the subsequent `catch (Throwable)` clause to deal with all other errors. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
