beobal commented on code in PR #3088:
URL: https://github.com/apache/cassandra/pull/3088#discussion_r1479697203
##########
src/java/org/apache/cassandra/tcm/log/LogReader.java:
##########
@@ -18,16 +18,132 @@
package org.apache.cassandra.tcm.log;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+
+import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.Epoch;
-import org.apache.cassandra.tcm.Sealed;
+import org.apache.cassandra.tcm.MetadataSnapshots;
+import org.apache.cassandra.tcm.Period;
public interface LogReader
{
- default Replication getReplication(Epoch since)
+ /**
+ * Gets all entries where epoch >= since in the given period - could be
empty if since is in a later epoch
+ */
+ EntryHolder getEntries(long period, Epoch since) throws IOException;
+ MetadataSnapshots snapshots();
+
+ /**
+ * Idea is to fill LogState "backwards" - we start querying the partition
at currentPeriod, and if that doesn't
+ * include `since` we read currentPeriod - 1. Assuming we have something
like
+
+ * epoch | period | transformation
+ * 10 | 2 | SEAL_PERIOD
+ * 11 | 3 | SOMETHING
+ * 12 | 3 | SOMETHING
+ * 13 | 3 | SEAL_PERIOD
+ * 14 | 4 | SOMETHING
+ * 15 | 4 | SOMETHING
+ * 16 | 4 | SEAL_PERIOD
+ * 17 | 5 | SOMETHING
+ *
+ * and `since` is 14, we want to return epoch 15, 16, 17 - not the
snapshot at 16 + entry at 17 since it is assumed that the full
+ * snapshot is much larger than the transformations.
+ * But, if `since` is 11, we want to return the most recent snapshot (at
epoch 16) + entry at 17.
+ *
+ * If a snapshot is missing we keep reading backwards until we find one,
or we end up at period 0 and in that
+ * case we return all transformations in the log.
+ */
+ default LogState getLogState(long currentPeriod, Epoch since)
{
- Sealed sealed = Sealed.lookupForReplication(since);
- return getReplication(sealed.period, since);
+ try
+ {
+ EntryHolder current = getEntries(currentPeriod, since);
+ if (current.done)
+ return new LogState(null,
ImmutableList.sortedCopyOf(current.entries));
+ List<Entry> allEntries = new ArrayList<>(current.entries);
+ int i = 0;
+ while (true)
+ {
+ i++;
+ EntryHolder previous = getEntries(currentPeriod - i, since);
+ allEntries.addAll(previous.entries);
+ if (isContinuous(since, allEntries) && (previous.done ||
currentPeriod - i == Period.FIRST))
+ {
+ return new LogState(null,
ImmutableList.sortedCopyOf(allEntries));
+ }
+ else
+ {
+ if (i == 1)
+ {
+ // we end up here if `since` is not in currentPeriod
or currentPeriod - 1
+ // so we should return a snapshot - we prefer
returning the most recent snapshot + entries in `current`
+ // but if that doesn't exist we check previous.
+ if (current.min == null && previous.min == null) // we
found no entries >= since in the tables -> since > current
+ return LogState.EMPTY;
+ ClusterMetadata snapshot =
snapshots().getSnapshot(Epoch.create(current.min.getEpoch() - 1));
+ if (snapshot != null)
+ return new LogState(snapshot,
ImmutableList.sortedCopyOf(current.entries));
+ }
+ ClusterMetadata snapshot =
snapshots().getSnapshot(Epoch.create(previous.min.getEpoch() - 1));
Review Comment:
There's a chance we can hit an NPE here (`previous.min` would be null) if we are
missing both a snapshot and the log entries that should come after `since`.
This could happen in the local table if we received a snapshot at `since + x`,
so we never had the individual entries directly following `since`, and then
lost that snapshot. I've pushed a test and fix for this to
https://github.com/beobal/cassandra/commit/7d66c2997186a6dda7bc3ba95102207b988cfd4c
One open question there is whether it's better to return a `LogState` with a
non-contiguous list of entries, or an empty one.
##########
src/java/org/apache/cassandra/tcm/log/LogState.java:
##########
@@ -237,31 +171,122 @@ public Serializer(Version serializationVersion)
@Override
public void serialize(LogState t, DataOutputPlus out, int version)
throws IOException
{
- out.writeBoolean(t.baseState != null);
- if (t.baseState != null)
-
VerboseMetadataSerializer.serialize(ClusterMetadata.serializer, t.baseState,
out, serializationVersion);
- VerboseMetadataSerializer.serialize(Replication.serializer,
t.transformations, out, serializationVersion);
+ VerboseMetadataSerializer.serialize(metadataSerializer, t, out,
serializationVersion);
}
@Override
public LogState deserialize(DataInputPlus in, int version) throws
IOException
{
- boolean hasSnapshot = in.readBoolean();
- ClusterMetadata snapshot = null;
- if (hasSnapshot)
- snapshot =
VerboseMetadataSerializer.deserialize(ClusterMetadata.serializer, in);
- Replication replication =
VerboseMetadataSerializer.deserialize(Replication.serializer, in);
- return new LogState(snapshot, replication);
+ return VerboseMetadataSerializer.deserialize(metadataSerializer,
in);
}
@Override
public long serializedSize(LogState t, int version)
{
- long size = TypeSizes.BOOL_SIZE;
+ return
VerboseMetadataSerializer.serializedSize(metadataSerializer, t,
serializationVersion);
+ }
+ }
+
+ static final class MetadataSerializer implements
org.apache.cassandra.tcm.serialization.MetadataSerializer<LogState>
+ {
+ @Override
+ public void serialize(LogState t, DataOutputPlus out, Version version)
throws IOException
+ {
+ out.writeBoolean(t.baseState != null);
if (t.baseState != null)
- size +=
VerboseMetadataSerializer.serializedSize(ClusterMetadata.serializer,
t.baseState, serializationVersion);
- size +=
VerboseMetadataSerializer.serializedSize(Replication.serializer,
t.transformations, serializationVersion);
+ ClusterMetadata.serializer.serialize(t.baseState, out,
version);
+ out.writeInt(t.entries.size());
+ for (Entry entry : t.entries)
+ Entry.serializer.serialize(entry, out, version);
+ }
+
+ @Override
+ public LogState deserialize(DataInputPlus in, Version version) throws
IOException
+ {
+ ClusterMetadata baseState = null;
+ if (in.readBoolean())
+ baseState = ClusterMetadata.serializer.deserialize(in,
version);
+ int size = in.readInt();
+ ImmutableList.Builder<Entry> builder = ImmutableList.builder();
+ for(int i=0;i<size;i++)
+ builder.add(Entry.serializer.deserialize(in, version));
+ return new LogState(baseState, builder.build());
+ }
+
+ @Override
+ public long serializedSize(LogState t, Version version)
+ {
+ long size = TypeSizes.sizeof(t.baseState != null);
+ if (t.baseState != null)
+ size += ClusterMetadata.serializer.serializedSize(t.baseState,
version);
+ size += TypeSizes.INT_SIZE;
+ for (Entry entry : t.entries)
+ size += Entry.serializer.serializedSize(entry, version);
return size;
}
}
+
+ public static final class ReplicationHandler implements
IVerbHandler<LogState>
+ {
+ private static final Logger logger =
LoggerFactory.getLogger(ReplicationHandler.class);
+ private final LocalLog log;
+
+ public ReplicationHandler(LocalLog log)
+ {
+ this.log = log;
+ }
+
+ public void doVerb(Message<LogState> message) throws IOException
+ {
+ logger.info("Received logstate {} from {}", message.payload,
message.from());
Review Comment:
This might be a bit verbose now. Previously, the equivalent log message
included only the size and the min/max epochs of the Replication. Maybe we could
go back to something like that, plus the epoch of the base state (if there is one)?
##########
src/java/org/apache/cassandra/schema/DistributedMetadataLogKeyspace.java:
##########
@@ -173,105 +171,54 @@ public static boolean tryCommit(Entry.Id entryId,
}
}
- @VisibleForTesting
- public static void truncateLogState()
- {
- QueryProcessor.execute(String.format("TRUNCATE %s.%s",
SchemaConstants.METADATA_KEYSPACE_NAME, TABLE_NAME), ConsistencyLevel.QUORUM);
- }
-
-
private static final LogReader localLogReader = new
DistributedTableLogReader(ConsistencyLevel.NODE_LOCAL);
private static final LogReader serialLogReader = new
DistributedTableLogReader(ConsistencyLevel.SERIAL);
public static LogState getLogState(Epoch since, boolean consistentFetch)
{
- return LogState.getLogState(since,
ClusterMetadataService.instance().snapshotManager(), consistentFetch ?
serialLogReader : localLogReader);
- }
-
- @VisibleForTesting
- public static LogState getLogState(Epoch since, LogReader logReader,
MetadataSnapshots snapshots)
- {
- Retry retry = new Retry.Jitter(TCMMetrics.instance.fetchLogRetries);
- while (!retry.reachedMax())
- {
- try
- {
- return LogState.getLogState(since, snapshots, logReader);
- }
- catch (Throwable t)
- {
- retry.maybeSleep();
- }
- }
-
- throw new IllegalStateException(String.format("Could not retrieve log
state after %s tries.", retry.currentTries()));
+ return (consistentFetch ? serialLogReader :
localLogReader).getLogState(ClusterMetadata.current().period, since);
}
public static class DistributedTableLogReader implements LogReader
{
private final ConsistencyLevel consistencyLevel;
+ private final Supplier<MetadataSnapshots> snapshots;
- public DistributedTableLogReader(ConsistencyLevel consistencyLevel)
+ public DistributedTableLogReader(ConsistencyLevel consistencyLevel,
Supplier<MetadataSnapshots> snapshots)
{
this.consistencyLevel = consistencyLevel;
+ this.snapshots = snapshots;
}
- @Override
- public Replication getReplication(long startPeriod, Epoch since)
+ public DistributedTableLogReader(ConsistencyLevel consistencyLevel)
{
- try
- {
- if (startPeriod == Period.EMPTY)
- {
- startPeriod = Period.scanLogForPeriod(Log, since);
- // There shouldn't be any entries in period 0, the
pre-init transform would bump it to period 1.
- if (startPeriod == Period.EMPTY)
- return Replication.EMPTY;
- }
-
- long currentEpoch = since.getEpoch();
- long lastEpoch = since.getEpoch();
-
- long period = startPeriod;
- ImmutableList.Builder<Entry> entries = new
ImmutableList.Builder<>();
-
- while (true)
- {
- boolean empty = true;
- UntypedResultSet resultSet = execute(String.format("SELECT
current_epoch, period, epoch, kind, transformation, entry_id, sealed FROM %s.%s
WHERE period = ? AND epoch > ?",
-
SchemaConstants.METADATA_KEYSPACE_NAME, TABLE_NAME),
- consistencyLevel,
period, since.getEpoch());
-
- for (UntypedResultSet.Row row : resultSet)
- {
- currentEpoch = row.getLong("current_epoch");
- long epochl = row.getLong("epoch");
- Epoch epoch = Epoch.create(epochl);
- Transformation.Kind kind =
Transformation.Kind.valueOf(row.getString("kind"));
- long entryId = row.getLong("entry_id");
- Transformation transform =
kind.fromVersionedBytes(row.getBlob("transformation"));
- entries.add(new Entry(new Entry.Id(entryId), epoch,
transform));
-
- lastEpoch = currentEpoch;
- empty = false;
- }
-
- if (period != startPeriod && empty)
- break;
-
- period++;
- }
+ this(consistencyLevel, () ->
ClusterMetadataService.instance().snapshotManager());
+ }
- assert currentEpoch == lastEpoch;
- return new Replication(entries.build());
- }
- catch (IOException t)
+ public EntryHolder getEntries(long period, Epoch since) throws
IOException
+ {
+ UntypedResultSet resultSet = execute(String.format("SELECT
current_epoch, period, epoch, kind, transformation, entry_id, sealed FROM %s.%s
WHERE period = ? AND epoch >= ?",
Review Comment:
`current_epoch` and `period` are selected but never used.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]