iamaleksey commented on code in PR #4173:
URL: https://github.com/apache/cassandra/pull/4173#discussion_r2200361607


##########
src/java/org/apache/cassandra/service/reads/tracked/TrackedDataResponse.java:
##########
@@ -33,18 +34,64 @@
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.base.Preconditions;
 
 public class TrackedDataResponse
 {
     private final int serializationVersion;
-    private final ByteBuffer data;
+    private final List<ByteBuffer> data;
 
     public TrackedDataResponse(int serializationVersion, ByteBuffer data)
     {
+        this(serializationVersion, Collections.singletonList(data));
+    }
+
+    private TrackedDataResponse(int serializationVersion, List<ByteBuffer> 
data)
+    {
+        Preconditions.checkArgument(!data.isEmpty());
         this.serializationVersion = serializationVersion;
         this.data = data;
     }
 
+    public TrackedDataResponse merge(TrackedDataResponse that)
+    {
+        Preconditions.checkArgument(serializationVersion == 
that.serializationVersion);
+        List<ByteBuffer> newData = new ArrayList<>(data.size() + 
that.data.size());
+        newData.addAll(data);
+        newData.addAll(that.data);
+        return new TrackedDataResponse(serializationVersion, newData);
+    }
+
+    public static TrackedDataResponse merge(TrackedDataResponse l, 
TrackedDataResponse r)
+    {
+        Preconditions.checkArgument(l.serializationVersion == 
r.serializationVersion);
+        List<ByteBuffer> newData = new ArrayList<>(l.data.size() + 
r.data.size());
+        newData.addAll(l.data);
+        newData.addAll(r.data);
+        return new TrackedDataResponse(l.serializationVersion, newData);
+    }
+
+    public static TrackedDataResponse merge(List<TrackedDataResponse> 
responses)
+    {
+        Preconditions.checkArgument(!responses.isEmpty());
+        int serializationVersion = responses.get(0).serializationVersion;

Review Comment:
   Should probably validate that the versions match across all merged 
responses, if we do that for the 2-response version?



##########
src/java/org/apache/cassandra/service/reads/tracked/FilteredFollowupRead.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.tracked;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.PartitionRangeReadCommand;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.dht.AbstractBounds;
+import 
org.apache.cassandra.service.reads.tracked.PartialTrackedRangeRead.FollowUpReadInfo;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.FutureCombiner;
+
+import static 
org.apache.cassandra.service.reads.tracked.ExtendingCompletedRead.followUpReadRequired;
+import static 
org.apache.cassandra.service.reads.tracked.ExtendingCompletedRead.toQuery;
+import static 
org.apache.cassandra.service.reads.tracked.PartialTrackedRangeRead.makeFollowUpRead;
+
+class FilteredFollowupRead extends AsyncPromise<TrackedDataResponse>
+{
+    private final TrackedDataResponse initialResponse;
+    private final int toQuery;
+    private final ConsistencyLevel consistencyLevel;
+    private final long expiresAtNanos;
+    private final SortedMap<DecoratedKey, FollowUpReadInfo> followUpReadInfo;
+    private final PartitionRangeReadCommand command;
+    private final AbstractBounds<PartitionPosition> followUpBounds;
+    private final DecoratedKey finalKey;
+
+    public FilteredFollowupRead(TrackedDataResponse initialResponse,
+                                int toQuery,
+                                ConsistencyLevel consistencyLevel,
+                                long expiresAtNanos,
+                                SortedMap<DecoratedKey, FollowUpReadInfo> 
followUpReadInfo,
+                                PartitionRangeReadCommand command,
+                                AbstractBounds<PartitionPosition> 
followUpBounds,
+                                DecoratedKey finalKey)
+    {
+        this.initialResponse = initialResponse;
+        this.toQuery = toQuery;
+        this.consistencyLevel = consistencyLevel;
+        this.expiresAtNanos = expiresAtNanos;
+        this.followUpReadInfo = followUpReadInfo;
+        this.command = command;
+        this.followUpBounds = followUpBounds;
+        this.finalKey = finalKey;
+    }
+
+    private boolean interleavesWithOriginal(DecoratedKey key)
+    {
+        if (finalKey == null)
+            return false;
+        return key.compareTo(finalKey) < 0;
+    }
+
+    public void start()
+    {
+        ClusterMetadata metadata = ClusterMetadata.current();
+        List<Future<TrackedDataResponse>> futures = new ArrayList<>();
+
+        int remaining = toQuery;
+        PeekingIterator<DecoratedKey> followUpKeys = 
Iterators.peekingIterator(followUpReadInfo.keySet().iterator());
+        // query all keys that interleave with the range of keys from the 
original range read
+        while (followUpKeys.hasNext() && (remaining > 0 || 
interleavesWithOriginal(followUpKeys.peek())))
+        {
+            DecoratedKey key = followUpKeys.next();
+            FollowUpReadInfo info = followUpReadInfo.get(key);
+            remaining -= info.potentialMatches;
+            SinglePartitionReadCommand cmd = 
SinglePartitionReadCommand.fromRangeRead(key, command, 
command.limits().forShortReadRetry(toQuery));
+            TrackedRead.Partition read = TrackedRead.create(metadata, cmd, 
consistencyLevel);
+            read.start(expiresAtNanos);
+            futures.add(read.future());
+        }
+
+        int singleKeyReads = futures.size();

Review Comment:
   Nit: unused.



##########
src/java/org/apache/cassandra/service/reads/tracked/TrackedDataResponse.java:
##########
@@ -33,18 +34,64 @@
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.base.Preconditions;
 
 public class TrackedDataResponse
 {
     private final int serializationVersion;
-    private final ByteBuffer data;
+    private final List<ByteBuffer> data;
 
     public TrackedDataResponse(int serializationVersion, ByteBuffer data)
     {
+        this(serializationVersion, Collections.singletonList(data));
+    }
+
+    private TrackedDataResponse(int serializationVersion, List<ByteBuffer> 
data)
+    {
+        Preconditions.checkArgument(!data.isEmpty());
         this.serializationVersion = serializationVersion;
         this.data = data;
     }
 
+    public TrackedDataResponse merge(TrackedDataResponse that)
+    {
+        Preconditions.checkArgument(serializationVersion == 
that.serializationVersion);
+        List<ByteBuffer> newData = new ArrayList<>(data.size() + 
that.data.size());
+        newData.addAll(data);
+        newData.addAll(that.data);
+        return new TrackedDataResponse(serializationVersion, newData);

Review Comment:
   This can be replaced by a call to the static `merge()`?



##########
src/java/org/apache/cassandra/db/filter/RowFilter.java:
##########
@@ -200,73 +202,119 @@ public boolean 
hasExpressionOnClusteringOrRegularColumns()
         return false;
     }
 
-    /**
-     * Note that the application of this transformation does not yet take 
{@link #isStrict()} into account. This means
-     * that even when strict filtering is not safe, expressions will be 
applied as intersections rather than unions.
-     * The filter will always be evaluated strictly in conjunction with 
replica filtering protection at the 
-     * coordinator, however, even after CASSANDRA-19007 is addressed.
-     * 
-     * @see <a 
href="https://issues.apache.org/jira/browse/CASSANDRA-190007">CASSANDRA-19007</a>
-     */
-    protected Transformation<BaseRowIterator<?>> filter(TableMetadata 
metadata, long nowInSec)
+    public static class RowFilterTransformation extends 
Transformation<BaseRowIterator<?>>
     {
-        List<Expression> partitionLevelExpressions = new ArrayList<>();
-        List<Expression> rowLevelExpressions = new ArrayList<>();
-        for (Expression e: expressions)
+        private final TableMetadata metadata;
+        private final long nowInSec;
+        private final List<Expression> partitionLevelExpressions = new 
ArrayList<>();
+        private final List<Expression> rowLevelExpressions = new ArrayList<>();
+        private final long numberOfRegularColumnExpressions;

Review Comment:
   Nit: this can be a local variable (or even just an expression)?



##########
src/java/org/apache/cassandra/service/reads/tracked/TrackedDataResponse.java:
##########
@@ -33,18 +34,64 @@
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.base.Preconditions;
 
 public class TrackedDataResponse
 {
     private final int serializationVersion;
-    private final ByteBuffer data;
+    private final List<ByteBuffer> data;

Review Comment:
   Could trivially just use an array here, seeing that we always pre-size the 
list and only ever merge and append, but it's fine either way.



##########
src/java/org/apache/cassandra/service/reads/tracked/TrackedRead.java:
##########
@@ -171,6 +172,30 @@ public String toString()
 
     protected abstract Verb verb();
 
+    public boolean intersects(DecoratedKey key)
+    {
+        return command.dataRange().contains(key);
+    }
+
+    public static Partition create(ClusterMetadata metadata,
+                                   SinglePartitionReadCommand command,
+                                   ConsistencyLevel consistencyLevel)
+    {
+        
Preconditions.checkArgument(command.metadata().replicationType().isTracked());
+        Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
+        ColumnFamilyStore cfs = 
keyspace.getColumnFamilyStore(command.metadata().id);
+        SpeculativeRetryPolicy retry = cfs.metadata().params.speculativeRetry;
+
+        ReplicaPlan.ForTokenRead replicaPlan = ReplicaPlans.forRead(metadata,
+                                                                    keyspace,
+                                                                    
command.partitionKey().getToken(),
+                                                                    
command.indexQueryPlan(),
+                                                                    
consistencyLevel,
+                                                                    retry);
+
+        return new Partition(command, replicaPlan, consistencyLevel);
+    }

Review Comment:
   This seems to completely duplicate the static `Partition.create()` method; 
I think it can be removed.



##########
src/java/org/apache/cassandra/db/partitions/PartitionIterators.java:
##########
@@ -99,6 +100,52 @@ public static void consume(PartitionIterator iterator)
         }
     }
 
+    /**
+     * Merges multiple partition iterators with the requirement that there are 
no keys in common between any
+     * of the iterators
+     */
+    public static PartitionIterator 
mergeNonOverlapping(List<PartitionIterator> iterators)
+    {
+        MergeIterator.Reducer<RowIterator, RowIterator> reducer = new 
MergeIterator.Reducer<RowIterator, RowIterator>()
+        {
+            RowIterator current;
+
+            @Override
+            protected void onKeyChange()
+            {
+                current = null;
+            }
+
+            @Override
+            public void reduce(int idx, RowIterator partition)
+            {
+                if (current != null)
+                {
+                    throw new IllegalStateException("Multiple partitions 
received for " + current.partitionKey());
+                }
+                current = partition;
+            }
+
+            @Override
+            protected RowIterator getReduced()
+            {
+                return current;
+            }
+        };
+
+        Comparator<RowIterator> comparator = Comparator.comparing(p -> 
p.partitionKey());

Review Comment:
   Might as well stash it in a static field and reuse the same comparator? It 
might or might not be scalar-replaced, but pulling it out doesn't negatively 
affect readability, I think.



##########
src/java/org/apache/cassandra/service/reads/tracked/AbstractPartialTrackedRead.java:
##########
@@ -30,34 +31,192 @@
 import org.apache.cassandra.db.ReadExecutionController;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
-import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 import org.apache.cassandra.db.transform.RTBoundValidator;
-import org.apache.cassandra.index.Index;
 
-import static 
org.apache.cassandra.db.partitions.UnfilteredPartitionIterators.MergeListener.NOOP;
 
 public abstract class AbstractPartialTrackedRead implements PartialTrackedRead
 {
     private static final Logger logger = 
LoggerFactory.getLogger(AbstractPartialTrackedRead.class);
 
-    private enum State
+    protected interface Augmentable
     {
-        INITIALIZED,
-        PREPARED,
-        READING,
-        FINISHED
+        State augment(PartitionUpdate update);
+    }
+
+    protected static abstract class State
+    {
+        protected static final State CLOSED = new State()
+        {
+            @Override
+            String name()
+            {
+                return "closed";
+            }
+
+            @Override
+            boolean isClosed()
+            {
+                return true;
+            }
+        };
+
+        abstract String name();
+
+        boolean isInitialized()
+        {
+            return false;
+        }
+
+        Initialized asInitialized()
+        {
+            throw new IllegalStateException("State is " + name() + ", not " + 
Initialized.NAME);
+        }
+
+        boolean isPrepared()
+        {
+            return false;
+        }
+
+        Prepared asPrepared()
+        {
+            throw new IllegalStateException("State is " + name() + ", not " + 
Prepared.NAME);
+        }
+
+        boolean isCompleted()
+        {
+            return false;
+        }
+
+        Completed asCompleted()
+        {
+            throw new IllegalStateException("State is " + name() + ", not " + 
Completed.NAME);
+        }
+
+        boolean isAugmentable()
+        {
+            return isPrepared() || isInitialized();
+        }
+
+        Augmentable asAugmentable()
+        {
+            if (isPrepared())
+                return asPrepared();
+            if (isInitialized())
+                return asInitialized();
+            throw new IllegalStateException("State is " + name() + ", not 
augmentable");
+        }
+
+        boolean isClosed()
+        {
+            return false;
+        }
+
+        void close()
+        {
+
+        }
+    }
+
+    protected final class Initialized extends State implements Augmentable
+    {
+        static final String NAME = "initialized";
+
+        List<PartitionUpdate> queuedUpdates = new ArrayList<>();
+
+        @Override
+        String name()
+        {
+            return NAME;
+        }
+
+        @Override
+        boolean isInitialized()
+        {
+            return true;
+        }
+
+        @Override
+        Initialized asInitialized()
+        {
+            return this;
+        }
+
+        @Override
+        public State augment(PartitionUpdate update)
+        {
+            logger.trace("queueing update on {}", 
AbstractPartialTrackedRead.this);
+            queuedUpdates.add(update);
+            return this;
+        }

Review Comment:
   I don't think this can ever be reached? We never expose the new read until 
`prepare()` is called on it, so the entire `Initialized` state and the 
subsequent transition are effectively dead code?



##########
src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedSinglePartitionRead.java:
##########
@@ -29,19 +31,24 @@
 import org.apache.cassandra.db.partitions.SimpleBTreePartition;
 import org.apache.cassandra.db.partitions.SingletonUnfilteredPartitionIterator;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.index.Index;
 import org.apache.cassandra.index.transactions.UpdateTransaction;
 
+import static 
org.apache.cassandra.db.partitions.UnfilteredPartitionIterators.MergeListener.NOOP;
+
 public class PartialTrackedSinglePartitionRead extends 
AbstractPartialTrackedRead
 {
+    private final Index.Searcher searcher;
     private final SinglePartitionReadCommand command;
     private final UnfilteredPartitionIterator initialData;
     private SimpleBTreePartition augmentedData;

Review Comment:
   These are no longer needed, I think.



##########
src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedRead.java:
##########
@@ -25,33 +25,46 @@
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.ReadExecutionController;
+import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 import org.apache.cassandra.index.Index;
+import org.apache.cassandra.utils.concurrent.Future;
 
 public interface PartialTrackedRead

Review Comment:
   We probably don't need an interface here, seeing that all existing 
implementations extend `AbstractPartialTrackedRead` anyway?



##########
src/java/org/apache/cassandra/service/reads/tracked/AbstractPartialTrackedRead.java:
##########
@@ -86,69 +239,51 @@ public long startTimeNanos()
         return startTimeNanos;
     }
 
-    abstract void freezeInitialData();
-
-    abstract UnfilteredPartitionIterator initialData();
-
-    abstract UnfilteredPartitionIterator augmentedData();
-
-    abstract void augmentResponse(PartitionUpdate update);
+    protected synchronized State state()
+    {
+        return state;
+    }
 
     /**
      * Implementors need to call this before returning this from 
createInProgressRead
      */
-    synchronized void prepare()
+    synchronized void prepare(UnfilteredPartitionIterator initialData)
     {
         logger.trace("Preparing read {}", this);
-        Preconditions.checkState(state == State.INITIALIZED);
-        freezeInitialData();
-        state = State.PREPARED;
+        state = state.asInitialized().prepare(initialData);
     }
 
     @Override
-    public void augment(Mutation mutation)
+    public synchronized void augment(Mutation mutation)
     {
-        Preconditions.checkState(state == State.PREPARED);
         PartitionUpdate update = 
mutation.getPartitionUpdate(command().metadata());
         if (update != null)
-            augmentResponse(update);
+            state = state.asAugmentable().augment(update);
     }
 
     private UnfilteredPartitionIterator complete(UnfilteredPartitionIterator 
iterator)
     {
         return command().completeTrackedRead(iterator, this);
     }

Review Comment:
   Unused now?



##########
src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedRead.java:
##########
@@ -25,33 +25,46 @@
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.ReadExecutionController;
+import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 import org.apache.cassandra.index.Index;
+import org.apache.cassandra.utils.concurrent.Future;
 
 public interface PartialTrackedRead
 {
     interface CompletedRead extends AutoCloseable

Review Comment:
   Maybe extract this one out into its own top-level interface instead of an 
inner one?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: pr-unsubscr...@cassandra.apache.org
For additional commands, e-mail: pr-h...@cassandra.apache.org

Reply via email to