iamaleksey commented on code in PR #3506:
URL: https://github.com/apache/cassandra/pull/3506#discussion_r1756967584
##########
src/java/org/apache/cassandra/journal/Segment.java:
##########
@@ -83,18 +88,21 @@ boolean readFirst(K id, EntrySerializer.EntryHolder<K> into)
return true;
}
- void readAll(K id, EntrySerializer.EntryHolder<K> into, Runnable onEntry)
+ void readAll(K id, EntrySerializer.EntryHolder<K> into, RecordConsumer<K>
onEntry)
{
+ into.clear();
Review Comment:
This `clear()` call is not needed and doesn't belong here, I'd say.
##########
src/java/org/apache/cassandra/journal/Journal.java:
##########
@@ -225,8 +224,13 @@ public void start()
allocator = executorFactory().infiniteLoop(name + "-allocator", new
AllocateRunnable(), SAFE, NON_DAEMON, SYNCHRONIZED);
advanceSegment(null);
flusher.start();
- //invalidator.start();
- //compactor.start();
+ compactor.start();
+ }
+
+ @VisibleForTesting
+ void forceCompaction()
Review Comment:
nit: rename to `runCompactorForTesting()` and put it down next to the other
purely-for-testing methods at the bottom of the class?
##########
src/java/org/apache/cassandra/journal/Journal.java:
##########
@@ -348,130 +324,30 @@ public V readFirst(K id)
public List<V> readAll(K id)
{
List<V> res = new ArrayList<>(2);
- readAll(id, (in, userVersion) ->
res.add(valueSerializer.deserialize(id, in, userVersion)));
- return res;
- }
-
- public void readAll(K id, Reader reader)
- {
- EntrySerializer.EntryHolder<K> holder = new
EntrySerializer.EntryHolder<>();
- try (ReferencedSegments<K, V> segments = selectAndReference(id))
- {
- for (Segment<K, V> segment : segments.all())
+ readAll(id, (segment, position, key, buffer, hosts, userVersion) -> {
+ try (DataInputBuffer in = new DataInputBuffer(buffer, false))
{
- segment.readAll(id, holder, () -> {
- try (DataInputBuffer in = new
DataInputBuffer(holder.value, false))
- {
- Invariants.checkState(Objects.equals(holder.key, id),
- "%s != %s", holder.key, id);
- reader.read(in, segment.descriptor.userVersion);
- holder.clear();
- }
- catch (IOException e)
- {
- // can only throw if serializer is buggy
- throw new RuntimeException(e);
- }
- });
+ res.add(valueSerializer.deserialize(id, in, userVersion));
}
- }
- }
-
- /**
- * Looks up a record by the provided id, if the value satisfies the
provided condition.
- * <p/>
- * Looking up an invalidated record may or may not return a record,
depending on
- * compaction progress.
- * <p/>
- * In case multiple copies of the record exist in the log (e.g. because of
user retries),
- * and more than one of them satisfy the provided condition, the first one
found will be returned.
- *
- * @param id user-provided record id, expected to roughly correlate with
time and go up
- * @param condition predicate to test the record against
- * @return deserialized record if found, null otherwise
- */
- public V readFirstMatching(K id, Predicate<V> condition)
- {
- EntrySerializer.EntryHolder<K> holder = new
EntrySerializer.EntryHolder<>();
-
- try (ReferencedSegments<K, V> segments = selectAndReference(id))
- {
- for (Segment<K, V> segment : segments.all())
+ catch (IOException e)
{
- long[] offsets = segment.index().lookUp(id);
- for (long offsetAndSize : offsets)
- {
- int offset = Index.readOffset(offsetAndSize);
- int size = Index.readSize(offsetAndSize);
- holder.clear();
- if (segment.read(offset, size, holder))
- {
- try (DataInputBuffer in = new
DataInputBuffer(holder.value, false))
- {
- V record = valueSerializer.deserialize(holder.key,
in, segment.descriptor.userVersion);
- if (condition.test(record))
- return record;
- }
- catch (IOException e)
- {
- // can only throw if serializer is buggy
- throw new RuntimeException(e);
- }
- }
- }
+ // can only throw if serializer is buggy
+ throw new RuntimeException(e);
}
- }
- return null;
+ });
+ return res;
}
- /**
- * Looks up a record by the provided id.
- * <p/>
- * Looking up an invalidated record may or may not return a record,
depending on
- * compaction progress.
- * <p/>
- * In case multiple copies of the record exist in the log (e.g. because of
user retries),
- * only the first found record will be consumed.
- *
- * @param id user-provided record id, expected to roughly correlate with
time and go up
- * @param consumer function to consume the raw record (bytes and
invalidation set) if found
- * @return true if the record was found, false otherwise
- */
- public boolean readFirst(K id, RecordConsumer<K> consumer)
+ public void readAll(K id, RecordConsumer<K> reader)
Review Comment:
Nit: `reader` -> `consumer`?
##########
src/java/org/apache/cassandra/journal/Segments.java:
##########
@@ -68,27 +71,34 @@ Segments<K, V> withCompletedSegment(ActiveSegment<K, V>
activeSegment, StaticSeg
return new Segments<>(newSegments);
}
- Segments<K, V> withCompactedSegment(StaticSegment<K, V> oldSegment,
StaticSegment<K, V> newSegment)
+ Segments<K, V> withCompactedSegments(Collection<StaticSegment<K, V>>
oldSegments, Collection<StaticSegment<K, V>> compactedSegments)
{
- Invariants.checkArgument(oldSegment.descriptor.timestamp ==
newSegment.descriptor.timestamp);
- Invariants.checkArgument(oldSegment.descriptor.generation <
newSegment.descriptor.generation);
Long2ObjectHashMap<Segment<K, V>> newSegments = new
Long2ObjectHashMap<>(segments);
- Segment<K, V> oldValue =
newSegments.put(newSegment.descriptor.timestamp, newSegment);
- Invariants.checkState(oldValue == oldSegment);
+ for (StaticSegment<K, V> oldSegment : oldSegments)
+ {
+ Segment<K, V> oldValue =
newSegments.remove(oldSegment.descriptor.timestamp);
+ Invariants.checkState(oldValue == oldSegment);
+ }
+
+ for (StaticSegment<K, V> compactedSegment : compactedSegments)
+ {
+ Segment<K, V> oldValue =
newSegments.put(compactedSegment.descriptor.timestamp, compactedSegment);
+ Invariants.checkState(oldValue == null);
+ }
+
return new Segments<>(newSegments);
}
- Segments<K, V> withoutInvalidatedSegment(StaticSegment<K, V> staticSegment)
+ boolean contains(long descriptor)
{
- Long2ObjectHashMap<Segment<K, V>> newSegments = new
Long2ObjectHashMap<>(segments);
- if (!newSegments.remove(staticSegment.descriptor.timestamp,
staticSegment))
- throw new IllegalStateException();
- return new Segments<>(newSegments);
+ return segments.containsKey(descriptor);
}
Iterable<Segment<K, V>> all()
{
- return segments.values();
+ List<Segment<K, V>> segments = new ArrayList<>(this.segments.values());
Review Comment:
We should probably not be allocating an entire `ArrayList` and sorting it on
every call to `all()` with this signature. Doesn't sit well with me. But it can
be addressed later - I've made an internal TODO for myself.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]