Fix batchlog to account for CF truncation records

patch by Aleksey Yeschenko; reviewed by Jonathan Ellis for CASSANDRA-6999
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/87097066 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/87097066 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/87097066 Branch: refs/heads/cassandra-2.1 Commit: 87097066e7c3c133e333804c4e4b00457b6c989d Parents: fe94e90 Author: Aleksey Yeschenko <alek...@apache.org> Authored: Fri Apr 18 01:36:08 2014 +0300 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Fri Apr 18 01:38:55 2014 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/BatchlogManager.java | 102 ++++++++++++------- .../apache/cassandra/db/ColumnFamilyStore.java | 6 -- .../cassandra/db/HintedHandOffManager.java | 16 +-- .../org/apache/cassandra/db/RowMutation.java | 6 +- .../org/apache/cassandra/db/SystemTable.java | 53 +++++++--- .../db/commitlog/CommitLogReplayer.java | 4 +- .../apache/cassandra/service/StorageProxy.java | 9 +- .../cassandra/db/BatchlogManagerTest.java | 78 ++++++++++++-- .../apache/cassandra/db/HintedHandOffTest.java | 2 +- 10 files changed, 189 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/87097066/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 07c09cf..bb08a37 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -5,6 +5,7 @@ * Schedule schema pulls on change (CASSANDRA-6971) * Non-droppable verbs shouldn't be dropped from OTC (CASSANDRA-6980) * Shutdown batchlog executor in SS#drain() (CASSANDRA-7025) + * Fix batchlog to account for CF truncation records (CASSANDRA-6999) 1.2.16 http://git-wip-us.apache.org/repos/asf/cassandra/blob/87097066/src/java/org/apache/cassandra/db/BatchlogManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java b/src/java/org/apache/cassandra/db/BatchlogManager.java index b8dbadd..ea32e9d 100644 --- a/src/java/org/apache/cassandra/db/BatchlogManager.java +++ b/src/java/org/apache/cassandra/db/BatchlogManager.java @@ -24,10 +24,7 @@ import java.lang.management.ManagementFactory; import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.*; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import javax.management.MBeanServer; @@ -36,6 +33,7 @@ import javax.management.ObjectName; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import com.google.common.util.concurrent.RateLimiter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -254,45 +252,72 @@ public class BatchlogManager implements BatchlogManagerMBean { DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data)); int size = in.readInt(); + List<RowMutation> mutations = new ArrayList<RowMutation>(size); + for (int i = 0; i < size; i++) - replaySerializedMutation(RowMutation.serializer.deserialize(in, VERSION), writtenAt, rateLimiter); + { + RowMutation mutation = 
RowMutation.serializer.deserialize(in, VERSION); + + // Remove CFs that have been truncated since. writtenAt and SystemTable#getTruncatedAt() both return millis. + // We don't abort the replay entirely b/c this can be considered a succes (truncated is same as delivered then + // truncated. + for (UUID cfId : mutation.getColumnFamilyIds()) + if (writtenAt <= SystemTable.getTruncatedAt(cfId)) + mutation = mutation.without(cfId); + + if (!mutation.isEmpty()) + mutations.add(mutation); + } + + if (!mutations.isEmpty()) + replayMutations(mutations, writtenAt, rateLimiter); } /* * We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints * when a replica is down or a write request times out. */ - private void replaySerializedMutation(RowMutation mutation, long writtenAt, RateLimiter rateLimiter) throws IOException + private void replayMutations(List<RowMutation> mutations, long writtenAt, RateLimiter rateLimiter) throws IOException { - int ttl = calculateHintTTL(mutation, writtenAt); + int ttl = calculateHintTTL(mutations, writtenAt); if (ttl <= 0) - return; // the mutation isn't safe to replay. - - Set<InetAddress> liveEndpoints = new HashSet<InetAddress>(); - String ks = mutation.getTable(); - Token tk = StorageService.getPartitioner().getToken(mutation.key()); - int mutationSize = (int) RowMutation.serializer.serializedSize(mutation, VERSION); + return; // this batchlog entry has 'expired' - for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk), - StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks))) + for (RowMutation mutation : mutations) { - rateLimiter.acquire(mutationSize); - if (endpoint.equals(FBUtilities.getBroadcastAddress())) - mutation.apply(); - else if (FailureDetector.instance.isAlive(endpoint)) - liveEndpoints.add(endpoint); // will try delivering directly instead of writing a hint. - else - StorageProxy.writeHintForMutation(mutation, ttl, endpoint); - } + Set<InetAddress> liveEndpoints = Sets.newHashSet(); + List<InetAddress> hintEndpoints = Lists.newArrayList(); - if (!liveEndpoints.isEmpty()) - attemptDirectDelivery(mutation, writtenAt, liveEndpoints); + String ks = mutation.getTable(); + Token tk = StorageService.getPartitioner().getToken(mutation.key()); + int mutationSize = (int) RowMutation.serializer.serializedSize(mutation, VERSION); + + for (InetAddress endpoint : Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk), + StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks))) + { + rateLimiter.acquire(mutationSize); + if (endpoint.equals(FBUtilities.getBroadcastAddress())) + mutation.apply(); + else if (FailureDetector.instance.isAlive(endpoint)) + liveEndpoints.add(endpoint); // will try delivering directly instead of writing a hint. + else + hintEndpoints.add(endpoint); + } + + if (!liveEndpoints.isEmpty()) + hintEndpoints.addAll(attemptDirectDelivery(mutation, liveEndpoints)); + + for (InetAddress endpoint : hintEndpoints) + StorageProxy.writeHintForMutation(mutation, writtenAt, ttl, endpoint); + } } - private void attemptDirectDelivery(RowMutation mutation, long writtenAt, Set<InetAddress> endpoints) throws IOException + // Returns the endpoints we failed to deliver to. 
+ private Set<InetAddress> attemptDirectDelivery(RowMutation mutation, Set<InetAddress> endpoints) throws IOException { - List<WriteResponseHandler> handlers = Lists.newArrayList(); - final CopyOnWriteArraySet<InetAddress> undelivered = new CopyOnWriteArraySet<InetAddress>(endpoints); + final List<WriteResponseHandler> handlers = Lists.newArrayList(); + final Set<InetAddress> undelivered = Collections.synchronizedSet(new HashSet<InetAddress>()); + for (final InetAddress ep : endpoints) { Runnable callback = new Runnable() @@ -320,20 +345,19 @@ public class BatchlogManager implements BatchlogManagerMBean } } - if (!undelivered.isEmpty()) - { - int ttl = calculateHintTTL(mutation, writtenAt); // recalculate ttl - if (ttl > 0) - for (InetAddress endpoint : undelivered) - StorageProxy.writeHintForMutation(mutation, ttl, endpoint); - } + return undelivered; } - // calculate ttl for the mutation's hint (and reduce ttl by the time the mutation spent in the batchlog). - // this ensures that deletes aren't "undone" by an old batch replay. - private int calculateHintTTL(RowMutation mutation, long writtenAt) + /* + * Calculate ttl for the mutations' hints (and reduce ttl by the time the mutations spent in the batchlog). + * This ensures that deletes aren't "undone" by an old batch replay. + */ + private int calculateHintTTL(List<RowMutation> mutations, long writtenAt) { - return (int) ((mutation.calculateHintTTL() * 1000 - (System.currentTimeMillis() - writtenAt)) / 1000); + int unadjustedTTL = Integer.MAX_VALUE; + for (RowMutation mutation : mutations) + unadjustedTTL = Math.min(unadjustedTTL, mutation.calculateHintTTL()); + return unadjustedTTL - (int) TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - writtenAt); } private static ByteBuffer columnName(String name) http://git-wip-us.apache.org/repos/asf/cassandra/blob/87097066/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 9e6987d..1100fb9 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -2096,10 +2096,4 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { return getDataTracker().getDroppableTombstoneRatio(); } - - public long getTruncationTime() - { - Pair<ReplayPosition, Long> truncationRecord = SystemTable.getTruncationRecords().get(metadata.cfId); - return truncationRecord == null ? 
Long.MIN_VALUE : truncationRecord.right; - } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/87097066/src/java/org/apache/cassandra/db/HintedHandOffManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java index b1ccbc3..427bbf2 100644 --- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java +++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java @@ -30,7 +30,6 @@ import javax.management.MBeanServer; import javax.management.ObjectName; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSortedSet; import com.google.common.collect.Lists; import com.google.common.util.concurrent.RateLimiter; @@ -377,20 +376,11 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean throw new AssertionError(e); } - Map<UUID, Long> truncationTimesCache = new HashMap<UUID, Long>(); - for (UUID cfId : ImmutableSet.copyOf((rm.getColumnFamilyIds()))) + for (UUID cfId : rm.getColumnFamilyIds()) { - Long truncatedAt = truncationTimesCache.get(cfId); - if (truncatedAt == null) + if (hint.maxTimestamp() <= SystemTable.getTruncatedAt(cfId)) { - ColumnFamilyStore cfs = Table.open(rm.getTable()).getColumnFamilyStore(cfId); - truncatedAt = cfs.getTruncationTime(); - truncationTimesCache.put(cfId, truncatedAt); - } - - if (hint.maxTimestamp() < truncatedAt) - { - logger.debug("Skipping delivery of hint for truncated columnfamily {}" + cfId); + logger.debug("Skipping delivery of hint for truncated columnfamily {}", cfId); rm = rm.without(cfId); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/87097066/src/java/org/apache/cassandra/db/RowMutation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RowMutation.java b/src/java/org/apache/cassandra/db/RowMutation.java index 1a095c5..cdfc0ae 100644 --- a/src/java/org/apache/cassandra/db/RowMutation.java +++ b/src/java/org/apache/cassandra/db/RowMutation.java @@ -103,8 +103,10 @@ public class RowMutation implements IMutation /** * Returns mutation representing a Hint to be sent to <code>targetId</code> * as soon as it becomes available. See HintedHandoffManager for more details. 
+ * + * @param now current time in milliseconds - relevant for hint replay handling of truncated CFs */ - public RowMutation toHint(int ttl, UUID targetId) throws IOException + public RowMutation toHint(long now, int ttl, UUID targetId) throws IOException { assert ttl > 0; @@ -116,7 +118,7 @@ public class RowMutation implements IMutation HintedHandOffManager.comparator.decompose(hintId, MessagingService.current_version)); rm.add(path, ByteBuffer.wrap(FBUtilities.serialize(this, serializer, MessagingService.current_version)), - System.currentTimeMillis(), + now, ttl); return rm; http://git-wip-us.apache.org/repos/asf/cassandra/blob/87097066/src/java/org/apache/cassandra/db/SystemTable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemTable.java b/src/java/org/apache/cassandra/db/SystemTable.java index fbd765f..c691e9f 100644 --- a/src/java/org/apache/cassandra/db/SystemTable.java +++ b/src/java/org/apache/cassandra/db/SystemTable.java @@ -80,6 +80,8 @@ public class SystemTable private static final String LOCAL_KEY = "local"; private static final ByteBuffer ALL_LOCAL_NODE_ID_KEY = ByteBufferUtil.bytes("Local"); + private static volatile Map<UUID, Pair<ReplayPosition, Long>> truncationRecords; + public enum BootstrapState { NEEDS_BOOTSTRAP, @@ -237,20 +239,22 @@ public class SystemTable } } - public static void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position) + public static synchronized void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position) { String req = "UPDATE system.%s SET truncated_at = truncated_at + %s WHERE key = '%s'"; processInternal(String.format(req, LOCAL_CF, truncationAsMapEntry(cfs, truncatedAt, position), LOCAL_KEY)); + truncationRecords = null; forceBlockingFlush(LOCAL_CF); } /** * This method is used to remove information about truncation time for specified column family */ - public static void removeTruncationRecord(UUID cfId) + public static synchronized void removeTruncationRecord(UUID cfId) { String req = "DELETE truncated_at[%s] from system.%s WHERE key = '%s'"; processInternal(String.format(req, cfId, LOCAL_CF, LOCAL_KEY)); + truncationRecords = null; forceBlockingFlush(LOCAL_CF); } @@ -271,22 +275,41 @@ public class SystemTable ByteBufferUtil.bytesToHex(ByteBuffer.wrap(out.getData(), 0, out.getLength()))); } - public static Map<UUID, Pair<ReplayPosition, Long>> getTruncationRecords() + public static ReplayPosition getTruncatedPosition(UUID cfId) { - String req = "SELECT truncated_at FROM system.%s WHERE key = '%s'"; - UntypedResultSet rows = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY)); - if (rows.isEmpty()) - return Collections.emptyMap(); + Pair<ReplayPosition, Long> record = getTruncationRecord(cfId); + return record == null ? null : record.left; + } - UntypedResultSet.Row row = rows.one(); - Map<UUID, ByteBuffer> rawMap = row.getMap("truncated_at", UUIDType.instance, BytesType.instance); - if (rawMap == null) - return Collections.emptyMap(); + public static long getTruncatedAt(UUID cfId) + { + Pair<ReplayPosition, Long> record = getTruncationRecord(cfId); + return record == null ? 
Long.MIN_VALUE : record.right; + } + + private static synchronized Pair<ReplayPosition, Long> getTruncationRecord(UUID cfId) + { + if (truncationRecords == null) + truncationRecords = readTruncationRecords(); + return truncationRecords.get(cfId); + } + + private static Map<UUID, Pair<ReplayPosition, Long>> readTruncationRecords() + { + UntypedResultSet rows = processInternal(String.format("SELECT truncated_at FROM system.%s WHERE key = '%s'", + LOCAL_CF, + LOCAL_KEY)); + + Map<UUID, Pair<ReplayPosition, Long>> records = new HashMap<UUID, Pair<ReplayPosition, Long>>(); + + if (!rows.isEmpty() && rows.one().has("truncated_at")) + { + Map<UUID, ByteBuffer> map = rows.one().getMap("truncated_at", UUIDType.instance, BytesType.instance); + for (Map.Entry<UUID, ByteBuffer> entry : map.entrySet()) + records.put(entry.getKey(), truncationRecordFromBlob(entry.getValue())); + } - Map<UUID, Pair<ReplayPosition, Long>> positions = new HashMap<UUID, Pair<ReplayPosition, Long>>(); - for (Map.Entry<UUID, ByteBuffer> entry : rawMap.entrySet()) - positions.put(entry.getKey(), truncationRecordFromBlob(entry.getValue())); - return positions; + return records; } private static Pair<ReplayPosition, Long> truncationRecordFromBlob(ByteBuffer bytes) http://git-wip-us.apache.org/repos/asf/cassandra/blob/87097066/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index 934cb6a..46d18b2 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@ -68,7 +68,6 @@ public class CommitLogReplayer // compute per-CF and global replay positions cfPositions = new HashMap<UUID, ReplayPosition>(); Ordering<ReplayPosition> replayPositionOrdering = Ordering.from(ReplayPosition.comparator); - Map<UUID,Pair<ReplayPosition,Long>> truncationPositions = SystemTable.getTruncationRecords(); for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) { // it's important to call RP.gRP per-cf, before aggregating all the positions w/ the Ordering.min call @@ -77,8 +76,7 @@ public class CommitLogReplayer ReplayPosition rp = ReplayPosition.getReplayPosition(cfs.getSSTables()); // but, if we've truncted the cf in question, then we need to need to start replay after the truncation - Pair<ReplayPosition, Long> truncateRecord = truncationPositions.get(cfs.metadata.cfId); - ReplayPosition truncatedAt = truncateRecord == null ? 
null : truncateRecord.left; + ReplayPosition truncatedAt = SystemTable.getTruncatedPosition(cfs.metadata.cfId); if (truncatedAt != null) rp = replayPositionOrdering.max(Arrays.asList(rp, truncatedAt)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/87097066/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index ca82a1f..7ef3d72 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -560,7 +560,7 @@ public class StorageProxy implements StorageProxyMBean if (ttl > 0) { logger.debug("Adding hint for {}", target); - writeHintForMutation(mutation, ttl, target); + writeHintForMutation(mutation, System.currentTimeMillis(), ttl, target); // Notify the handler only for CL == ANY if (responseHandler != null && consistencyLevel == ConsistencyLevel.ANY) responseHandler.response(null); @@ -582,7 +582,10 @@ public class StorageProxy implements StorageProxyMBean return (Future<Void>) StageManager.getStage(Stage.MUTATION).submit(runnable); } - public static void writeHintForMutation(RowMutation mutation, int ttl, InetAddress target) throws IOException + /** + * @param now current time in milliseconds - relevant for hint replay handling of truncated CFs + */ + public static void writeHintForMutation(RowMutation mutation, long now, int ttl, InetAddress target) throws IOException { assert ttl > 0; UUID hostId = StorageService.instance.getTokenMetadata().getHostId(target); @@ -592,7 +595,7 @@ public class StorageProxy implements StorageProxyMBean return; } assert hostId != null : "Missing host ID for " + target.getHostAddress(); - mutation.toHint(ttl, hostId).apply(); + mutation.toHint(now, ttl, hostId).apply(); StorageMetrics.totalHints.inc(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/87097066/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java index 1d89f4b..fd2812f 100644 --- a/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java +++ b/test/unit/org/apache/cassandra/db/BatchlogManagerTest.java @@ -19,7 +19,10 @@ package org.apache.cassandra.db; import java.net.InetAddress; import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutionException; +import com.google.common.collect.Lists; import org.junit.Before; import org.junit.Test; @@ -28,6 +31,7 @@ import org.apache.cassandra.Util; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.UntypedResultSet; +import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.db.filter.QueryPath; import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.service.StorageService; @@ -52,8 +56,8 @@ public class BatchlogManagerTest extends SchemaLoader @Test public void testReplay() throws Exception { - assertEquals(0, BatchlogManager.instance.countAllBatches()); - assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed()); + long initialAllBatches = BatchlogManager.instance.countAllBatches(); + long initialReplayedBatches = BatchlogManager.instance.getTotalBatchesReplayed(); // Generate 1000 
mutations and put them all into the batchlog. // Half (500) ready to be replayed, half not. @@ -70,15 +74,15 @@ public class BatchlogManagerTest extends SchemaLoader // Flush the batchlog to disk (see CASSANDRA-6822). Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.BATCHLOG_CF).forceFlush(); - assertEquals(1000, BatchlogManager.instance.countAllBatches()); - assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed()); + assertEquals(1000, BatchlogManager.instance.countAllBatches() - initialAllBatches); + assertEquals(0, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches); // Force batchlog replay. BatchlogManager.instance.replayAllFailedBatches(); // Ensure that the first half, and only the first half, got replayed. - assertEquals(500, BatchlogManager.instance.countAllBatches()); - assertEquals(500, BatchlogManager.instance.getTotalBatchesReplayed()); + assertEquals(500, BatchlogManager.instance.countAllBatches() - initialAllBatches); + assertEquals(500, BatchlogManager.instance.getTotalBatchesReplayed() - initialReplayedBatches); for (int i = 0; i < 1000; i++) { @@ -99,4 +103,66 @@ public class BatchlogManagerTest extends SchemaLoader UntypedResultSet result = QueryProcessor.processInternal(String.format("SELECT count(*) FROM \"Keyspace1\".\"Standard1\"")); assertEquals(500, result.one().getLong("count")); } + + @Test + public void testTruncatedReplay() throws InterruptedException, ExecutionException + { + // Generate 2000 mutations (1000 batchlog entries) and put them all into the batchlog. + // Each batchlog entry with a mutation for Standard2 and Standard3. + // In the middle of the process, 'truncate' Standard2. + for (int i = 0; i < 1000; i++) + { + RowMutation mutation1 = new RowMutation("Keyspace1", bytes(i)); + mutation1.add(new QueryPath("Standard2", null, bytes(i)), bytes(i), 0); + RowMutation mutation2 = new RowMutation("Keyspace1", bytes(i)); + mutation2.add(new QueryPath("Standard3", null, bytes(i)), bytes(i), 0); + List<RowMutation> mutations = Lists.newArrayList(mutation1, mutation2); + + // Make sure it's ready to be replayed, so adjust the timestamp. + long timestamp = System.currentTimeMillis() - DatabaseDescriptor.getWriteRpcTimeout() * 2; + + if (i == 500) + SystemTable.saveTruncationRecord(Table.open("Keyspace1").getColumnFamilyStore("Standard2"), + timestamp, + ReplayPosition.NONE); + + // Adjust the timestamp (slightly) to make the test deterministic. + if (i >= 500) + timestamp++; + else + timestamp--; + + BatchlogManager.getBatchlogMutationFor(mutations, UUIDGen.getTimeUUID(), timestamp * 1000).apply(); + } + + // Flush the batchlog to disk (see CASSANDRA-6822). + Table.open(Table.SYSTEM_KS).getColumnFamilyStore(SystemTable.BATCHLOG_CF).forceFlush(); + + // Force batchlog replay. + BatchlogManager.instance.replayAllFailedBatches(); + + // We should see half of Standard2-targeted mutations written after the replay and all of Standard3 mutations applied. 
+ for (int i = 0; i < 1000; i++) + { + UntypedResultSet result = QueryProcessor.processInternal(String.format("SELECT * FROM \"Keyspace1\".\"Standard2\" WHERE key = intAsBlob(%d)", i)); + if (i >= 500) + { + assertEquals(bytes(i), result.one().getBytes("key")); + assertEquals(bytes(i), result.one().getBytes("column1")); + assertEquals(bytes(i), result.one().getBytes("value")); + } + else + { + assertTrue(result.isEmpty()); + } + } + + for (int i = 0; i < 1000; i++) + { + UntypedResultSet result = QueryProcessor.processInternal(String.format("SELECT * FROM \"Keyspace1\".\"Standard3\" WHERE key = intAsBlob(%d)", i)); + assertEquals(bytes(i), result.one().getBytes("key")); + assertEquals(bytes(i), result.one().getBytes("column1")); + assertEquals(bytes(i), result.one().getBytes("value")); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/87097066/test/unit/org/apache/cassandra/db/HintedHandOffTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/HintedHandOffTest.java b/test/unit/org/apache/cassandra/db/HintedHandOffTest.java index a012109..7b4a736 100644 --- a/test/unit/org/apache/cassandra/db/HintedHandOffTest.java +++ b/test/unit/org/apache/cassandra/db/HintedHandOffTest.java @@ -70,7 +70,7 @@ public class HintedHandOffTest extends SchemaLoader ByteBufferUtil.EMPTY_BYTE_BUFFER, System.currentTimeMillis()); - rm.toHint(rm.calculateHintTTL(), UUID.randomUUID()).apply(); + rm.toHint(System.currentTimeMillis(), rm.calculateHintTTL(), UUID.randomUUID()).apply(); // flush data to disk hintStore.forceBlockingFlush();
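
----------------------------------------------------------------------

To illustrate the core of the BatchlogManager change above: before a batch is replayed, each mutation is checked against the truncation time of every column family it touches, and any CF truncated at or after the batch's writtenAt timestamp is dropped from the mutation instead of being replayed. The sketch below is a minimal, self-contained rendering of that filtering step; the Mutation interface and filterTruncated() method are hypothetical stand-ins for RowMutation and the private replay logic, not the actual Cassandra API.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Illustrative sketch of the truncation-aware filtering added to batch replay.
// All types and methods here are hypothetical stand-ins, not Cassandra classes.
class TruncationAwareReplaySketch
{
    /** Hypothetical stand-in for RowMutation, reduced to what the sketch needs. */
    interface Mutation
    {
        Iterable<UUID> columnFamilyIds();
        Mutation without(UUID cfId); // copy of this mutation minus the given column family
        boolean isEmpty();
    }

    /**
     * Drop the parts of each batched mutation that target a column family truncated at or
     * after the time the batch was written (writtenAt and the truncation times are both in
     * milliseconds). A truncated CF counts as "delivered, then truncated", so the rest of
     * the batch is still replayed rather than aborting the whole entry.
     */
    static List<Mutation> filterTruncated(List<Mutation> batch, long writtenAt, Map<UUID, Long> truncatedAt)
    {
        List<Mutation> replayable = new ArrayList<Mutation>();
        for (Mutation mutation : batch)
        {
            for (UUID cfId : mutation.columnFamilyIds())
            {
                Long truncationTime = truncatedAt.get(cfId);
                if (truncationTime != null && writtenAt <= truncationTime)
                    mutation = mutation.without(cfId); // truncated after the batch was written
            }
            if (!mutation.isEmpty())
                replayable.add(mutation);
        }
        return replayable;
    }
}

The same comparison guards hinted handoff in this patch: a hint whose maxTimestamp() is at or before SystemTable.getTruncatedAt(cfId) is skipped for that CF rather than delivered.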
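The hint TTL calculation is also reworked to operate on the batch as a whole: the TTL used for any hints written during replay is the smallest per-mutation hint TTL, reduced by how long the entry sat in the batchlog. A hedged sketch of that arithmetic, using hypothetical names:

import java.util.List;
import java.util.concurrent.TimeUnit;

// Illustrative arithmetic only (the class and method names are hypothetical): the hint TTL
// for a replayed batch is the smallest per-mutation hint TTL, reduced by the time the batch
// spent in the batchlog.
final class HintTtlSketch
{
    static int hintTtlSeconds(List<Integer> perMutationTtlSeconds, long writtenAtMillis, long nowMillis)
    {
        int unadjusted = Integer.MAX_VALUE;
        for (int ttl : perMutationTtlSeconds)
            unadjusted = Math.min(unadjusted, ttl);

        long ageSeconds = TimeUnit.MILLISECONDS.toSeconds(nowMillis - writtenAtMillis);
        return unadjusted - (int) ageSeconds; // <= 0 means the entry has effectively expired
    }
}

If the result is zero or negative, the patch treats the batchlog entry as expired and skips it, which is what prevents an old replay from "undoing" deletes that have since been garbage-collected.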
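SystemTable now caches the truncation records so that the per-mutation checks above do not re-query the system table on every batch replay or hint delivery. The pattern, sketched below with hypothetical names, is a volatile map that writers null out under the class lock and readers lazily rebuild:

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Sketch of the caching pattern the patch introduces in SystemTable (all names here are
// hypothetical): writers persist the record and invalidate the cache under the class lock,
// readers lazily rebuild it, so truncation times are only re-read after they change.
final class TruncationRecordCacheSketch
{
    private static volatile Map<UUID, Long> truncationRecords;

    static synchronized void saveTruncationRecord(UUID cfId, long truncatedAtMillis)
    {
        persistToSystemTable(cfId, truncatedAtMillis);
        truncationRecords = null; // invalidate; the next reader reloads the map
    }

    static long getTruncatedAt(UUID cfId)
    {
        Long truncatedAt = records().get(cfId);
        return truncatedAt == null ? Long.MIN_VALUE : truncatedAt;
    }

    private static synchronized Map<UUID, Long> records()
    {
        if (truncationRecords == null)
            truncationRecords = readFromSystemTable();
        return truncationRecords;
    }

    // Placeholders for the actual system-table reads and writes.
    private static void persistToSystemTable(UUID cfId, long truncatedAtMillis) {}

    private static Map<UUID, Long> readFromSystemTable()
    {
        return new HashMap<UUID, Long>();
    }
}

Invalidation-on-write keeps the cache consistent with saveTruncationRecord()/removeTruncationRecord() while keeping getTruncatedAt() cheap on the hot replay and hint-delivery paths.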