iamaleksey commented on code in PR #3506:
URL: https://github.com/apache/cassandra/pull/3506#discussion_r1756967584


##########
src/java/org/apache/cassandra/journal/Segment.java:
##########
@@ -83,18 +88,21 @@ boolean readFirst(K id, EntrySerializer.EntryHolder<K> into)
         return true;
     }
 
-    void readAll(K id, EntrySerializer.EntryHolder<K> into, Runnable onEntry)
+    void readAll(K id, EntrySerializer.EntryHolder<K> into, RecordConsumer<K> onEntry)
     {
+        into.clear();

Review Comment:
   I'd say this `clear()` call is not needed and doesn't belong here.



##########
src/java/org/apache/cassandra/journal/Journal.java:
##########
@@ -225,8 +224,13 @@ public void start()
         allocator = executorFactory().infiniteLoop(name + "-allocator", new AllocateRunnable(), SAFE, NON_DAEMON, SYNCHRONIZED);
         advanceSegment(null);
         flusher.start();
-        //invalidator.start();
-        //compactor.start();
+        compactor.start();
+    }
+
+    @VisibleForTesting
+    void forceCompaction()

Review Comment:
   nit: rename to `runCompactorForTesting()` and move it down next to the other
purely-for-testing methods at the bottom of the class?



##########
src/java/org/apache/cassandra/journal/Journal.java:
##########
@@ -348,130 +324,30 @@ public V readFirst(K id)
     public List<V> readAll(K id)
     {
         List<V> res = new ArrayList<>(2);
-        readAll(id, (in, userVersion) -> res.add(valueSerializer.deserialize(id, in, userVersion)));
-        return res;
-    }
-
-    public void readAll(K id, Reader reader)
-    {
-        EntrySerializer.EntryHolder<K> holder = new EntrySerializer.EntryHolder<>();
-        try (ReferencedSegments<K, V> segments = selectAndReference(id))
-        {
-            for (Segment<K, V> segment : segments.all())
+        readAll(id, (segment, position, key, buffer, hosts, userVersion) -> {
+            try (DataInputBuffer in = new DataInputBuffer(buffer, false))
             {
-                segment.readAll(id, holder, () -> {
-                    try (DataInputBuffer in = new DataInputBuffer(holder.value, false))
-                    {
-                        Invariants.checkState(Objects.equals(holder.key, id),
-                                              "%s != %s", holder.key, id);
-                        reader.read(in, segment.descriptor.userVersion);
-                        holder.clear();
-                    }
-                    catch (IOException e)
-                    {
-                        // can only throw if serializer is buggy
-                        throw new RuntimeException(e);
-                    }
-                });
+                res.add(valueSerializer.deserialize(id, in, userVersion));
             }
-        }
-    }
-
-    /**
-     * Looks up a record by the provided id, if the value satisfies the provided condition.
-     * <p/>
-     * Looking up an invalidated record may or may not return a record, depending on
-     * compaction progress.
-     * <p/>
-     * In case multiple copies of the record exist in the log (e.g. because of user retries),
-     * and more than one of them satisfy the provided condition, the first one found will be returned.
-     *
-     * @param id user-provided record id, expected to roughly correlate with time and go up
-     * @param condition predicate to test the record against
-     * @return deserialized record if found, null otherwise
-     */
-    public V readFirstMatching(K id, Predicate<V> condition)
-    {
-        EntrySerializer.EntryHolder<K> holder = new EntrySerializer.EntryHolder<>();
-
-        try (ReferencedSegments<K, V> segments = selectAndReference(id))
-        {
-            for (Segment<K, V> segment : segments.all())
+            catch (IOException e)
             {
-                long[] offsets = segment.index().lookUp(id);
-                for (long offsetAndSize : offsets)
-                {
-                    int offset = Index.readOffset(offsetAndSize);
-                    int size = Index.readSize(offsetAndSize);
-                    holder.clear();
-                    if (segment.read(offset, size, holder))
-                    {
-                        try (DataInputBuffer in = new DataInputBuffer(holder.value, false))
-                        {
-                            V record = valueSerializer.deserialize(holder.key, in, segment.descriptor.userVersion);
-                            if (condition.test(record))
-                                return record;
-                        }
-                        catch (IOException e)
-                        {
-                            // can only throw if serializer is buggy
-                            throw new RuntimeException(e);
-                        }
-                    }
-                }
+                // can only throw if serializer is buggy
+                throw new RuntimeException(e);
             }
-        }
-        return null;
+        });
+        return res;
     }
 
-    /**
-     * Looks up a record by the provided id.
-     * <p/>
-     * Looking up an invalidated record may or may not return a record, depending on
-     * compaction progress.
-     * <p/>
-     * In case multiple copies of the record exist in the log (e.g. because of user retries),
-     * only the first found record will be consumed.
-     *
-     * @param id user-provided record id, expected to roughly correlate with time and go up
-     * @param consumer function to consume the raw record (bytes and invalidation set) if found
-     * @return true if the record was found, false otherwise
-     */
-    public boolean readFirst(K id, RecordConsumer<K> consumer)
+    public void readAll(K id, RecordConsumer<K> reader)

Review Comment:
   Nit: `reader` -> `consumer`?



##########
src/java/org/apache/cassandra/journal/Segments.java:
##########
@@ -68,27 +71,34 @@ Segments<K, V> withCompletedSegment(ActiveSegment<K, V> activeSegment, StaticSeg
         return new Segments<>(newSegments);
     }
 
-    Segments<K, V> withCompactedSegment(StaticSegment<K, V> oldSegment, StaticSegment<K, V> newSegment)
+    Segments<K, V> withCompactedSegments(Collection<StaticSegment<K, V>> oldSegments, Collection<StaticSegment<K, V>> compactedSegments)
     {
-        Invariants.checkArgument(oldSegment.descriptor.timestamp == newSegment.descriptor.timestamp);
-        Invariants.checkArgument(oldSegment.descriptor.generation < newSegment.descriptor.generation);
         Long2ObjectHashMap<Segment<K, V>> newSegments = new Long2ObjectHashMap<>(segments);
-        Segment<K, V> oldValue = newSegments.put(newSegment.descriptor.timestamp, newSegment);
-        Invariants.checkState(oldValue == oldSegment);
+        for (StaticSegment<K, V> oldSegment : oldSegments)
+        {
+            Segment<K, V> oldValue = newSegments.remove(oldSegment.descriptor.timestamp);
+            Invariants.checkState(oldValue == oldSegment);
+        }
+
+        for (StaticSegment<K, V> compactedSegment : compactedSegments)
+        {
+            Segment<K, V> oldValue = newSegments.put(compactedSegment.descriptor.timestamp, compactedSegment);
+            Invariants.checkState(oldValue == null);
+        }
+
         return new Segments<>(newSegments);
     }
 
-    Segments<K, V> withoutInvalidatedSegment(StaticSegment<K, V> staticSegment)
+    boolean contains(long descriptor)
     {
-        Long2ObjectHashMap<Segment<K, V>> newSegments = new Long2ObjectHashMap<>(segments);
-        if (!newSegments.remove(staticSegment.descriptor.timestamp, staticSegment))
-            throw new IllegalStateException();
-        return new Segments<>(newSegments);
+        return segments.containsKey(descriptor);
     }
 
     Iterable<Segment<K, V>> all()
     {
-        return segments.values();
+        List<Segment<K, V>> segments = new ArrayList<>(this.segments.values());

Review Comment:
   We should probably not be allocating an entire `ArrayList` and sorting it on
every call to `all()` with this signature. Doesn't sit well with me. But it can
be addressed later; I've made an internal TODO for myself.
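
   One possible direction, as a rough sketch only: since `withCompactedSegments()`
already copies the map and returns a fresh `Segments`, each instance looks
effectively immutable, so the sorted view could be built once in the constructor
and handed out by `all()`. Everything below is hypothetical and not taken from
the PR: `SketchSegment` and `SketchSegments` are stand-ins, `with()` is an
invented helper, a plain `HashMap` replaces `Long2ObjectHashMap`, and ordering by
timestamp is an assumption, since the actual sort is not visible in this hunk.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Segment<K, V>; only the timestamp matters here.
final class SketchSegment
{
    final long timestamp;

    SketchSegment(long timestamp)
    {
        this.timestamp = timestamp;
    }
}

// Hypothetical stand-in for Segments<K, V>: never mutated after construction,
// every change produces a new instance.
final class SketchSegments
{
    private final Map<Long, SketchSegment> segments;
    // Built once per instance instead of once per all() call.
    private final List<SketchSegment> sorted;

    SketchSegments(Map<Long, SketchSegment> segments)
    {
        this.segments = new HashMap<>(segments);
        List<SketchSegment> copy = new ArrayList<>(this.segments.values());
        // Assumption: oldest-first by timestamp.
        copy.sort(Comparator.comparingLong((SketchSegment s) -> s.timestamp));
        this.sorted = Collections.unmodifiableList(copy);
    }

    // Copy-on-write update, mirroring the shape of the with*() methods in the diff.
    SketchSegments with(SketchSegment added)
    {
        Map<Long, SketchSegment> next = new HashMap<>(segments);
        next.put(added.timestamp, added);
        return new SketchSegments(next);
    }

    boolean contains(long timestamp)
    {
        return segments.containsKey(timestamp);
    }

    // Returns the precomputed view: no allocation and no sort per call.
    Iterable<SketchSegment> all()
    {
        return sorted;
    }
}
```

   The trade-off is that the sort cost moves to every `with*()` call, which
should be far rarer than reads if segments only change on allocation and
compaction; that is an assumption about the workload, not something measured.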




