Move from Batch to Persistence proc. the responsibility of adding sendReplies to Reply proc.
Change-Id: I754f7189a166420652fcfec4fa4c1497212f8d7c Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/aa2651a0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/aa2651a0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/aa2651a0 Branch: refs/heads/master Commit: aa2651a0a866e60e49504d6b9b6d4b47062a1c5a Parents: 1d60f21 Author: Francisco Perez-Sorrosal <fpe...@yahoo-inc.com> Authored: Wed May 4 10:04:28 2016 -0700 Committer: Francisco Perez-Sorrosal <fpe...@yahoo-inc.com> Committed: Wed May 4 10:04:28 2016 -0700 ---------------------------------------------------------------------- .../main/java/org/apache/omid/tso/Batch.java | 38 ++++++------------- .../omid/tso/PersistenceProcessorHandler.java | 40 ++++++++++++++++---- .../java/org/apache/omid/tso/TestBatch.java | 8 ++-- 3 files changed, 48 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/aa2651a0/tso-server/src/main/java/org/apache/omid/tso/Batch.java ---------------------------------------------------------------------- diff --git a/tso-server/src/main/java/org/apache/omid/tso/Batch.java b/tso-server/src/main/java/org/apache/omid/tso/Batch.java index 2b17f23..b3b9eef 100644 --- a/tso-server/src/main/java/org/apache/omid/tso/Batch.java +++ b/tso-server/src/main/java/org/apache/omid/tso/Batch.java @@ -91,6 +91,18 @@ public class Batch { } + PersistEvent get(int idx) { + return events[idx]; + } + + void set(int idx, PersistEvent event) { + events[idx] = event; + } + + void decreaseNumEvents() { + numEvents--; + } + void addCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext context) { Preconditions.checkState(!isFull(), "batch is full"); int index = numEvents++; @@ -123,32 +135,6 @@ public class Batch { } - void sendReply(ReplyProcessor reply, RetryProcessor 
retryProc, long batchID) { - - int i = 0; - while (i < numEvents) { - PersistEvent e = events[i]; - if (e.getType() == Type.ABORT && e.isRetry()) { - retryProc.disambiguateRetryRequestHeuristically(e.getStartTimestamp(), e.getChannel(), e.getMonCtx()); - PersistEvent tmp = events[i]; - //TODO: why assign it? - events[i] = events[numEvents - 1]; - events[numEvents - 1] = tmp; - if (numEvents == 1) { - clear(); - reply.manageResponsesBatch(batchID, null); - return; - } - numEvents--; - continue; - } - i++; - } - - reply.manageResponsesBatch(batchID, this); - - } - @Override public String toString() { return Objects.toStringHelper(this) http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/aa2651a0/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java ---------------------------------------------------------------------- diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java index 84890b9..099cf88 100644 --- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java +++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java @@ -17,7 +17,6 @@ */ package org.apache.omid.tso; -import com.lmax.disruptor.LifecycleAware; import com.lmax.disruptor.WorkHandler; import org.apache.omid.committable.CommitTable; import org.apache.omid.metrics.Histogram; @@ -94,25 +93,24 @@ public class PersistenceProcessorHandler implements WorkHandler<PersistenceProce throw new RuntimeException("Unknown event type: " + localEvent.getType().name()); } } - flush(batch, event.getBatchSequence()); - + if (batch.getNumEvents() > 0) { + flush(batch.getNumEvents()); + sendReplies(batch, event.getBatchSequence()); + } } - private void flush(Batch batch, long batchSequence) { + private void flush(int numBatchedEvents) { - if (batch.getNumEvents() > 0) { commitSuicideIfNotMaster(); try { long startFlushTimeInNs = 
System.nanoTime(); writer.flush(); flushTimer.update(System.nanoTime() - startFlushTimeInNs); - batchSizeHistogram.update(batch.getNumEvents()); + batchSizeHistogram.update(numBatchedEvents); } catch (IOException e) { panicker.panic("Error persisting commit batch", e); } commitSuicideIfNotMaster(); // TODO Here, we can return the client responses before committing suicide - batch.sendReply(replyProcessor, retryProc, batchSequence); - } } @@ -122,4 +120,30 @@ public class PersistenceProcessorHandler implements WorkHandler<PersistenceProce } } + private void sendReplies(Batch batch, long batchSequence) { + + int i = 0; + while (i < batch.getNumEvents()) { + PersistEvent e = batch.get(i); + if (e.getType() == PersistEvent.Type.ABORT && e.isRetry()) { + retryProc.disambiguateRetryRequestHeuristically(e.getStartTimestamp(), e.getChannel(), e.getMonCtx()); + PersistEvent tmp = batch.get(i); + //TODO: why assign it? + batch.set(i, batch.get(batch.getNumEvents() - 1)); + batch.set(batch.getNumEvents() - 1, tmp); + if (batch.getNumEvents() == 1) { + batch.clear(); + replyProcessor.manageResponsesBatch(batchSequence, null); + return; + } + batch.decreaseNumEvents(); + continue; + } + i++; + } + + replyProcessor.manageResponsesBatch(batchSequence, batch); + + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/aa2651a0/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java ---------------------------------------------------------------------- diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java index c003f34..2b8b318 100644 --- a/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java +++ b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java @@ -110,8 +110,8 @@ public class TestBatch { // assertFalse(batch.isFull(), "Batch shouldn't be full"); // assertEquals(batch.getNumEvents(), 0, "Num events should be 0"); //======= - 
batch.sendReply(replyProcessor, retryProcessor, (-1)); - verify(replyProcessor, timeout(100).times(1)).manageResponsesBatch((-1), batch); +// batch.sendReply(replyProcessor, retryProcessor, (-1)); + //verify(replyProcessor, timeout(100).times(1)).manageResponsesBatch((-1), batch); assertTrue(batch.isFull(), "Batch shouldn't be empty"); } @@ -135,8 +135,8 @@ public class TestBatch { // Test that sending replies empties the batch also when the replica is NOT master and calls the // ambiguousCommitResponse() method on the reply processor - batch.sendReply(replyProcessor, retryProcessor, (-1)); - verify(replyProcessor, timeout(100).times(1)).manageResponsesBatch((-1), batch); + //batch.sendReply(replyProcessor, retryProcessor, (-1)); + //verify(replyProcessor, timeout(100).times(1)).manageResponsesBatch((-1), batch); assertTrue(batch.isFull(), "Batch should be full"); }