Repository: aurora
Updated Branches:
  refs/heads/master cea43db9d -> 5e008ff7f
Remove redundant transaction recorder

Reviewed at https://reviews.apache.org/r/64283/

Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/5e008ff7
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/5e008ff7
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/5e008ff7

Branch: refs/heads/master
Commit: 5e008ff7fe7f5701e1baf2051c2873f655ca7aed
Parents: cea43db
Author: Bill Farner <wfar...@apache.org>
Authored: Sun Dec 3 06:50:45 2017 -0800
Committer: Bill Farner <wfar...@apache.org>
Committed: Sun Dec 3 06:50:45 2017 -0800

----------------------------------------------------------------------
 .../scheduler/storage/log/LogPersistence.java | 5 +-
 .../scheduler/storage/log/StreamManager.java | 9 +-
 .../storage/log/StreamManagerImpl.java | 133 ++---------------
 .../storage/log/StreamTransaction.java | 40 -----
 .../scheduler/storage/log/LogManagerTest.java | 149 +------------------
 5 files changed, 21 insertions(+), 315 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/aurora/blob/5e008ff7/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java
index a0a6b6c..e70e605 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java
@@ -18,6 +18,7 @@ import java.util.Date;
 import java.util.Iterator;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
@@ -99,10 +100,8 @@ class LogPersistence implements Persistence, 
DistributedSnapshotStore { @Override public void persist(Stream<Op> mutations) throws PersistenceException { - StreamTransaction transaction = streamManager.startTransaction(); - mutations.forEach(transaction::add); try { - transaction.commit(); + streamManager.commit(mutations.collect(Collectors.toList())); } catch (CodingException e) { throw new PersistenceException(e); } http://git-wip-us.apache.org/repos/asf/aurora/blob/5e008ff7/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java index 18da32d..73602cb 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java @@ -14,8 +14,10 @@ package org.apache.aurora.scheduler.storage.log; import java.util.Iterator; +import java.util.List; import org.apache.aurora.gen.storage.LogEntry; +import org.apache.aurora.gen.storage.Op; import org.apache.aurora.gen.storage.Snapshot; import org.apache.aurora.scheduler.log.Log; @@ -52,12 +54,11 @@ public interface StreamManager { void truncateBefore(Log.Position position); /** - * Starts a transaction that can be used to commit a series of ops to the log stream atomically. + * Saves operations to the log stream. * - * @return StreamTransaction A transaction manager to handle batching up commits to the - * underlying stream. + * @param mutations Operations to save. 
*/ - StreamTransaction startTransaction(); + void commit(List<Op> mutations); /** * Adds a snapshot to the log and if successful, truncates the log entries preceding the http://git-wip-us.apache.org/repos/asf/aurora/blob/5e008ff7/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java index c5b107f..9eb309a 100644 --- a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java +++ b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java @@ -15,33 +15,24 @@ package org.apache.aurora.scheduler.storage.log; import java.util.Arrays; import java.util.Iterator; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; import javax.inject.Inject; -import com.google.common.base.Preconditions; import com.google.common.collect.AbstractIterator; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; import com.google.common.hash.HashFunction; import com.google.common.hash.Hasher; import com.google.common.primitives.Bytes; import com.google.inject.assistedinject.Assisted; import org.apache.aurora.common.stats.Stats; -import org.apache.aurora.gen.ScheduledTask; import org.apache.aurora.gen.storage.Frame; import org.apache.aurora.gen.storage.FrameHeader; import org.apache.aurora.gen.storage.LogEntry; import org.apache.aurora.gen.storage.Op; -import org.apache.aurora.gen.storage.RemoveTasks; -import org.apache.aurora.gen.storage.SaveHostAttributes; -import org.apache.aurora.gen.storage.SaveTasks; import 
org.apache.aurora.gen.storage.Snapshot; import org.apache.aurora.gen.storage.Transaction; import org.apache.aurora.gen.storage.storageConstants; @@ -193,8 +184,16 @@ class StreamManagerImpl implements StreamManager { } @Override - public StreamTransactionImpl startTransaction() { - return new StreamTransactionImpl(); + public void commit(List<Op> mutations) { + if (mutations.isEmpty()) { + return; + } + + Transaction transaction = new Transaction() + .setSchemaVersion(storageConstants.CURRENT_SCHEMA_VERSION) + .setOps(mutations); + appendAndGetPosition(LogEntry.transaction(transaction)); + vars.unSnapshottedTransactions.incrementAndGet(); } @Override @@ -235,114 +234,4 @@ class StreamManagerImpl implements StreamManager { vars.entriesWritten.incrementAndGet(); return firstPosition; } - - final class StreamTransactionImpl implements StreamTransaction { - private final Transaction transaction = - new Transaction().setSchemaVersion(storageConstants.CURRENT_SCHEMA_VERSION); - private final AtomicBoolean committed = new AtomicBoolean(false); - - StreamTransactionImpl() { - // supplied by factory method - } - - @Override - public Log.Position commit() throws CodingException { - Preconditions.checkState(!committed.getAndSet(true), - "Can only call commit once per transaction."); - - if (!transaction.isSetOps()) { - return null; - } - - Log.Position position = appendAndGetPosition(LogEntry.transaction(transaction)); - vars.unSnapshottedTransactions.incrementAndGet(); - return position; - } - - @Override - public void add(Op op) { - Preconditions.checkState(!committed.get()); - - Op prior = transaction.isSetOps() ? Iterables.getLast(transaction.getOps(), null) : null; - if (prior == null || !coalesce(prior, op)) { - transaction.addToOps(op); - } - } - - /** - * Tries to coalesce a new op into the prior to compact the binary representation and increase - * batching. - * - * @param prior The previous op. - * @param next The next op to be added. 
- * @return {@code true} if the next op was coalesced into the prior, {@code false} otherwise. - */ - private boolean coalesce(Op prior, Op next) { - if (!prior.isSet() && !next.isSet()) { - return false; - } - - Op._Fields priorType = prior.getSetField(); - if (!priorType.equals(next.getSetField())) { - return false; - } - - switch (priorType) { - case SAVE_FRAMEWORK_ID: - prior.setSaveFrameworkId(next.getSaveFrameworkId()); - return true; - case SAVE_TASKS: - coalesce(prior.getSaveTasks(), next.getSaveTasks()); - return true; - case REMOVE_TASKS: - coalesce(prior.getRemoveTasks(), next.getRemoveTasks()); - return true; - case SAVE_HOST_ATTRIBUTES: - return coalesce(prior.getSaveHostAttributes(), next.getSaveHostAttributes()); - default: - return false; - } - } - - private void coalesce(SaveTasks prior, SaveTasks next) { - if (next.isSetTasks()) { - if (prior.isSetTasks()) { - // It is an expected invariant that an operation may reference a task (identified by - // task ID) no more than one time. Therefore, to coalesce two SaveTasks operations, - // the most recent task definition overrides the prior operation. 
- Map<String, ScheduledTask> coalesced = Maps.newHashMap(); - for (ScheduledTask task : prior.getTasks()) { - coalesced.put(task.getAssignedTask().getTaskId(), task); - } - for (ScheduledTask task : next.getTasks()) { - coalesced.put(task.getAssignedTask().getTaskId(), task); - } - prior.setTasks(ImmutableSet.copyOf(coalesced.values())); - } else { - prior.setTasks(next.getTasks()); - } - } - } - - private void coalesce(RemoveTasks prior, RemoveTasks next) { - if (next.isSetTaskIds()) { - if (prior.isSetTaskIds()) { - prior.setTaskIds(ImmutableSet.<String>builder() - .addAll(prior.getTaskIds()) - .addAll(next.getTaskIds()) - .build()); - } else { - prior.setTaskIds(next.getTaskIds()); - } - } - } - - private boolean coalesce(SaveHostAttributes prior, SaveHostAttributes next) { - if (prior.getHostAttributes().getHost().equals(next.getHostAttributes().getHost())) { - prior.getHostAttributes().setAttributes(next.getHostAttributes().getAttributes()); - return true; - } - return false; - } - } } http://git-wip-us.apache.org/repos/asf/aurora/blob/5e008ff7/src/main/java/org/apache/aurora/scheduler/storage/log/StreamTransaction.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamTransaction.java b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamTransaction.java deleted file mode 100644 index a51fd18..0000000 --- a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamTransaction.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.aurora.scheduler.storage.log; - -import org.apache.aurora.codec.ThriftBinaryCodec; -import org.apache.aurora.gen.storage.Op; -import org.apache.aurora.scheduler.log.Log; - -/** - * Manages a single log stream append transaction. Local storage ops can be added to the - * transaction and then later committed as an atomic unit. - */ -interface StreamTransaction { - /** - * Appends any ops that have been added to this transaction to the log stream in a single - * atomic record. - * - * @return The position of the log entry committed in this transaction, if any. - * @throws CodingException If there was a problem encoding a log entry for commit. - */ - Log.Position commit() throws ThriftBinaryCodec.CodingException; - - /** - * Adds a local storage operation to this transaction. - * - * @param op The local storage op to add. 
- */ - void add(Op op); -} http://git-wip-us.apache.org/repos/asf/aurora/blob/5e008ff7/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java index cb38f10..4d210b2 100644 --- a/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java +++ b/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java @@ -16,13 +16,8 @@ package org.apache.aurora.scheduler.storage.log; import java.nio.ByteBuffer; import java.security.MessageDigest; import java.util.Collections; -import java.util.Deque; -import java.util.Iterator; import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingDeque; -import com.google.common.base.Function; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; @@ -61,25 +56,13 @@ import org.junit.Test; import static org.apache.aurora.scheduler.storage.log.SnapshotDeduplicator.SnapshotDeduplicatorImpl; import static org.easymock.EasyMock.expect; -import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotSame; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertSame; public class LogManagerTest extends EasyMockTest { private static final Amount<Integer, Data> NO_FRAMES_EVER_SIZE = Amount.of(Integer.MAX_VALUE, Data.GB); - private static final Function<LogEntry, byte[]> ENCODER = entry -> { - try { - return encode(entry); - } catch (CodingException e) { - throw new RuntimeException(e); - } - }; - private Stream stream; private Position position1; private Position position2; @@ -139,41 +122,10 @@ public class LogManagerTest extends 
EasyMockTest { } @Test - public void testStreamManagerSuccessiveCommits() throws CodingException { - control.replay(); - - StreamManager streamManager = createNoMessagesStreamManager(); - StreamTransaction streamTransaction = streamManager.startTransaction(); - streamTransaction.commit(); - - assertNotSame("Expected a new transaction to be started after a commit", - streamTransaction, streamManager.startTransaction()); - } - - @Test public void testTransactionEmpty() throws CodingException { control.replay(); - Position position = createNoMessagesStreamManager().startTransaction().commit(); - assertNull(position); - } - - @Test(expected = IllegalStateException.class) - public void testTransactionDoubleCommit() throws CodingException { - control.replay(); - - StreamTransaction streamTransaction = createNoMessagesStreamManager().startTransaction(); - streamTransaction.commit(); - streamTransaction.commit(); - } - - @Test(expected = IllegalStateException.class) - public void testTransactionAddAfterCommit() throws CodingException { - control.replay(); - - StreamTransaction streamTransaction = createNoMessagesStreamManager().startTransaction(); - streamTransaction.commit(); - streamTransaction.add(Op.saveFrameworkId(new SaveFrameworkId("don't allow this"))); + createNoMessagesStreamManager().commit(ImmutableList.of()); } private static class LogEntryMatcher implements IArgumentMatcher { @@ -228,12 +180,7 @@ public class LogManagerTest extends EasyMockTest { StreamManager streamManager = createNoMessagesStreamManager(); control.replay(); - StreamTransaction transaction = streamManager.startTransaction(); - transaction.add(saveFrameworkId); - transaction.add(deleteJob); - - Position position = transaction.commit(); - assertSame(position1, position); + streamManager.commit(ImmutableList.of(saveFrameworkId, deleteJob)); } static class Message { @@ -281,97 +228,7 @@ public class LogManagerTest extends EasyMockTest { StreamManager streamManager = 
createStreamManager(message.chunkSize); control.replay(); - StreamTransaction transaction = streamManager.startTransaction(); - transaction.add(saveFrameworkId); - - Position position = transaction.commit(); - assertSame(position1, position); - } - - @Test - public void testConcurrentWrites() throws Exception { - control.replay(); // No easymock expectations used here - - Op op1 = Op.removeJob(new RemoveJob(JobKeys.from("r1", "env", "name").newBuilder())); - final Op op2 = Op.removeJob(new RemoveJob(JobKeys.from("r2", "env", "name").newBuilder())); - - LogEntry transaction1 = createLogEntry(op1); - LogEntry transaction2 = createLogEntry(op2); - - final CountDownLatch message1Started = new CountDownLatch(1); - - Message message1 = frame(transaction1); - Message message2 = frame(transaction2); - - List<byte[]> expectedAppends = - ImmutableList.<byte[]>builder() - .add(encode(message1.header)) - .addAll(Iterables.transform(message1.chunks, ENCODER)) - .add(encode(message2.header)) - .addAll(Iterables.transform(message2.chunks, ENCODER)) - .build(); - - final Deque<byte[]> actualAppends = new LinkedBlockingDeque<>(); - - Stream mockStream = new Stream() { - @Override - public Position append(byte[] contents) throws StreamAccessException { - actualAppends.addLast(contents); - message1Started.countDown(); - try { - // If a chunked message is not properly serialized to the log, this sleep all but ensures - // interleaved chunk writes and a test failure. 
- Thread.sleep(100); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - return null; - } - - @Override - public Iterator<Entry> readAll() throws InvalidPositionException, StreamAccessException { - throw new UnsupportedOperationException(); - } - - @Override - public void truncateBefore(Position position) - throws InvalidPositionException, StreamAccessException { - throw new UnsupportedOperationException(); - } - }; - - final StreamManagerImpl streamManager = new StreamManagerImpl( - mockStream, - new EntrySerializer.EntrySerializerImpl(message1.chunkSize, Hashing.md5()), - Hashing.md5(), - new SnapshotDeduplicatorImpl()); - StreamTransaction tr1 = streamManager.startTransaction(); - tr1.add(op1); - - Thread snapshotThread = new Thread() { - @Override - public void run() { - StreamTransaction tr2 = streamManager.startTransaction(); - tr2.add(op2); - try { - message1Started.await(); - tr2.commit(); - } catch (CodingException | InterruptedException e) { - throw new RuntimeException(e); - } - } - }; - snapshotThread.setDaemon(true); - snapshotThread.start(); - - tr1.commit(); - - snapshotThread.join(); - - assertEquals(expectedAppends.size(), actualAppends.size()); - for (byte[] expectedData : expectedAppends) { - assertArrayEquals(expectedData, actualAppends.removeFirst()); - } + streamManager.commit(ImmutableList.of(saveFrameworkId)); } @Test