>From Wail Alkowaileet <[email protected]>: Wail Alkowaileet has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18144 )
Change subject: [ASTERIXDB-3346][MTD] Fix GlobalResourceIdFactory race cond. ...................................................................... [ASTERIXDB-3346][MTD] Fix GlobalResourceIdFactory race cond. - user model changes: no - storage format changes: no - interface changes: no Details: Creating a new ID can wait indefinitely due to a race condition (see ASTERIXDB-3346 for more details). Change-Id: Ic7ff15abbc70277a9f1ae340314335253aa23308 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18144 Reviewed-by: Wail Alkowaileet <[email protected]> Reviewed-by: Ali Alsuliman <[email protected]> Tested-by: Jenkins <[email protected]> Integration-Tests: Wail Alkowaileet <[email protected]> --- M asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java 1 file changed, 164 insertions(+), 51 deletions(-) Approvals: Wail Alkowaileet: Looks good to me, but someone else must approve; Verified Ali Alsuliman: Looks good to me, approved Jenkins: Verified Anon. E. 
Moose #1000171: diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java index 30877d9..8682937 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/GlobalResourceIdFactory.java @@ -18,9 +18,12 @@ */ package org.apache.asterix.runtime.transaction; -import java.util.NoSuchElementException; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.common.messaging.api.INCMessageBroker; import org.apache.asterix.runtime.message.ResourceIdRequestMessage; import org.apache.asterix.runtime.message.ResourceIdRequestResponseMessage; @@ -32,7 +35,6 @@ import it.unimi.dsi.fastutil.longs.LongArrayFIFOQueue; import it.unimi.dsi.fastutil.longs.LongPriorityQueue; -import it.unimi.dsi.fastutil.longs.LongPriorityQueues; /** * A resource id factory that generates unique resource ids across all NCs by requesting @@ -41,23 +43,44 @@ public class GlobalResourceIdFactory implements IResourceIdFactory { private static final Logger LOGGER = LogManager.getLogger(); + private static final long INVALID_ID = -1L; + /** + * Maximum number of attempts to request a new block of IDs + */ + private static final int MAX_NUMBER_OF_ATTEMPTS = 3; + /** + * Time threshold to consider a block request was lost + */ + private static final long WAIT_FOR_REQUEST_TIME_THRESHOLD_NS = TimeUnit.SECONDS.toNanos(2); + /** + * Wait time by threads waiting for the response with the new block + */ + private static final 
long WAIT_FOR_BLOCK_ID_TIME_MS = TimeUnit.SECONDS.toMillis(2); private final INCServiceContext serviceCtx; private final LongPriorityQueue resourceIds; - private final LinkedBlockingQueue<ResourceIdRequestResponseMessage> resourceIdResponseQ; private final String nodeId; private final int initialBlockSize; private final int maxBlockSize; + /** + * Current number of failed block requests + */ + private final AtomicInteger numberOfFailedRequests; + /** + * Last time a request of a block is initiated + */ + private final AtomicLong requestTime; private int currentBlockSize; private volatile boolean reset = false; public GlobalResourceIdFactory(INCServiceContext serviceCtx, int initialBlockSize) { this.serviceCtx = serviceCtx; - this.resourceIdResponseQ = new LinkedBlockingQueue<>(); this.nodeId = serviceCtx.getNodeId(); this.initialBlockSize = initialBlockSize; maxBlockSize = initialBlockSize * 2; currentBlockSize = initialBlockSize; - resourceIds = LongPriorityQueues.synchronize(new LongArrayFIFOQueue(initialBlockSize)); + resourceIds = new LongArrayFIFOQueue(initialBlockSize); + numberOfFailedRequests = new AtomicInteger(); + requestTime = new AtomicLong(); } public synchronized void addNewIds(ResourceIdRequestResponseMessage resourceIdResponse) @@ -70,52 +93,23 @@ currentBlockSize); return; } - resourceIdResponseQ.put(resourceIdResponse); + populateIDs(resourceIdResponse); } @Override public long createId() throws HyracksDataException { - synchronized (resourceIds) { - if (reset) { - resourceIds.clear(); - resourceIdResponseQ.clear(); - reset = false; - } + // Rest IDs if requested to reset + resetIDsIfNeeded(); + // Get a new ID if possible or request a new block + long id = getID(); + while (id == INVALID_ID) { + // All IDs in the previous block were consumed, wait for the new block + waitForID(); + // Retry getting a new ID again + id = getID(); } - try { - final long resourceId = resourceIds.dequeueLong(); - if (resourceIds.isEmpty()) { - 
serviceCtx.getControllerService().getExecutor().submit(() -> { - try { - requestNewBlock(); - } catch (Exception e) { - LOGGER.warn("failed on preemptive block request", e); - } - }); - } - return resourceId; - } catch (NoSuchElementException e) { - // fallthrough - } - try { - // if there already exists a response, use it - ResourceIdRequestResponseMessage response = resourceIdResponseQ.poll(); - if (response == null) { - requestNewBlock(); - response = resourceIdResponseQ.take(); - } - if (response.getException() != null) { - throw HyracksDataException.create(response.getException()); - } - // take the first id, queue the rest - final long startingId = response.getResourceId(); - for (int i = 1; i < response.getBlockSize(); i++) { - resourceIds.enqueue(startingId + i); - } - return startingId; - } catch (Exception e) { - throw HyracksDataException.create(e); - } + + return id; } @Override @@ -128,9 +122,106 @@ LOGGER.debug("current resource ids block size: {}", currentBlockSize); } - protected synchronized void requestNewBlock() throws Exception { - // queue is empty; request a new block - ResourceIdRequestMessage msg = new ResourceIdRequestMessage(nodeId, currentBlockSize); - ((INCMessageBroker) serviceCtx.getMessageBroker()).sendMessageToPrimaryCC(msg); + private void populateIDs(ResourceIdRequestResponseMessage response) { + synchronized (resourceIds) { + long startingId = response.getResourceId(); + for (int i = 0; i < response.getBlockSize(); i++) { + resourceIds.enqueue(startingId + i); + } + // Notify all waiting threads that a new block of IDs was acquired + resourceIds.notifyAll(); + } + } + + private void resetIDsIfNeeded() throws HyracksDataException { + synchronized (resourceIds) { + if (reset) { + resourceIds.clear(); + reset = false; + // Request the initial block + requestNewBlock(); + } + } + } + + private long getID() throws HyracksDataException { + long id = INVALID_ID; + // Record the time of which getID was called + long time = 
System.nanoTime(); + int size; + synchronized (resourceIds) { + size = resourceIds.size(); + if (size > 0) { + id = resourceIds.dequeueLong(); + } + } + if (size == 1 || size == 0 && shouldRequestNewBlock(time)) { + // The last ID was taken. Preemptively request a new block. + // Or the last request failed, retry + // Or waiting time for the response exceeded the maximum waiting time threshold + requestNewBlock(); + } + + return id; + } + + private void waitForID() throws HyracksDataException { + long time = System.nanoTime(); + try { + synchronized (resourceIds) { + while (resourceIds.isEmpty() && !shouldRequestNewBlock(time)) { + resourceIds.wait(WAIT_FOR_BLOCK_ID_TIME_MS); + time = System.nanoTime(); + } + } + } catch (Exception e) { + throw HyracksDataException.create(e); + } + } + + private boolean shouldRequestNewBlock(long time) { + int failures = numberOfFailedRequests.get(); + long timeDiff = time - requestTime.get(); + if (failures > 0 || timeDiff >= WAIT_FOR_REQUEST_TIME_THRESHOLD_NS) { + long thresholdSec = TimeUnit.NANOSECONDS.toSeconds(WAIT_FOR_REQUEST_TIME_THRESHOLD_NS); + long timeDiffSec = TimeUnit.NANOSECONDS.toSeconds(timeDiff); + LOGGER.warn( + "Preemptive requests are either failed or lost " + + "(failures:{}, number-of-failures-threshold: {})," + + " (time-since-last-request: {}s, time-threshold: {}s)", + failures, MAX_NUMBER_OF_ATTEMPTS, timeDiffSec, thresholdSec); + return true; + } + return false; + } + + private synchronized void requestNewBlock() throws HyracksDataException { + int attempts = numberOfFailedRequests.get(); + if (attempts >= MAX_NUMBER_OF_ATTEMPTS) { + synchronized (resourceIds) { + // Notify all waiting threads so they can fail as well + resourceIds.notifyAll(); + } + throw new RuntimeDataException(ErrorCode.ILLEGAL_STATE, "New block request was attempted (" + attempts + + " times) - exceeding the maximum number of allowed retries. 
See the logs for more information."); + } + + requestTime.set(System.nanoTime()); + serviceCtx.getControllerService().getExecutor().submit(() -> { + try { + ResourceIdRequestMessage msg = new ResourceIdRequestMessage(nodeId, currentBlockSize); + ((INCMessageBroker) serviceCtx.getMessageBroker()).sendMessageToPrimaryCC(msg); + // Reset the number failures + numberOfFailedRequests.set(0); + } catch (Exception e) { + LOGGER.warn("failed to request a new block", e); + // Increment the number of failures + numberOfFailedRequests.incrementAndGet(); + synchronized (resourceIds) { + // Notify a waiting thread (if any) to request a new block + resourceIds.notify(); + } + } + }); } } -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18144 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Change-Id: Ic7ff15abbc70277a9f1ae340314335253aa23308 Gerrit-Change-Number: 18144 Gerrit-PatchSet: 6 Gerrit-Owner: Wail Alkowaileet <[email protected]> Gerrit-Reviewer: Ali Alsuliman <[email protected]> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <[email protected]> Gerrit-Reviewer: Michael Blow <[email protected]> Gerrit-Reviewer: Murtadha Hubail <[email protected]> Gerrit-Reviewer: Wail Alkowaileet <[email protected]> Gerrit-MessageType: merged
