Michael Blow has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/1677
Change subject: ASTERIXDB-1883: FeedRuntimeInputHandler issues ...................................................................... ASTERIXDB-1883: FeedRuntimeInputHandler issues Recent commit https://asterix-gerrit.ics.uci.edu/#/c/1591/ includes a number of new issues in FeedRuntimeInputHandler: - hangs caused by race condition with mutex & inbox on close (observed on Jenkins) - CPU spin on disk spilling on empty inbox - The writer is not flushed in as many cases as before Change-Id: I7e091f65eb5f3a76277803b3197d490d3ef2fc04 --- M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java 1 file changed, 70 insertions(+), 132 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/77/1677/1 diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java index 329451d..2c549ad 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java @@ -19,6 +19,7 @@ package org.apache.asterix.external.feed.dataflow; import java.nio.ByteBuffer; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.logging.Level; import java.util.logging.Logger; @@ -51,7 +52,9 @@ private static final Logger LOGGER = Logger.getLogger(FeedRuntimeInputHandler.class.getName()); private static final double MAX_SPILL_USED_BEFORE_RESUME = 0.8; private static final boolean DEBUG = false; - private final Object mutex = new Object(); + private static final ByteBuffer POISON_PILL = ByteBuffer.allocate(0); + private static final ByteBuffer SPILLED = ByteBuffer.allocate(0); + private final FeedExceptionHandler exceptionHandler; private final FrameSpiller spiller; private final FeedPolicyAccessor fpa; @@ -59,7 +62,7 @@ private final int initialFrameSize; private final FrameTransporter consumer; private final Thread consumerThread; - private final LinkedBlockingQueue<ByteBuffer> inbox; + private final BlockingQueue<ByteBuffer> inbox; private final ConcurrentFramePool framePool; private Mode mode = Mode.PROCESS; private int total = 0; @@ -110,28 +113,23 @@ // If we use nextframe, chances are this frame will also be // flushed into the spilled file. This causes problem when trying to // read the frame and the size info is lost. - inbox.put(ByteBuffer.allocate(0)); - synchronized (mutex) { - if (DEBUG) { - LOGGER.info("Producer is waking up consumer"); - } - mutex.notify(); - } + inbox.put(POISON_PILL); consumerThread.join(); } catch (InterruptedException e) { - LOGGER.log(Level.WARNING, e.getMessage(), e); + LOGGER.log(Level.WARNING, "interrupted", e); + Thread.currentThread().interrupt(); } try { framePool.release(inbox); } catch (Throwable th) { - LOGGER.log(Level.WARNING, th.getMessage(), th); + LOGGER.log(Level.WARNING, "exception releasing buffers", th); } try { if (spiller != null) { spiller.close(); } } catch (Throwable th) { - LOGGER.log(Level.WARNING, th.getMessage(), th); + LOGGER.log(Level.WARNING, "exception closing spiller", th); } finally { writer.close(); } @@ -163,8 +161,11 @@ } break; } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw HyracksDataException.create(e); } catch (Throwable th) { - throw new HyracksDataException(th); + throw HyracksDataException.create(th); } } @@ -182,7 +183,7 @@ } } - private void discard(ByteBuffer frame) throws HyracksDataException { + private void discard(ByteBuffer frame) throws HyracksDataException, InterruptedException { if (DEBUG) { LOGGER.info("starting discard(frame)"); } @@ -206,7 +207,7 @@ } numProcessedInMemory++; next.put(frame); - inbox.offer(next); + inbox.put(next); mode = Mode.PROCESS; return; } @@ -224,7 +225,7 @@ } } - private void exitProcessState(ByteBuffer frame) throws HyracksDataException { + private void exitProcessState(ByteBuffer frame) throws HyracksDataException, InterruptedException { if (fpa.spillToDiskOnCongestion()) { mode = Mode.SPILL; spiller.open(); @@ -237,7 +238,7 @@ } } - private void discardOrStall(ByteBuffer frame) throws HyracksDataException { + private void discardOrStall(ByteBuffer frame) throws HyracksDataException, InterruptedException { if (fpa.discardOnCongestion()) { mode = Mode.DISCARD; discard(frame); @@ -249,48 +250,33 @@ } } - private void stall(ByteBuffer frame) throws HyracksDataException { - try { + private void stall(ByteBuffer frame) throws HyracksDataException, InterruptedException { + if (DEBUG) { + LOGGER.info("in stall(frame). So far, I have stalled " + numStalled); + } + numStalled++; + // If spilling is enabled, we wait on the spiller + if (fpa.spillToDiskOnCongestion()) { if (DEBUG) { - LOGGER.info("in stall(frame). So far, I have stalled " + numStalled); + LOGGER.info("in stall(frame). Spilling is enabled so we will attempt to spill"); } - numStalled++; - // If spilling is enabled, we wait on the spiller - if (fpa.spillToDiskOnCongestion()) { - if (DEBUG) { - LOGGER.info("in stall(frame). Spilling is enabled so we will attempt to spill"); - } - waitforSpillSpace(); - spiller.spill(frame); - numSpilled++; - synchronized (mutex) { - if (DEBUG) { - LOGGER.info("Producer is waking up consumer"); - } - mutex.notify(); - } - return; - } - if (DEBUG) { - LOGGER.info("in stall(frame). Spilling is disabled. We will subscribe to frame pool"); - } - // Spilling is disabled, we subscribe to feedMemoryManager - frameAction.setFrame(frame); - framePool.subscribe(frameAction); - ByteBuffer temp = frameAction.retrieve(); - inbox.put(temp); - numProcessedInMemory++; - if (DEBUG) { - LOGGER.info("stall(frame) has been completed. Notifying the consumer that a frame is ready"); - } - synchronized (mutex) { - if (DEBUG) { - LOGGER.info("Producer is waking up consumer"); - } - mutex.notify(); - } - } catch (InterruptedException e) { - throw new HyracksDataException(e); + waitforSpillSpace(); + spiller.spill(frame); + numSpilled++; + inbox.put(SPILLED); + return; + } + if (DEBUG) { + LOGGER.info("in stall(frame). Spilling is disabled. We will subscribe to frame pool"); + } + // Spilling is disabled, we subscribe to feedMemoryManager + frameAction.setFrame(frame); + framePool.subscribe(frameAction); + ByteBuffer temp = frameAction.retrieve(); + inbox.put(temp); + numProcessedInMemory++; + if (DEBUG) { + LOGGER.info("stall(frame) has been completed. Notifying the consumer that a frame is ready"); } } @@ -307,19 +293,14 @@ } } - private void process(ByteBuffer frame) throws HyracksDataException { + private void process(ByteBuffer frame) throws HyracksDataException, InterruptedException { // Get a page from frame pool ByteBuffer next = (frame.capacity() <= framePool.getMaxFrameSize()) ? getFreeBuffer(frame.capacity()) : null; if (next != null) { // Got a page from memory pool numProcessedInMemory++; next.put(frame); - try { - inbox.put(next); - notifyMemoryConsumer(); - } catch (InterruptedException e) { - throw new HyracksDataException(e); - } + inbox.put(next); } else { if (DEBUG) { LOGGER.info("Couldn't allocate memory --> exitProcessState(frame)"); @@ -329,46 +310,29 @@ } } - private void notifyMemoryConsumer() { - if (inbox.size() == 1) { - synchronized (mutex) { - if (DEBUG) { - LOGGER.info("Producer is waking up consumer"); - } - mutex.notify(); - } - } - } - - private void spill(ByteBuffer frame) throws HyracksDataException { + private void spill(ByteBuffer frame) throws HyracksDataException, InterruptedException { if (spiller.switchToMemory()) { - synchronized (mutex) { - // Check if there is memory - ByteBuffer next = null; - if (frame.capacity() <= framePool.getMaxFrameSize()) { - next = getFreeBuffer(frame.capacity()); - } - if (next != null) { - spiller.close(); - numProcessedInMemory++; - next.put(frame); - inbox.offer(next); - notifyMemoryConsumer(); - mode = Mode.PROCESS; - } else { - // spill. This will always succeed since spilled = 0 (TODO must verify that budget can't be 0) - spiller.spill(frame); - numSpilled++; - if (DEBUG) { - LOGGER.info("Producer is waking up consumer"); - } - mutex.notify(); - } + // Check if there is memory + ByteBuffer next = null; + if (frame.capacity() <= framePool.getMaxFrameSize()) { + next = getFreeBuffer(frame.capacity()); + } + if (next != null) { + spiller.close(); + numProcessedInMemory++; + next.put(frame); + inbox.put(next); + mode = Mode.PROCESS; + } else { + // spill. This will always succeed since spilled = 0 (TODO must verify that budget can't be 0) + spiller.spill(frame); + numSpilled++; + inbox.put(SPILLED); } } else { // try to spill. If failed switch to either discard or stall if (spiller.spill(frame)) { - notifyDiskConsumer(); + inbox.put(SPILLED); numSpilled++; } else { if (fpa.discardOnCongestion()) { @@ -377,17 +341,6 @@ } else { stall(frame); } - } - } - } - - private void notifyDiskConsumer() { - if (spiller.remaining() == 1) { - synchronized (mutex) { - if (DEBUG) { - LOGGER.info("Producer is waking up consumer"); - } - mutex.notify(); } } } @@ -458,39 +411,24 @@ try { ByteBuffer frame; boolean running = true; - while (running) { - frame = inbox.poll(); + for (; running; writer.flush()) { + frame = inbox.take(); - if (frame == null && spiller != null) { + if (frame == SPILLED) { running = clearLocalFrames(); - continue; - } - - if (frame == null) { - synchronized (mutex) { - LOGGER.info("Consumer is going to sleep"); - mutex.wait(); - LOGGER.info("Consumer is waking up"); - } - continue; - } - - // process - if (frame.capacity() == 0) { + } else if (frame == POISON_PILL) { running = false; if (spiller != null ) { clearLocalFrames(); } } else { + // process try { - if (consume(frame) != null) { - return; - } + running = consume(frame) == null; } finally { framePool.release(frame); } } - writer.flush(); } } catch (Throwable th) { this.cause = th; @@ -507,7 +445,7 @@ return total; } - public LinkedBlockingQueue<ByteBuffer> getInternalBuffer() { + public BlockingQueue<ByteBuffer> getInternalBuffer() { return inbox; } } -- To view, visit https://asterix-gerrit.ics.uci.edu/1677 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I7e091f65eb5f3a76277803b3197d490d3ef2fc04 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Michael Blow <[email protected]>
