ifesdjeen commented on code in PR #3743:
URL: https://github.com/apache/cassandra/pull/3743#discussion_r1910450616
##########
src/java/org/apache/cassandra/journal/Journal.java:
##########
@@ -906,8 +941,19 @@ private String maybeAddDiskSpaceContext(String message)
@VisibleForTesting
public void truncateForTesting()
{
- advanceSegment(null);
- segments.set(Segments.none());
+ ActiveSegment<?, ?> discarding = currentSegment;
+ if (discarding.index.size() > 0) // if there is no data in the
segement then ignore it
+ {
+ closeCurrentSegmentForTestingIfNonEmpty();
+ // wait for the ActiveSegment to get released, else can see weird
race conditions;
+ // this thread will see the static segmenet and will release it
(which will delete the file),
+ // and the sync thread will then try to release and will fail as
the file no longer exists...
+ while (discarding.selfRef().globalCount() > 0) {}
Review Comment:
I think we need to investigate the race conditions rather than try to patch
it here: if after waiting for `isSwitched` we still have lingering references,
we best check which ones are there, and let them be scheduled appropriately.
##########
src/java/org/apache/cassandra/service/accord/IJournal.java:
##########
@@ -37,4 +41,12 @@ default SavedCommand.MinimalCommand loadMinimal(int
commandStoreId, TxnId txnId,
}
Persister<DurableBefore, DurableBefore> durableBeforePersister();
+
+ RangeSearcher rangeSearcher();
+
+ interface RangeSearcher
Review Comment:
This probably will go during rebase, since we do not have a separate
interface in the integration anymore. You can just pull this to the top level
on rebase, or make it as a part of one of the searcher subclasses. Either way
works for me.
##########
src/java/org/apache/cassandra/service/accord/ParticipantsInMemoryIndex.java:
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.function.Consumer;
+
+import accord.local.StoreParticipants;
+import accord.primitives.Routable;
+import accord.primitives.Route;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.primitives.Unseekable;
+import accord.utils.Invariants;
+import org.agrona.collections.Int2ObjectHashMap;
+import org.agrona.collections.Long2ObjectHashMap;
+import org.apache.cassandra.index.accord.OrderedRouteSerializer;
+import org.apache.cassandra.index.accord.ParticipantsJournalIndex;
+import org.apache.cassandra.index.accord.RouteIndexFormat;
+import org.apache.cassandra.journal.Journal;
+import org.apache.cassandra.journal.RecordPointer;
+import org.apache.cassandra.journal.StaticSegment;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey;
+import org.apache.cassandra.utils.ByteArrayUtil;
+import org.apache.cassandra.utils.FastByteOperations;
+import org.apache.cassandra.utils.RTree;
+import org.apache.cassandra.utils.RangeTree;
+
+public class ParticipantsInMemoryIndex<K extends JournalKey, V> implements
Journal.Listener<K, V>, IJournal.RangeSearcher
+{
+ private final Long2ObjectHashMap<ParticipantsInMemorySegmentIndex>
segmentIndexes = new Long2ObjectHashMap<>();
+
+ @Override
+ public void onWrite(K id, Journal.Writer writer, Set<Integer> hosts,
RecordPointer pointer)
+ {
+ if (!ParticipantsJournalIndex.allowed(id))
+ return;
+ SavedCommand.Writer saveCommandWriter = (SavedCommand.Writer) writer;
+ if (!saveCommandWriter.hasField(SavedCommand.Fields.PARTICIPANTS))
+ return;
+ StoreParticipants participants =
saveCommandWriter.after().participants();
+ Route<?> route = participants.route();
+ if (route != null)
+ update(pointer.segment, id.commandStoreId, id.id, route);
+ }
+
+ public synchronized void update(long segment, int commandStoreId, TxnId
id, Route<?> route)
+ {
+ if (!ParticipantsJournalIndex.allowed(id))
+ return;
+ Invariants.nonNull(route, "route");
+ segmentIndexes.computeIfAbsent(segment,
ParticipantsInMemorySegmentIndex::new).add(commandStoreId, id, route);
+ }
+
+ public void update(long segment, K id, ByteBuffer buffer, int userVersion)
+ {
+ if (!ParticipantsJournalIndex.allowed(id))
+ return;
+ var participants = RouteIndexFormat.extract(id.id, buffer,
userVersion).participants();
+ if (participants == null || participants.route() == null)
+ return;
+ update(segment, id.commandStoreId, id.id, participants.route());
+ }
+
+ @Override
+ public synchronized void onCompact(Collection<StaticSegment<K, V>>
oldSegments, Collection<StaticSegment<K, V>> compactedSegments)
+ {
+ oldSegments.forEach(s -> segmentIndexes.remove(s.id()));
+ }
+
+ public NavigableMap<IndexRange, Set<TxnId>> search(int storeId,
AccordRoutingKey key)
+ {
+ return search(storeId, key.table(),
OrderedRouteSerializer.serializeRoutingKeyNoTable(key));
+ }
+
+ private synchronized NavigableMap<IndexRange, Set<TxnId>> search(int
storeId, TableId tableId, byte[] key)
+ {
+ TreeMap<IndexRange, Set<TxnId>> matches = new TreeMap<>();
+ segmentIndexes.values().forEach(s -> s.search(storeId, tableId, key, e
-> matches.computeIfAbsent(e.getKey(), i -> new
HashSet<>()).add(e.getValue())));
+ return matches.isEmpty() ? Collections.emptyNavigableMap() : matches;
+ }
+
+ public NavigableMap<IndexRange, Set<TxnId>> search(int storeId,
AccordRoutingKey start, AccordRoutingKey end)
+ {
+ return search(storeId, start.table(),
OrderedRouteSerializer.serializeRoutingKeyNoTable(start),
OrderedRouteSerializer.serializeRoutingKeyNoTable(end));
+ }
+
+ private synchronized NavigableMap<IndexRange, Set<TxnId>> search(int
storeId, TableId tableId, byte[] start, byte[] end)
+ {
+ TreeMap<IndexRange, Set<TxnId>> matches = new TreeMap<>();
+ segmentIndexes.values().forEach(s -> s.search(storeId, tableId, start,
end, e -> matches.computeIfAbsent(e.getKey(), i -> new
HashSet<>()).add(e.getValue())));
+ return matches.isEmpty() ? Collections.emptyNavigableMap() : matches;
+ }
+
+ public synchronized void truncateForTesting()
+ {
+ segmentIndexes.clear();
+ }
+
+ @Override
+ public void intersects(int commandStoreId, TokenRange range, TxnId
minTxnId, Timestamp maxTxnId, Consumer<TxnId> forEach)
+ {
+ var result = search(commandStoreId, range.start(), range.end());
+ TreeSet<TxnId> matches = new TreeSet<>();
+ result.values().forEach(s -> matches.addAll(s));
+ consume(matches.iterator(), minTxnId, maxTxnId, forEach);
+ }
+
+ @Override
+ public void intersects(int commandStoreId, AccordRoutingKey key, TxnId
minTxnId, Timestamp maxTxnId, Consumer<TxnId> forEach)
+ {
+ var result = search(commandStoreId, key);
+ TreeSet<TxnId> matches = new TreeSet<>();
+ result.values().forEach(s -> matches.addAll(s));
+ consume(matches.iterator(), minTxnId, maxTxnId, forEach);
+ }
+
+ private void consume(Iterator<TxnId> it, TxnId minTxnId, Timestamp
maxTxnId, Consumer<TxnId> forEach)
+ {
+ while (it.hasNext())
+ {
+ TxnId next = it.next();
+ if (next.compareTo(minTxnId) >= 0 && next.compareTo(maxTxnId) < 0)
+ forEach.accept(next);
+ }
+ }
+
+ private static class ParticipantsInMemorySegmentIndex
+ {
+ private final Int2ObjectHashMap<ParticipantsInMemoryStoreIndex>
storeIndexes = new Int2ObjectHashMap<>();
+
+ private ParticipantsInMemorySegmentIndex(long segment)
+ {
+ }
+
+ public void add(int commandStoreId, TxnId id, Route<?> route)
+ {
+ storeIndexes.computeIfAbsent(commandStoreId,
ParticipantsInMemoryStoreIndex::new).add(id, route);
+ }
+
+ public void search(int storeId, TableId tableId, byte[] start, byte[]
end, Consumer<Map.Entry<IndexRange, TxnId>> fn)
+ {
+ ParticipantsInMemoryStoreIndex idx = storeIndexes.get(storeId);
+ if (idx == null) return;
+ idx.search(tableId, start, end, fn);
+ }
+
+ public void search(int storeId, TableId tableId, byte[] key,
Consumer<Map.Entry<IndexRange, TxnId>> fn)
+ {
+ ParticipantsInMemoryStoreIndex idx = storeIndexes.get(storeId);
+ if (idx == null) return;
+ idx.search(tableId, key, fn);
+ }
+ }
+
+ private static class ParticipantsInMemoryStoreIndex
Review Comment:
Since we are already scoped in `ParticipantsInMemoryIndex`, might be best to
just call this `StoreIndex`
##########
src/java/org/apache/cassandra/service/accord/ParticipantsInMemoryIndex.java:
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.function.Consumer;
+
+import accord.local.StoreParticipants;
+import accord.primitives.Routable;
+import accord.primitives.Route;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.primitives.Unseekable;
+import accord.utils.Invariants;
+import org.agrona.collections.Int2ObjectHashMap;
+import org.agrona.collections.Long2ObjectHashMap;
+import org.apache.cassandra.index.accord.OrderedRouteSerializer;
+import org.apache.cassandra.index.accord.ParticipantsJournalIndex;
+import org.apache.cassandra.index.accord.RouteIndexFormat;
+import org.apache.cassandra.journal.Journal;
+import org.apache.cassandra.journal.RecordPointer;
+import org.apache.cassandra.journal.StaticSegment;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey;
+import org.apache.cassandra.utils.ByteArrayUtil;
+import org.apache.cassandra.utils.FastByteOperations;
+import org.apache.cassandra.utils.RTree;
+import org.apache.cassandra.utils.RangeTree;
+
+public class ParticipantsInMemoryIndex<K extends JournalKey, V> implements
Journal.Listener<K, V>, IJournal.RangeSearcher
+{
+ private final Long2ObjectHashMap<ParticipantsInMemorySegmentIndex>
segmentIndexes = new Long2ObjectHashMap<>();
+
+ @Override
+ public void onWrite(K id, Journal.Writer writer, Set<Integer> hosts,
RecordPointer pointer)
+ {
+ if (!ParticipantsJournalIndex.allowed(id))
+ return;
+ SavedCommand.Writer saveCommandWriter = (SavedCommand.Writer) writer;
+ if (!saveCommandWriter.hasField(SavedCommand.Fields.PARTICIPANTS))
+ return;
+ StoreParticipants participants =
saveCommandWriter.after().participants();
+ Route<?> route = participants.route();
+ if (route != null)
+ update(pointer.segment, id.commandStoreId, id.id, route);
+ }
+
+ public synchronized void update(long segment, int commandStoreId, TxnId
id, Route<?> route)
+ {
+ if (!ParticipantsJournalIndex.allowed(id))
+ return;
+ Invariants.nonNull(route, "route");
+ segmentIndexes.computeIfAbsent(segment,
ParticipantsInMemorySegmentIndex::new).add(commandStoreId, id, route);
+ }
+
+ public void update(long segment, K id, ByteBuffer buffer, int userVersion)
+ {
+ if (!ParticipantsJournalIndex.allowed(id))
+ return;
+ var participants = RouteIndexFormat.extract(id.id, buffer,
userVersion).participants();
+ if (participants == null || participants.route() == null)
+ return;
+ update(segment, id.commandStoreId, id.id, participants.route());
+ }
+
+ @Override
+ public synchronized void onCompact(Collection<StaticSegment<K, V>>
oldSegments, Collection<StaticSegment<K, V>> compactedSegments)
+ {
+ oldSegments.forEach(s -> segmentIndexes.remove(s.id()));
+ }
+
+ public NavigableMap<IndexRange, Set<TxnId>> search(int storeId,
AccordRoutingKey key)
+ {
+ return search(storeId, key.table(),
OrderedRouteSerializer.serializeRoutingKeyNoTable(key));
+ }
+
+ private synchronized NavigableMap<IndexRange, Set<TxnId>> search(int
storeId, TableId tableId, byte[] key)
+ {
+ TreeMap<IndexRange, Set<TxnId>> matches = new TreeMap<>();
+ segmentIndexes.values().forEach(s -> s.search(storeId, tableId, key, e
-> matches.computeIfAbsent(e.getKey(), i -> new
HashSet<>()).add(e.getValue())));
+ return matches.isEmpty() ? Collections.emptyNavigableMap() : matches;
+ }
+
+ public NavigableMap<IndexRange, Set<TxnId>> search(int storeId,
AccordRoutingKey start, AccordRoutingKey end)
+ {
+ return search(storeId, start.table(),
OrderedRouteSerializer.serializeRoutingKeyNoTable(start),
OrderedRouteSerializer.serializeRoutingKeyNoTable(end));
+ }
+
+ private synchronized NavigableMap<IndexRange, Set<TxnId>> search(int
storeId, TableId tableId, byte[] start, byte[] end)
+ {
+ TreeMap<IndexRange, Set<TxnId>> matches = new TreeMap<>();
+ segmentIndexes.values().forEach(s -> s.search(storeId, tableId, start,
end, e -> matches.computeIfAbsent(e.getKey(), i -> new
HashSet<>()).add(e.getValue())));
+ return matches.isEmpty() ? Collections.emptyNavigableMap() : matches;
+ }
+
+ public synchronized void truncateForTesting()
+ {
+ segmentIndexes.clear();
+ }
+
+ @Override
+ public void intersects(int commandStoreId, TokenRange range, TxnId
minTxnId, Timestamp maxTxnId, Consumer<TxnId> forEach)
+ {
+ var result = search(commandStoreId, range.start(), range.end());
+ TreeSet<TxnId> matches = new TreeSet<>();
+ result.values().forEach(s -> matches.addAll(s));
+ consume(matches.iterator(), minTxnId, maxTxnId, forEach);
+ }
+
+ @Override
+ public void intersects(int commandStoreId, AccordRoutingKey key, TxnId
minTxnId, Timestamp maxTxnId, Consumer<TxnId> forEach)
+ {
+ var result = search(commandStoreId, key);
+ TreeSet<TxnId> matches = new TreeSet<>();
+ result.values().forEach(s -> matches.addAll(s));
+ consume(matches.iterator(), minTxnId, maxTxnId, forEach);
+ }
+
+ private void consume(Iterator<TxnId> it, TxnId minTxnId, Timestamp
maxTxnId, Consumer<TxnId> forEach)
+ {
+ while (it.hasNext())
+ {
+ TxnId next = it.next();
+ if (next.compareTo(minTxnId) >= 0 && next.compareTo(maxTxnId) < 0)
+ forEach.accept(next);
+ }
+ }
+
+ private static class ParticipantsInMemorySegmentIndex
+ {
+ private final Int2ObjectHashMap<ParticipantsInMemoryStoreIndex>
storeIndexes = new Int2ObjectHashMap<>();
+
+ private ParticipantsInMemorySegmentIndex(long segment)
+ {
+ }
+
+ public void add(int commandStoreId, TxnId id, Route<?> route)
+ {
+ storeIndexes.computeIfAbsent(commandStoreId,
ParticipantsInMemoryStoreIndex::new).add(id, route);
+ }
+
+ public void search(int storeId, TableId tableId, byte[] start, byte[]
end, Consumer<Map.Entry<IndexRange, TxnId>> fn)
+ {
+ ParticipantsInMemoryStoreIndex idx = storeIndexes.get(storeId);
+ if (idx == null) return;
+ idx.search(tableId, start, end, fn);
+ }
+
+ public void search(int storeId, TableId tableId, byte[] key,
Consumer<Map.Entry<IndexRange, TxnId>> fn)
+ {
+ ParticipantsInMemoryStoreIndex idx = storeIndexes.get(storeId);
+ if (idx == null) return;
+ idx.search(tableId, key, fn);
+ }
+ }
+
+ private static class ParticipantsInMemoryStoreIndex
+ {
+ private final Map<TableId, ParticipantsInMemoryTableIndex> tableIndex
= new HashMap<>();
+
+ private ParticipantsInMemoryStoreIndex(int commandStoreId)
+ {
+ }
+
+ public void add(TxnId id, Route<?> route)
+ {
+ for (var keyOrRange : route)
+ add(id, keyOrRange);
+ }
+
+ private void add(TxnId id, Unseekable keyOrRange)
+ {
+ if (keyOrRange.domain() != Routable.Domain.Range)
+ throw new IllegalArgumentException("Unexpected domain: " +
keyOrRange.domain());
+ TokenRange ts = (TokenRange) keyOrRange;
+ TableId tableId = ts.table();
+ tableIndex.computeIfAbsent(tableId,
ParticipantsInMemoryTableIndex::new).add(id, ts);
+ }
+
+ public void search(TableId tableId, byte[] start, byte[] end,
Consumer<Map.Entry<IndexRange, TxnId>> fn)
+ {
+ var index = tableIndex.get(tableId);
+ if (index == null) return;
+ index.search(start, end, fn);
+ }
+
+ public void search(TableId tableId, byte[] key,
Consumer<Map.Entry<IndexRange, TxnId>> fn)
+ {
+ var index = tableIndex.get(tableId);
+ if (index == null) return;
+ index.search(key, fn);
+ }
+ }
+
+ private static class ParticipantsInMemoryTableIndex
Review Comment:
Same as above, just `TableIndex`
##########
test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java:
##########
@@ -216,11 +207,12 @@ public void test()
{
cfs().disableAutoCompaction(); // let the test control compaction
//TODO (coverage): include with the ability to mark ranges as durable
for compaction cleanup
- AccordService.unsafeSetNoop(); // disable accord service since
compaction touches it. It would be nice to include this for cleanup support....
- stateful().withExamples(50).check(commands(() -> State::new, i ->
cfs())
- .destroySut(sut ->
sut.truncateBlocking())
- .add(FLUSH)
+ stateful().withExamples(10).withSteps(500).check(commands(() ->
State::new, Sut::new)
+ .destroyState(State::close)
+ .destroySut(Sut::close)
+ .addIf(State::mayFlush, FLUSH)
Review Comment:
May I suggest a slight change in semantics for testing a bit?
```
diff --git a/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java
b/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java
index 6807269c95..a4fc38a708 100644
--- a/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java
@@ -160,7 +160,6 @@ public class RouteIndexTest extends CQLTester.InMemory
private Command<State, Sut, ?> insert(RandomSource rs, State state)
{
-// int storeId = rs.nextInt(0, state.numStores);
Domain domain = state.domainGen.next(rs);
TxnId txnId = state.nextTxnId(domain);
Route<?> route = createRoute(state, rs, domain, rs.nextInt(1, 20));
@@ -210,8 +209,9 @@ public class RouteIndexTest extends CQLTester.InMemory
stateful().withExamples(10).withSteps(500).check(commands(() ->
State::new, Sut::new)
.destroyState(State::close)
.destroySut(Sut::close)
- .addIf(State::mayFlush, FLUSH)
- .add(COMPACT)
+ .add(CLOSE)
+ .add(FLUSH)
+ .add(PURGE)
.add(RESTART)
.add(this::insert)
.add(RouteIndexTest::rangeSearch)
@@ -378,7 +378,7 @@ public class RouteIndexTest extends CQLTester.InMemory
}
}
- private static final CassandraCommand FLUSH = new
CassandraCommand("Flush")
+ private static final CassandraCommand CLOSE = new
CassandraCommand("Close")
{
@Override
public void runUnit(Sut sut)
@@ -387,24 +387,24 @@ public class RouteIndexTest extends CQLTester.InMemory
}
};
- private static final CassandraCommand COMPACT = new
CassandraCommand("Compact")
+ private static final CassandraCommand FLUSH = new
CassandraCommand("Flush")
{
@Override
public void runUnit(Sut sut)
{
sut.journal.get().runCompactorForTesting();
- try
- {
- sut.cfs.enableAutoCompaction();
-
FBUtilities.waitOnFutures(CompactionManager.instance.submitBackground(sut.cfs));
- }
- finally
- {
- sut.cfs.disableAutoCompaction();
- }
}
};
+ private static final CassandraCommand PURGE = new
CassandraCommand("Purge")
+ {
+ @Override
+ public void runUnit(Sut sut)
+ {
+ sut.journal.get().purge(sut.stores.get());
+ }
+ };
+
private static final UnitCommand<State, Sut> RESTART = new
UnitCommand<State, Sut>()
{
@Override
@@ -544,10 +544,12 @@ public class RouteIndexTest extends CQLTester.InMemory
{
private final ColumnFamilyStore cfs;
private final Supplier<AccordJournal> journal;
+ private final Supplier<CommandStores> stores;
public Sut(State state)
{
cfs = cfs();
+ this.stores = () -> state.accordService.node().commandStores();
this.journal = () -> state.accordService.journal();
```
In short, the idea here is that Journal's flow is a bit different: we first
_flush_ multiple segments into one SSTable, then we _compact_ multiple SSTables
together (which would be great to have as a separate event), and then we
_purge_ SSTables (i.e. compact them and clean up / throw away keys that state
machine tells us we can throw away).
##########
src/java/org/apache/cassandra/service/accord/ParticipantsInMemoryIndex.java:
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.function.Consumer;
+
+import accord.local.StoreParticipants;
+import accord.primitives.Routable;
+import accord.primitives.Route;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.primitives.Unseekable;
+import accord.utils.Invariants;
+import org.agrona.collections.Int2ObjectHashMap;
+import org.agrona.collections.Long2ObjectHashMap;
+import org.apache.cassandra.index.accord.OrderedRouteSerializer;
+import org.apache.cassandra.index.accord.ParticipantsJournalIndex;
+import org.apache.cassandra.index.accord.RouteIndexFormat;
+import org.apache.cassandra.journal.Journal;
+import org.apache.cassandra.journal.RecordPointer;
+import org.apache.cassandra.journal.StaticSegment;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey;
+import org.apache.cassandra.utils.ByteArrayUtil;
+import org.apache.cassandra.utils.FastByteOperations;
+import org.apache.cassandra.utils.RTree;
+import org.apache.cassandra.utils.RangeTree;
+
+public class ParticipantsInMemoryIndex<K extends JournalKey, V> implements
Journal.Listener<K, V>, IJournal.RangeSearcher
+{
+ private final Long2ObjectHashMap<ParticipantsInMemorySegmentIndex>
segmentIndexes = new Long2ObjectHashMap<>();
+
+ @Override
+ public void onWrite(K id, Journal.Writer writer, Set<Integer> hosts,
RecordPointer pointer)
+ {
+ if (!ParticipantsJournalIndex.allowed(id))
+ return;
+ SavedCommand.Writer saveCommandWriter = (SavedCommand.Writer) writer;
+ if (!saveCommandWriter.hasField(SavedCommand.Fields.PARTICIPANTS))
+ return;
+ StoreParticipants participants =
saveCommandWriter.after().participants();
+ Route<?> route = participants.route();
+ if (route != null)
+ update(pointer.segment, id.commandStoreId, id.id, route);
+ }
+
+ public synchronized void update(long segment, int commandStoreId, TxnId
id, Route<?> route)
+ {
+ if (!ParticipantsJournalIndex.allowed(id))
+ return;
+ Invariants.nonNull(route, "route");
+ segmentIndexes.computeIfAbsent(segment,
ParticipantsInMemorySegmentIndex::new).add(commandStoreId, id, route);
+ }
+
+ public void update(long segment, K id, ByteBuffer buffer, int userVersion)
+ {
+ if (!ParticipantsJournalIndex.allowed(id))
+ return;
+ var participants = RouteIndexFormat.extract(id.id, buffer,
userVersion).participants();
Review Comment:
nit: var
##########
src/java/org/apache/cassandra/service/accord/ParticipantsInMemoryIndex.java:
##########
Review Comment:
There are a few `var`s in this file; would you mind to ask Idea to replace
them with full types?
##########
src/java/org/apache/cassandra/index/accord/ParticipantsJournalIndex.java:
##########
Review Comment:
There is a TODO saying `flesh this stuff out...` in the middle of this
class. Does this still apply? It is unlikely we will return to this class, or
at least this is not on the roadmap; maybe best to check this out now or at
least describe what fleshing out means?
##########
src/java/org/apache/cassandra/journal/Journal.java:
##########
@@ -124,6 +126,36 @@ public class Journal<K, V> implements Shutdownable
final OpOrder readOrder = new OpOrder();
+ public interface Listener<K, V>
+ {
+ void onWrite(K id, Writer writer, Set<Integer> hosts, RecordPointer
pointer);
Review Comment:
We already have a `flush` notification, which is probably the only thing we
need to have in a Journal API. Moreover, this functionality probably belongs
only to `AccordJournal`, and not to Journal. One of the options would be to
update in-memory index on flush (in `saveCommand`).
As regards compaction, I think the best integration point is
`AccordSegmentCompactor`, since we have both old and new sets there, and it is
accord-only, so we are not modifying the Journal API.
##########
src/java/org/apache/cassandra/journal/Journal.java:
##########
@@ -906,8 +941,19 @@ private String maybeAddDiskSpaceContext(String message)
@VisibleForTesting
public void truncateForTesting()
{
- advanceSegment(null);
- segments.set(Segments.none());
+ ActiveSegment<?, ?> discarding = currentSegment;
+ if (discarding.index.size() > 0) // if there is no data in the
segement then ignore it
Review Comment:
Is there any reason not to use `discarding.isEmpty()`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]