fabriziofortino commented on code in PR #2183:
URL: https://github.com/apache/jackrabbit-oak/pull/2183#discussion_r2002676564


##########
oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java:
##########
@@ -192,72 +258,222 @@ public void update(String id, ElasticDocument document) throws IOException {
             }
 
             // Add the update operation with the script
-            add(BulkOperation.of(op -> op.update(uf -> 
uf.index(indexName).id(id)
-                    .action(uaf -> uaf.script(s -> 
s.source(script.toString()).params("document", JsonData.of(document)))
-                            .upsert(document)))), id);
+            add(BulkOperation.of(op -> op.update(uf ->
+                            uf.index(indexName).id(id).action(uaf ->
+                                    uaf.script(s -> 
s.source(script.toString()).params("document", JsonData.of(document)))
+                                            .upsert(document)))),
+                    context);
         }
     }
 
-    public void delete(String id) throws IOException {
-        add(BulkOperation.of(op -> op.delete(idx -> 
idx.index(indexName).id(id))), id);
-    }
-
-    private void add(BulkOperation operation, String context) throws 
IOException {
-        // fail fast: we don't want to wait until the processor gets closed to 
fail
-        checkFailures();
-        bulkIngester.add(operation, context);
-        totalOperations++;
+    public void delete(String indexName, String id) throws IOException {
+        checkOpen();
+        IndexInfo indexInfo = getIndexInfoOrFail(indexName);
+        indexInfo.deleteOperations++;
+        add(BulkOperation.of(op -> op.delete(idx -> 
idx.index(indexName).id(id))), new OperationContext(indexInfo, id));
     }
 
     /**
-     * Closes the bulk ingester and waits for all the bulk requests to return.
+     * Closes an index. The underlying bulk ingestor will be flushed, to 
ensure that all pending operations for this
+     * index are sent to the server. If this index was registered with 
waitForESAcknowledgement set to true, then this
+     * method will wait until we receive an acknowledgement from the server 
for all the operations up to when this
+     * method was called.
+     * <p>
+     * Note: Closing an index will have the side-effect of flushing all 
pending operations for all indexes registered
+     * with the bulk processor. This should be transparent for the user, but 
it may mean that this method would take
+     * longer to return than if it was flusing only the operations for the 
index being closed.

Review Comment:
   Typo (`flusing` → `flushing`):
   ```suggestion
        * longer to return than if it was flushing only the operations for the index being closed.
   ```



##########
oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/index/ElasticBulkProcessorHandler.java:
##########
@@ -38,148 +40,212 @@
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
-import java.util.Set;
+import java.util.OptionalLong;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Phaser;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
-class ElasticBulkProcessorHandler {
+public class ElasticBulkProcessorHandler {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ElasticBulkProcessorHandler.class);
-    private static final int FAILED_DOC_COUNT_FOR_STATUS_NODE = 
Integer.getInteger("oak.failedDocStatusLimit", 10000);
-
-    private static final int BULK_PROCESSOR_CONCURRENCY =
-            Integer.getInteger("oak.indexer.elastic.bulkProcessorConcurrency", 
1);
-    private static final String SYNC_MODE_PROPERTY = "sync-mode";
-    private static final String SYNC_RT_MODE = "rt";
-
-    protected final ElasticConnection elasticConnection;
-    protected final String indexName;
-    protected final ElasticIndexDefinition indexDefinition;
-    private final NodeBuilder definitionBuilder;
-    protected final BulkIngester<String> bulkIngester;
-    private final boolean waitForESAcknowledgement;
 
     /**
-     * Coordinates communication between bulk processes. It has a main 
controller registered at creation time and
-     * de-registered on {@link ElasticIndexWriter#close(long)}. Each bulk 
request register a new party in
-     * this Phaser in {@link OakBulkListener#beforeBulk(long, BulkRequest, 
List)} and de-register itself when
-     * the request returns.
+     * Keeps information about an index that is being written by the bulk 
processor
      */
-    private final Phaser phaser = new Phaser(1); // register main controller
-
-    /**
-     * Exceptions occurred while trying to update index in elasticsearch
-     */
-    private final ConcurrentLinkedQueue<ErrorCause> suppressedErrorCauses = 
new ConcurrentLinkedQueue<>();
+    static class IndexInfo {
+        public final String indexName;
+        public final ElasticIndexDefinition indexDefinition;
+        public final NodeBuilder definitionBuilder;
+        public final boolean waitForESAcknowledgement;
+        public final boolean isRealTime;
+        /**
+         * Exceptions occurred while trying to update index in elasticsearch
+         */
+        public final ConcurrentLinkedQueue<ErrorCause> suppressedErrorCauses = 
new ConcurrentLinkedQueue<>();
+
+        long indexOperations = 0;
+        long deleteOperations = 0;
+        long updateOperations = 0;
+        boolean indexModified = false;
+
+        IndexInfo(String indexName, ElasticIndexDefinition indexDefinition, 
NodeBuilder definitionBuilder, boolean waitForESAcknowledgement, boolean 
isRealTime) {
+            this.indexName = indexName;
+            this.indexDefinition = indexDefinition;
+            this.definitionBuilder = definitionBuilder;
+            this.waitForESAcknowledgement = waitForESAcknowledgement;
+            this.isRealTime = isRealTime;
+        }
+    }
 
     /**
-     * Key-value structure to keep the history of bulk requests. Keys are the 
bulk execution ids, the boolean
-     * value is {@code true} when at least an update is performed, otherwise 
{@code false}.
+     * Context object associated with each operation passed to the bulk 
processor
      */
-    private final ConcurrentHashMap<Long, Boolean> updatesMap = new 
ConcurrentHashMap<>();
+    public final static class OperationContext {
+        final IndexInfo indexInfo;
+        final String documentId;
 
-    protected long totalOperations;
+        OperationContext(IndexInfo indexInfo, String documentId) {
+            this.indexInfo = indexInfo;
+            this.documentId = documentId;
+        }
 
-    private ElasticBulkProcessorHandler(@NotNull ElasticConnection 
elasticConnection,
-                                        @NotNull String indexName,
-                                        @NotNull ElasticIndexDefinition 
indexDefinition,
-                                        @NotNull NodeBuilder definitionBuilder,
-                                        boolean waitForESAcknowledgement) {
-        this.elasticConnection = elasticConnection;
-        this.indexName = indexName;
-        this.indexDefinition = indexDefinition;
-        this.definitionBuilder = definitionBuilder;
-        this.waitForESAcknowledgement = waitForESAcknowledgement;
-        this.bulkIngester = initBulkIngester();
+        @Override
+        public String toString() {
+            return "OperationContext{" +
+                    "indexInfo=" + indexInfo.indexName +
+                    ", documentId='" + documentId + '\'' +
+                    '}';
+        }
     }
 
-    /**
-     * Returns an ElasticBulkProcessorHandler instance based on the index 
definition configuration.
-     * <p>
-     * The `sync-mode` property can be set to `rt` (real-time). In this case 
the returned handler will be real-time.
-     * This option is available for sync index definitions only.
-     */
-    public static ElasticBulkProcessorHandler getBulkProcessorHandler(@NotNull 
ElasticConnection elasticConnection,
-                                                                      @NotNull 
String indexName,
-                                                                      @NotNull 
ElasticIndexDefinition indexDefinition,
-                                                                      @NotNull 
NodeBuilder definitionBuilder, CommitInfo commitInfo,
-                                                                      boolean 
waitForESAcknowledgement) {
-        PropertyState async = 
indexDefinition.getDefinitionNodeState().getProperty("async");
-
-        if (async != null) {
-            return new ElasticBulkProcessorHandler(elasticConnection, 
indexName, indexDefinition, definitionBuilder, waitForESAcknowledgement);
-        }
+    public static final String BULK_ACTIONS_PROP = 
"oak.indexer.elastic.bulkProcessor.maxBulkOperations";
+    public static final int BULK_ACTIONS_DEFAULT = 8192;
+    public static final String BULK_SIZE_BYTES_PROP = 
"oak.indexer.elastic.bulkProcessor.maxBulkSizeBytes";
+    public static final int BULK_SIZE_BYTES_DEFAULT = 8 * 1024 * 1024; // 8MB
+    public static final String BULK_FLUSH_INTERVAL_MS_PROP = 
"oak.indexer.elastic.bulkProcessor.bulkFlushIntervalMs";
+    public static final int BULK_FLUSH_INTERVAL_MS_DEFAULT = 3000;
+    public static final String BULK_MAX_CONCURRENT_REQUESTS_PROP = 
"oak.indexer.elastic.bulkProcessor.maxConcurrentRequests";
+    private static final int BULK_MAX_CONCURRENT_REQUESTS_DEFAULT = 1;
+    // when true, fails indexing in case of bulk failures
+    public static final String FAIL_ON_ERROR_PROP = 
"oak.indexer.elastic.bulkProcessor.failOnError";
+    public static final boolean FAIL_ON_ERROR_DEFAULT = true;
 
-        // commit-info has priority over configuration in index definition
-        String syncMode = null;
-        if (commitInfo != null) {
-            syncMode = (String) commitInfo.getInfo().get(SYNC_MODE_PROPERTY);
-        }
+    private static final String SYNC_MODE_PROPERTY = "sync-mode";
+    private static final String SYNC_RT_MODE = "rt";
+    private static final int MAX_SUPPRESSED_ERROR_CAUSES = 50;
 
-        if (syncMode == null) {
-            PropertyState syncModeProp = 
indexDefinition.getDefinitionNodeState().getProperty("sync-mode");
-            if (syncModeProp != null) {
-                syncMode = syncModeProp.getValue(Type.STRING);
-            }
-        }
+    private final int FAILED_DOC_COUNT_FOR_STATUS_NODE = 
ConfigHelper.getSystemPropertyAsInt("oak.failedDocStatusLimit", 10000);
+    private final int BULK_MAX_OPERATIONS = 
ConfigHelper.getSystemPropertyAsInt(BULK_ACTIONS_PROP, BULK_ACTIONS_DEFAULT);
+    private final int BULK_MAX_SIZE_BYTES = 
ConfigHelper.getSystemPropertyAsInt(BULK_SIZE_BYTES_PROP, 
BULK_SIZE_BYTES_DEFAULT);
+    private final int BULK_FLUSH_INTERVAL_MS = 
ConfigHelper.getSystemPropertyAsInt(BULK_FLUSH_INTERVAL_MS_PROP, 
BULK_FLUSH_INTERVAL_MS_DEFAULT);
+    private final int BULK_MAX_CONCURRENT_REQUESTS = 
ConfigHelper.getSystemPropertyAsInt(BULK_MAX_CONCURRENT_REQUESTS_PROP, 
BULK_MAX_CONCURRENT_REQUESTS_DEFAULT);
+    private final boolean FAIL_ON_ERROR = 
ConfigHelper.getSystemPropertyAsBoolean(FAIL_ON_ERROR_PROP, 
FAIL_ON_ERROR_DEFAULT);
 
-        if (SYNC_RT_MODE.equals(syncMode)) {
-            return new RealTimeBulkProcessorHandler(elasticConnection, 
indexName, indexDefinition, definitionBuilder, waitForESAcknowledgement);
-        }
+    private final ElasticConnection elasticConnection;
+    private final BulkIngester<OperationContext> bulkIngester;
 
-        return new ElasticBulkProcessorHandler(elasticConnection, indexName, 
indexDefinition, definitionBuilder, waitForESAcknowledgement);
-    }
+    // Used to keep track of the sequence number of the batches that are 
currently being processed.
+    // This is used to wait until all operations for a writer are processed 
before closing it.
+    private final ReentrantLock lock = new ReentrantLock();
+    private final Condition bulkProcessedCondition = lock.newCondition();
+    private final HashSet<Long> pendingBulks = new HashSet<>();
+
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final ConcurrentHashMap<String, IndexInfo> registeredIndexes = new 
ConcurrentHashMap<>();
+    private final ConcurrentLinkedQueue<ErrorCause> 
globalSuppressedErrorCauses = new ConcurrentLinkedQueue<>();
 
-    private BulkIngester<String> initBulkIngester() {
+    // Time blocked waiting to add operations to the bulk processor.
+    private final long startTime = System.nanoTime();
+    private long totalWaitTimeNanos = 0;
+
+    public ElasticBulkProcessorHandler(@NotNull ElasticConnection 
elasticConnection) {
+        this.elasticConnection = elasticConnection;
         // BulkIngester does not support retry policies. Some retries though 
are already implemented in the transport layer.
         // More details here: 
https://github.com/elastic/elasticsearch-java/issues/478
-        return BulkIngester.of(b -> {
+        LOG.info("Creating bulk ingester [maxActions: {}, maxSizeBytes: {} 
flushInterval {}, concurrency {}]",
+                BULK_MAX_OPERATIONS, BULK_MAX_SIZE_BYTES, 
BULK_FLUSH_INTERVAL_MS, BULK_MAX_CONCURRENT_REQUESTS_PROP);
+        this.bulkIngester = BulkIngester.of(b -> {
             b = b.client(elasticConnection.getAsyncClient())
                     .listener(new OakBulkListener());
-            if (indexDefinition.bulkActions > 0) {
-                b = b.maxOperations(indexDefinition.bulkActions);
+            if (BULK_MAX_OPERATIONS > 0) {
+                b = b.maxOperations(BULK_MAX_OPERATIONS);
             }
-            if (indexDefinition.bulkSizeBytes > 0) {
-                b = b.maxSize(indexDefinition.bulkSizeBytes);
+            if (BULK_MAX_SIZE_BYTES > 0) {
+                b = b.maxSize(BULK_MAX_SIZE_BYTES);
             }
-            if (indexDefinition.bulkFlushIntervalMs > 0) {
-                b = b.flushInterval(indexDefinition.bulkFlushIntervalMs, 
TimeUnit.MILLISECONDS);
+            if (BULK_FLUSH_INTERVAL_MS > 0) {
+                b = b.flushInterval(BULK_FLUSH_INTERVAL_MS, 
TimeUnit.MILLISECONDS);
             }
-
-            return b.maxConcurrentRequests(BULK_PROCESSOR_CONCURRENCY);
+            if (BULK_MAX_CONCURRENT_REQUESTS > 0) {
+                b = b.maxConcurrentRequests(BULK_MAX_CONCURRENT_REQUESTS);
+            }
+            return b;
         });
     }
 
-    private void checkFailures() throws IOException {
-        if (!suppressedErrorCauses.isEmpty()) {
-            IOException ioe = new IOException("Exception while indexing. See 
suppressed for details");
-            suppressedErrorCauses.stream().map(ec -> new 
IllegalStateException(ec.reason())).forEach(ioe::addSuppressed);
-            throw ioe;
+    /**
+     * Registers an ElasticIndex with the given index definition configuration.
+     * <p>
+     * The `sync-mode` property can be set to `rt` (real-time). In this case 
the returned handler will be real-time.
+     * This option is available for sync index definitions only.
+     */
+    public void registerIndex(String indexName, ElasticIndexDefinition 
indexDefinition, NodeBuilder definitionBuilder, CommitInfo commitInfo, boolean 
waitForESAcknowledgement) {
+        checkOpen();
+        if (registeredIndexes.containsKey(indexName)) {
+            LOG.warn("Index already registered: {}", indexName);

Review Comment:
   What happens when an existing index definition gets updated? Would the
writer try to register it again? If so, this branch only logs a warning, and we
might miss changes in the index definition. `IndexInfo` also stores the
`NodeBuilder definitionBuilder`; would that become stale?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: oak-dev-unsubscr...@jackrabbit.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to