abdullah alamoudi has submitted this change and it was merged.

Change subject: ASTERIXDB-1883: FeedRuntimeInputHandler issues
......................................................................
ASTERIXDB-1883: FeedRuntimeInputHandler issues Recent commit https://asterix-gerrit.ics.uci.edu/#/c/1591/ includes a number of new issues in FeedRuntimeInputHandler: - hangs caused by race condition with mutex & inbox on close (observed on Jenkins) - CPU spin on disk spilling on empty inbox - The writer is not flushed in as many cases as before Change-Id: I7e091f65eb5f3a76277803b3197d490d3ef2fc04 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1677 Sonar-Qube: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> BAD: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Michael Blow <[email protected]> --- M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java M asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java 2 files changed, 156 insertions(+), 220 deletions(-) Approvals: Michael Blow: Looks good to me, approved Jenkins: Verified; No violations found; No violations found; Verified diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java index 329451d..11561b5 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java @@ -19,6 +19,7 @@ package org.apache.asterix.external.feed.dataflow; import java.nio.ByteBuffer; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.logging.Level; import java.util.logging.Logger; @@ -51,7 +52,10 @@ private static final Logger LOGGER = Logger.getLogger(FeedRuntimeInputHandler.class.getName()); private static final double 
MAX_SPILL_USED_BEFORE_RESUME = 0.8; private static final boolean DEBUG = false; - private final Object mutex = new Object(); + private static final ByteBuffer POISON_PILL = ByteBuffer.allocate(0); + private static final ByteBuffer SPILLED = ByteBuffer.allocate(0); + private static final ByteBuffer FAIL = ByteBuffer.allocate(0); + private final FeedExceptionHandler exceptionHandler; private final FrameSpiller spiller; private final FeedPolicyAccessor fpa; @@ -59,7 +63,7 @@ private final int initialFrameSize; private final FrameTransporter consumer; private final Thread consumerThread; - private final LinkedBlockingQueue<ByteBuffer> inbox; + private final BlockingQueue<ByteBuffer> inbox; private final ConcurrentFramePool framePool; private Mode mode = Mode.PROCESS; private int total = 0; @@ -90,16 +94,24 @@ @Override public void open() throws HyracksDataException { - synchronized (writer) { - writer.open(); - consumerThread.start(); - } + writer.open(); + consumerThread.start(); } @Override public void fail() throws HyracksDataException { - synchronized (writer) { - writer.fail(); + ByteBuffer buffer = inbox.poll(); + while (buffer != null) { + if (buffer != SPILLED) { + framePool.release(buffer); + } + buffer = inbox.poll(); + } + try { + inbox.put(FAIL); + } catch (InterruptedException e) { + LOGGER.log(Level.WARNING, "interrupted", e); + Thread.currentThread().interrupt(); } } @@ -110,28 +122,18 @@ // If we use nextframe, chances are this frame will also be // flushed into the spilled file. This causes problem when trying to // read the frame and the size info is lost. 
- inbox.put(ByteBuffer.allocate(0)); - synchronized (mutex) { - if (DEBUG) { - LOGGER.info("Producer is waking up consumer"); - } - mutex.notify(); - } + inbox.put(POISON_PILL); consumerThread.join(); } catch (InterruptedException e) { - LOGGER.log(Level.WARNING, e.getMessage(), e); - } - try { - framePool.release(inbox); - } catch (Throwable th) { - LOGGER.log(Level.WARNING, th.getMessage(), th); + LOGGER.log(Level.WARNING, "interrupted", e); + Thread.currentThread().interrupt(); } try { if (spiller != null) { spiller.close(); } } catch (Throwable th) { - LOGGER.log(Level.WARNING, th.getMessage(), th); + LOGGER.log(Level.WARNING, "exception closing spiller", th); } finally { writer.close(); } @@ -163,8 +165,11 @@ } break; } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw HyracksDataException.create(e); } catch (Throwable th) { - throw new HyracksDataException(th); + throw HyracksDataException.create(th); } } @@ -182,7 +187,7 @@ } } - private void discard(ByteBuffer frame) throws HyracksDataException { + private void discard(ByteBuffer frame) throws HyracksDataException, InterruptedException { if (DEBUG) { LOGGER.info("starting discard(frame)"); } @@ -206,7 +211,7 @@ } numProcessedInMemory++; next.put(frame); - inbox.offer(next); + inbox.put(next); mode = Mode.PROCESS; return; } @@ -224,7 +229,7 @@ } } - private void exitProcessState(ByteBuffer frame) throws HyracksDataException { + private void exitProcessState(ByteBuffer frame) throws HyracksDataException, InterruptedException { if (fpa.spillToDiskOnCongestion()) { mode = Mode.SPILL; spiller.open(); @@ -237,7 +242,7 @@ } } - private void discardOrStall(ByteBuffer frame) throws HyracksDataException { + private void discardOrStall(ByteBuffer frame) throws HyracksDataException, InterruptedException { if (fpa.discardOnCongestion()) { mode = Mode.DISCARD; discard(frame); @@ -249,48 +254,33 @@ } } - private void stall(ByteBuffer frame) throws HyracksDataException { - try { + private 
void stall(ByteBuffer frame) throws HyracksDataException, InterruptedException { + if (DEBUG) { + LOGGER.info("in stall(frame). So far, I have stalled " + numStalled); + } + numStalled++; + // If spilling is enabled, we wait on the spiller + if (fpa.spillToDiskOnCongestion()) { if (DEBUG) { - LOGGER.info("in stall(frame). So far, I have stalled " + numStalled); + LOGGER.info("in stall(frame). Spilling is enabled so we will attempt to spill"); } - numStalled++; - // If spilling is enabled, we wait on the spiller - if (fpa.spillToDiskOnCongestion()) { - if (DEBUG) { - LOGGER.info("in stall(frame). Spilling is enabled so we will attempt to spill"); - } - waitforSpillSpace(); - spiller.spill(frame); - numSpilled++; - synchronized (mutex) { - if (DEBUG) { - LOGGER.info("Producer is waking up consumer"); - } - mutex.notify(); - } - return; - } - if (DEBUG) { - LOGGER.info("in stall(frame). Spilling is disabled. We will subscribe to frame pool"); - } - // Spilling is disabled, we subscribe to feedMemoryManager - frameAction.setFrame(frame); - framePool.subscribe(frameAction); - ByteBuffer temp = frameAction.retrieve(); - inbox.put(temp); - numProcessedInMemory++; - if (DEBUG) { - LOGGER.info("stall(frame) has been completed. Notifying the consumer that a frame is ready"); - } - synchronized (mutex) { - if (DEBUG) { - LOGGER.info("Producer is waking up consumer"); - } - mutex.notify(); - } - } catch (InterruptedException e) { - throw new HyracksDataException(e); + waitforSpillSpace(); + spiller.spill(frame); + numSpilled++; + inbox.put(SPILLED); + return; + } + if (DEBUG) { + LOGGER.info("in stall(frame). Spilling is disabled. We will subscribe to frame pool"); + } + // Spilling is disabled, we subscribe to feedMemoryManager + frameAction.setFrame(frame); + framePool.subscribe(frameAction); + ByteBuffer temp = frameAction.retrieve(); + inbox.put(temp); + numProcessedInMemory++; + if (DEBUG) { + LOGGER.info("stall(frame) has been completed. 
Notifying the consumer that a frame is ready"); } } @@ -307,19 +297,14 @@ } } - private void process(ByteBuffer frame) throws HyracksDataException { + private void process(ByteBuffer frame) throws HyracksDataException, InterruptedException { // Get a page from frame pool ByteBuffer next = (frame.capacity() <= framePool.getMaxFrameSize()) ? getFreeBuffer(frame.capacity()) : null; if (next != null) { // Got a page from memory pool numProcessedInMemory++; next.put(frame); - try { - inbox.put(next); - notifyMemoryConsumer(); - } catch (InterruptedException e) { - throw new HyracksDataException(e); - } + inbox.put(next); } else { if (DEBUG) { LOGGER.info("Couldn't allocate memory --> exitProcessState(frame)"); @@ -329,46 +314,29 @@ } } - private void notifyMemoryConsumer() { - if (inbox.size() == 1) { - synchronized (mutex) { - if (DEBUG) { - LOGGER.info("Producer is waking up consumer"); - } - mutex.notify(); - } - } - } - - private void spill(ByteBuffer frame) throws HyracksDataException { + private void spill(ByteBuffer frame) throws HyracksDataException, InterruptedException { if (spiller.switchToMemory()) { - synchronized (mutex) { - // Check if there is memory - ByteBuffer next = null; - if (frame.capacity() <= framePool.getMaxFrameSize()) { - next = getFreeBuffer(frame.capacity()); - } - if (next != null) { - spiller.close(); - numProcessedInMemory++; - next.put(frame); - inbox.offer(next); - notifyMemoryConsumer(); - mode = Mode.PROCESS; - } else { - // spill. 
This will always succeed since spilled = 0 (TODO must verify that budget can't be 0) - spiller.spill(frame); - numSpilled++; - if (DEBUG) { - LOGGER.info("Producer is waking up consumer"); - } - mutex.notify(); - } + // Check if there is memory + ByteBuffer next = null; + if (frame.capacity() <= framePool.getMaxFrameSize()) { + next = getFreeBuffer(frame.capacity()); + } + if (next != null) { + spiller.close(); + numProcessedInMemory++; + next.put(frame); + inbox.put(next); + mode = Mode.PROCESS; + } else { + // spill. This will always succeed since spilled = 0 (TODO must verify that budget can't be 0) + spiller.spill(frame); + numSpilled++; + inbox.put(SPILLED); } } else { // try to spill. If failed switch to either discard or stall if (spiller.spill(frame)) { - notifyDiskConsumer(); + inbox.put(SPILLED); numSpilled++; } else { if (fpa.discardOnCongestion()) { @@ -379,22 +347,6 @@ } } } - } - - private void notifyDiskConsumer() { - if (spiller.remaining() == 1) { - synchronized (mutex) { - if (DEBUG) { - LOGGER.info("Producer is waking up consumer"); - } - mutex.notify(); - } - } - } - - @Override - public void flush() throws HyracksDataException { - // no op } public int getNumDiscarded() { @@ -460,37 +412,28 @@ boolean running = true; while (running) { frame = inbox.poll(); - - if (frame == null && spiller != null) { - running = clearLocalFrames(); - continue; - } - if (frame == null) { - synchronized (mutex) { - LOGGER.info("Consumer is going to sleep"); - mutex.wait(); - LOGGER.info("Consumer is waking up"); - } - continue; + writer.flush(); + frame = inbox.take(); } - - // process - if (frame.capacity() == 0) { + if (frame == SPILLED) { + running = clearLocalFrames(); + } else if (frame == POISON_PILL) { running = false; - if (spiller != null ) { + if (spiller != null) { clearLocalFrames(); } + } else if (frame == FAIL) { + running = false; + writer.fail(); } else { + // process try { - if (consume(frame) != null) { - return; - } + running = consume(frame) == 
null; } finally { framePool.release(frame); } } - writer.flush(); } } catch (Throwable th) { this.cause = th; @@ -507,7 +450,7 @@ return total; } - public LinkedBlockingQueue<ByteBuffer> getInternalBuffer() { + public BlockingQueue<ByteBuffer> getInternalBuffer() { return inbox; } } diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java index 2237505..b6343b9 100644 --- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java +++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/feed/test/InputHandlerTest.java @@ -48,14 +48,13 @@ import org.apache.hyracks.api.test.TestFrameWriter; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; import org.apache.hyracks.test.support.TestUtils; +import org.junit.After; import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; import org.mockito.Mockito; -import junit.framework.Test; -import junit.framework.TestCase; -import junit.framework.TestSuite; - -public class InputHandlerTest extends TestCase { +public class InputHandlerTest { private static final int DEFAULT_FRAME_SIZE = 32768; private static final int NUM_FRAMES = 128; @@ -67,14 +66,6 @@ private static final float DISCARD_ALLOWANCE = 0.15f; private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(1); private volatile static HyracksDataException cause = null; - - public InputHandlerTest(String testName) { - super(testName); - } - - public static Test suite() { - return new TestSuite(InputHandlerTest.class); - } private FeedRuntimeInputHandler createInputHandler(IHyracksTaskContext ctx, IFrameWriter writer, FeedPolicyAccessor fpa, ConcurrentFramePool framePool) throws HyracksDataException { @@ -122,12 +113,12 @@ } } - @org.junit.Before + @Before public void 
testCleanBefore() throws IOException { cleanDiskFiles(); } - @org.junit.After + @After public void testCleanAfter() throws IOException { cleanDiskFiles(); } @@ -139,11 +130,11 @@ Random random = new Random(); IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE); // No spill, No discard - FeedPolicyAccessor fpa = createFeedPolicyAccessor(true, false, NUM_FRAMES * DEFAULT_FRAME_SIZE, - DISCARD_ALLOWANCE); + FeedPolicyAccessor fpa = + createFeedPolicyAccessor(true, false, NUM_FRAMES * DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE); // Non-Active Writer - TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), - false); + TestFrameWriter writer = + FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), false); // FramePool ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, 0, DEFAULT_FRAME_SIZE); FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool); @@ -175,17 +166,17 @@ } } - @org.junit.Test + @Test public void testZeroMemoryFixedSizeFrameWithDiskNoDiscard() { try { int numRounds = 10; IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE); // No spill, No discard - FeedPolicyAccessor fpa = createFeedPolicyAccessor(true, false, NUM_FRAMES * DEFAULT_FRAME_SIZE, - DISCARD_ALLOWANCE); + FeedPolicyAccessor fpa = + createFeedPolicyAccessor(true, false, NUM_FRAMES * DEFAULT_FRAME_SIZE, DISCARD_ALLOWANCE); // Non-Active Writer - TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), - false); + TestFrameWriter writer = + FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), false); // FramePool ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, 0, DEFAULT_FRAME_SIZE); FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool); @@ -213,7 +204,6 @@ } finally { Assert.assertNull(cause); } - } /* @@ -221,7 +211,7 @@ * Discard = true; discard only 5% * 
Fixed size frames */ - @org.junit.Test + @Test public void testMemoryVarSizeFrameWithSpillWithDiscard() { try { int numberOfMemoryFrames = 50; @@ -230,14 +220,14 @@ int totalMinFrames = 0; IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE); // Spill budget = Memory budget, No discard - FeedPolicyAccessor fpa = createFeedPolicyAccessor(true, true, DEFAULT_FRAME_SIZE * numberOfSpillFrames, - DISCARD_ALLOWANCE); + FeedPolicyAccessor fpa = + createFeedPolicyAccessor(true, true, DEFAULT_FRAME_SIZE * numberOfSpillFrames, DISCARD_ALLOWANCE); // Non-Active Writer TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false); writer.freeze(); // FramePool - ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, numberOfMemoryFrames * DEFAULT_FRAME_SIZE, - DEFAULT_FRAME_SIZE); + ConcurrentFramePool framePool = + new ConcurrentFramePool(NODE_ID, numberOfMemoryFrames * DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE); FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool); handler.open(); ByteBuffer buffer1 = ByteBuffer.allocate(DEFAULT_FRAME_SIZE); @@ -300,8 +290,8 @@ Assert.assertEquals(0, handler.getNumDiscarded()); // We can only discard one frame double numDiscarded = 0; - boolean nextShouldDiscard = ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa - .getMaxFractionDiscard(); + boolean nextShouldDiscard = + ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard(); while (nextShouldDiscard) { handler.nextFrame(buffer5); numDiscarded++; @@ -333,21 +323,21 @@ * Discard = true * Fixed size frames */ - @org.junit.Test + @Test public void testMemoryFixedSizeFrameWithSpillWithDiscard() { try { int numberOfMemoryFrames = 50; int numberOfSpillFrames = 50; IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE); // Spill budget = Memory budget, No discard - FeedPolicyAccessor fpa = createFeedPolicyAccessor(true, true, DEFAULT_FRAME_SIZE * numberOfSpillFrames, - 
DISCARD_ALLOWANCE); + FeedPolicyAccessor fpa = + createFeedPolicyAccessor(true, true, DEFAULT_FRAME_SIZE * numberOfSpillFrames, DISCARD_ALLOWANCE); // Non-Active Writer TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false); writer.freeze(); // FramePool - ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, numberOfMemoryFrames * DEFAULT_FRAME_SIZE, - DEFAULT_FRAME_SIZE); + ConcurrentFramePool framePool = + new ConcurrentFramePool(NODE_ID, numberOfMemoryFrames * DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE); FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool); handler.open(); VSizeFrame frame = new VSizeFrame(ctx); @@ -370,8 +360,8 @@ Assert.assertEquals(0, handler.getNumDiscarded()); // We can only discard one frame double numDiscarded = 0; - boolean nextShouldDiscard = ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa - .getMaxFractionDiscard(); + boolean nextShouldDiscard = + ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard(); while (nextShouldDiscard) { handler.nextFrame(frame.getBuffer()); numDiscarded++; @@ -407,7 +397,7 @@ * Discard = true; discard only 5% * Fixed size frames */ - @org.junit.Test + @Test public void testMemoryVariableSizeFrameNoSpillWithDiscard() { try { int discardTestFrames = 100; @@ -419,8 +409,8 @@ TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false); writer.freeze(); // FramePool - ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, discardTestFrames * DEFAULT_FRAME_SIZE, - DEFAULT_FRAME_SIZE); + ConcurrentFramePool framePool = + new ConcurrentFramePool(NODE_ID, discardTestFrames * DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE); FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool); handler.open(); // add NUM_FRAMES times @@ -436,8 +426,8 @@ } // Next call should NOT block but should discard. 
double numDiscarded = 0.0; - boolean nextShouldDiscard = ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa - .getMaxFractionDiscard(); + boolean nextShouldDiscard = + ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard(); while (nextShouldDiscard) { handler.nextFrame(buffer); numDiscarded++; @@ -448,9 +438,9 @@ Assert.fail("The producer should switch to stall mode since it is exceeding the discard allowance"); } else { // Check that no records were discarded - assertEquals((int) numDiscarded, handler.getNumDiscarded()); + Assert.assertEquals((int) numDiscarded, handler.getNumDiscarded()); // Check that one frame is spilled - assertEquals(handler.getNumSpilled(), 0); + Assert.assertEquals(handler.getNumSpilled(), 0); } // consume memory frames writer.unfreeze(); @@ -470,7 +460,7 @@ * Discard = true; discard only 5% * Fixed size frames */ - @org.junit.Test + @Test public void testMemoryFixedSizeFrameNoSpillWithDiscard() { try { int discardTestFrames = 100; @@ -481,8 +471,8 @@ TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false); writer.freeze(); // FramePool - ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, discardTestFrames * DEFAULT_FRAME_SIZE, - DEFAULT_FRAME_SIZE); + ConcurrentFramePool framePool = + new ConcurrentFramePool(NODE_ID, discardTestFrames * DEFAULT_FRAME_SIZE, DEFAULT_FRAME_SIZE); FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool); handler.open(); VSizeFrame frame = new VSizeFrame(ctx); @@ -492,8 +482,8 @@ } // Next 5 calls call should NOT block but should discard. 
double numDiscarded = 0.0; - boolean nextShouldDiscard = ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa - .getMaxFractionDiscard(); + boolean nextShouldDiscard = + ((numDiscarded + 1.0) / (handler.getTotal() + 1.0)) <= fpa.getMaxFractionDiscard(); while (nextShouldDiscard) { handler.nextFrame(frame.getBuffer()); numDiscarded++; @@ -505,9 +495,9 @@ Assert.fail("The producer should switch to stall mode since it is exceeding the discard allowance"); } else { // Check that no records were discarded - assertEquals((int) numDiscarded, handler.getNumDiscarded()); + Assert.assertEquals((int) numDiscarded, handler.getNumDiscarded()); // Check that one frame is spilled - assertEquals(handler.getNumSpilled(), 0); + Assert.assertEquals(handler.getNumSpilled(), 0); } // consume memory frames writer.unfreeze(); @@ -527,13 +517,13 @@ * Discard = false; * Fixed size frames */ - @org.junit.Test + @Test public void testMemoryFixedSizeFrameWithSpillNoDiscard() { try { IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE); // Spill budget = Memory budget, No discard - FeedPolicyAccessor fpa = createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE * NUM_FRAMES, - DISCARD_ALLOWANCE); + FeedPolicyAccessor fpa = + createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE * NUM_FRAMES, DISCARD_ALLOWANCE); // Non-Active Writer TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false); writer.freeze(); @@ -550,9 +540,9 @@ Future<?> result = EXECUTOR.submit(new Pusher(frame.getBuffer(), handler)); result.get(); // Check that no records were discarded - assertEquals(handler.getNumDiscarded(), 0); + Assert.assertEquals(handler.getNumDiscarded(), 0); // Check that one frame is spilled - assertEquals(handler.getNumSpilled(), 1); + Assert.assertEquals(handler.getNumSpilled(), 1); // consume memory frames writer.unfreeze(); handler.close(); @@ -571,7 +561,7 @@ * Fixed size frames * Very fast next operator */ - @org.junit.Test + @Test public void 
testMemoryFixedSizeFrameNoDiskNoDiscardFastConsumer() { try { int numRounds = 10; @@ -579,8 +569,8 @@ // No spill, No discard FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE); // Non-Active Writer - TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), - false); + TestFrameWriter writer = + FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), false); // FramePool ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE); FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool); @@ -612,7 +602,7 @@ * Fixed size frames * Slow next operator */ - @org.junit.Test + @Test public void testMemoryFixedSizeFrameNoDiskNoDiscardSlowConsumer() { try { int numRounds = 10; @@ -620,8 +610,8 @@ // No spill, No discard FeedPolicyAccessor fpa = createFeedPolicyAccessor(false, false, 0L, DISCARD_ALLOWANCE); // Non-Active Writer - TestFrameWriter writer = FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), - false); + TestFrameWriter writer = + FrameWriterTestUtils.create(Collections.emptyList(), Collections.emptyList(), false); // FramePool ConcurrentFramePool framePool = new ConcurrentFramePool(NODE_ID, FEED_MEM_BUDGET, DEFAULT_FRAME_SIZE); FeedRuntimeInputHandler handler = createInputHandler(ctx, writer, fpa, framePool); @@ -653,6 +643,7 @@ * Discard = false * VarSizeFrame */ + @Test public void testMemoryVarSizeFrameNoDiskNoDiscard() { try { Random random = new Random(); @@ -682,12 +673,13 @@ Assert.fail(); } // Check that no records were discarded - assertEquals(handler.getNumDiscarded(), 0); + Assert.assertEquals(handler.getNumDiscarded(), 0); // Check that no records were spilled - assertEquals(handler.getNumSpilled(), 0); + Assert.assertEquals(handler.getNumSpilled(), 0); // Check that number of stalled is not greater than 1 Assert.assertTrue(handler.getNumStalled() <= 1); 
writer.unfreeze(); + handler.close(); result.get(); } catch (Throwable th) { th.printStackTrace(); @@ -701,15 +693,15 @@ * Discard = false; * Variable size frames */ - @org.junit.Test + @Test public void testMemoryVarSizeFrameWithSpillNoDiscard() { for (int k = 0; k < 1000; k++) { try { Random random = new Random(); IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE); // Spill budget = Memory budget, No discard - FeedPolicyAccessor fpa = createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE * NUM_FRAMES, - DISCARD_ALLOWANCE); + FeedPolicyAccessor fpa = + createFeedPolicyAccessor(true, false, DEFAULT_FRAME_SIZE * NUM_FRAMES, DISCARD_ALLOWANCE); // Non-Active Writer TestControlledFrameWriter writer = FrameWriterTestUtils.create(DEFAULT_FRAME_SIZE, false); writer.freeze(); @@ -719,8 +711,10 @@ handler.open(); ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE); int multiplier = 1; + int numOfBuffersInMemory = 0; // add NUM_FRAMES times while ((multiplier <= framePool.remaining())) { + numOfBuffersInMemory++; handler.nextFrame(buffer); multiplier = random.nextInt(10) + 1; buffer = ByteBuffer.allocate(DEFAULT_FRAME_SIZE * multiplier); @@ -729,12 +723,11 @@ Future<?> result = EXECUTOR.submit(new Pusher(buffer, handler)); result.get(); // Check that no records were discarded - assertEquals(handler.getNumDiscarded(), 0); + Assert.assertEquals(handler.getNumDiscarded(), 0); // Check that one frame is spilled - assertEquals(handler.getNumSpilled(), 1); - int numOfBuffersInMemory = handler.getInternalBuffer().size(); + Assert.assertEquals(handler.getNumSpilled(), 1); // consume memory frames - while (numOfBuffersInMemory > 0) { + while (numOfBuffersInMemory > 1) { writer.kick(); numOfBuffersInMemory--; } @@ -756,7 +749,7 @@ * Discard = false; * Fixed size frames */ - @org.junit.Test + @Test public void testMemoryFixedSizeFrameNoDiskNoDiscard() { try { IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE); -- To view, visit 
https://asterix-gerrit.ics.uci.edu/1677
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I7e091f65eb5f3a76277803b3197d490d3ef2fc04
Gerrit-PatchSet: 7
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Michael Blow <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: Michael Blow <[email protected]>
Gerrit-Reviewer: Xikui Wang <[email protected]>
Gerrit-Reviewer: abdullah alamoudi <[email protected]>
