iamaleksey commented on code in PR #4360:
URL: https://github.com/apache/cassandra/pull/4360#discussion_r2333446241
##########
src/java/org/apache/cassandra/replication/Shard.java:
##########
@@ -18,72 +18,93 @@
package org.apache.cassandra.replication;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Set;
import java.util.function.BiConsumer;
-import java.util.function.IntSupplier;
+import java.util.function.LongSupplier;
import javax.annotation.Nonnull;
import com.google.common.base.Preconditions;
import org.agrona.collections.IntArrayList;
+import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.replication.CoordinatorLog.CoordinatorLogPrimary;
+import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.tcm.ClusterMetadata;
-import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.tcm.membership.NodeId;
import org.jctools.maps.NonBlockingHashMapLong;
+import static java.lang.String.format;
+import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
+
public class Shard
{
- private final String keyspace;
- private final Range<Token> tokenRange;
- private final int localHostId;
+ final int localNodeId;
+ final String keyspace;
+ final Range<Token> range;
+
private final Participants participants;
- private final Epoch sinceEpoch;
+ private final LongSupplier logIdProvider;
private final BiConsumer<Shard, CoordinatorLog> onNewLog;
private final NonBlockingHashMapLong<CoordinatorLog> logs;
- // TODO (expected): add support for log rotation
- private final CoordinatorLog.CoordinatorLogPrimary currentLocalLog;
-
- private final List<Subscriber> subscribers = new ArrayList<>();
-
- public interface Subscriber
- {
- default void onLogCreation(CoordinatorLog log) {}
- default void onSubscribe(CoordinatorLog currentLog) {}
- }
+ private volatile CoordinatorLogPrimary currentLocalLog;
- Shard(String keyspace,
- Range<Token> tokenRange,
- int localHostId,
+ Shard(int localNodeId,
+ String keyspace,
+ Range<Token> range,
Participants participants,
- Epoch sinceEpoch,
- IntSupplier logIdProvider,
+ List<CoordinatorLog> logs,
+ LongSupplier logIdProvider,
BiConsumer<Shard, CoordinatorLog> onNewLog)
{
- Preconditions.checkArgument(participants.contains(localHostId));
+ Preconditions.checkArgument(participants.contains(localNodeId));
+ this.localNodeId = localNodeId;
this.keyspace = keyspace;
- this.tokenRange = tokenRange;
- this.localHostId = localHostId;
+ this.range = range;
this.participants = participants;
- this.sinceEpoch = sinceEpoch;
+ this.logIdProvider = logIdProvider;
this.logs = new NonBlockingHashMapLong<>();
this.onNewLog = onNewLog;
- this.currentLocalLog = startNewLog(localHostId,
logIdProvider.getAsInt(), participants);
- CoordinatorLogId logId = currentLocalLog.logId;
- Preconditions.checkArgument(!logId.isNone());
- logs.put(logId.asLong(), currentLocalLog);
+ for (CoordinatorLog log : logs)
+ {
+ this.logs.put(log.logId.asLong(), log);
+ onNewLog.accept(Shard.this, log);
+ }
+ this.currentLocalLog = createNewPrimayLog();
+ }
+
+ Shard(int localNodeId, String keyspace, Range<Token> range, Participants
participants, LongSupplier logIdProvider, BiConsumer<Shard, CoordinatorLog>
onNewLog)
+ {
+ this(localNodeId, keyspace, range, participants,
Collections.emptyList(), logIdProvider, onNewLog);
}
MutationId nextId()
{
- return currentLocalLog.nextId();
+ MutationId nextId = currentLocalLog.nextId();
+ if (nextId != null)
+ return nextId;
+ return maybeRotateLocalLogAndGetNextId();
+ }
+
+ // if ids overflow, we need to rotate the local log
+ synchronized private MutationId maybeRotateLocalLogAndGetNextId()
+ {
+ MutationId nextId = currentLocalLog.nextId();
+ if (nextId != null) // another thread got to rotate before us
+ return nextId;
+ currentLocalLog = createNewPrimayLog();
Review Comment:
Added info logging on rotation. Agree with configurability and testing. The
blocking on new id allocation on rotate is arguably less of an issue given how
fast this would normally be still, and how very very infrequently it's going to
happen. But when and if we support rotation for arbitrary reasons we should
consider doing it to avoid the latency increase, yes.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]