belliottsmith commented on code in PR #4373:
URL: https://github.com/apache/cassandra/pull/4373#discussion_r2378855763


##########
src/java/org/apache/cassandra/db/virtual/AbstractLazyVirtualTable.java:
##########
@@ -0,0 +1,753 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.virtual;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.UnaryOperator;
+
+import javax.annotation.Nullable;
+
+import accord.utils.Invariants;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Clusterable;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ClusteringPrefix;
+import org.apache.cassandra.db.DataRange;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.LivenessInfo;
+import org.apache.cassandra.db.RegularAndStaticColumns;
+import org.apache.cassandra.db.filter.ClusteringIndexFilter;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.BTreeRow;
+import org.apache.cassandra.db.rows.BufferCell;
+import org.apache.cassandra.db.rows.ColumnData;
+import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Bounds;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.ClientWarn;
+import org.apache.cassandra.utils.BulkIterator;
+import org.apache.cassandra.utils.Clock;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.btree.BTree;
+import org.apache.cassandra.utils.btree.UpdateFunction;
+
+import static org.apache.cassandra.db.ClusteringPrefix.Kind.STATIC_CLUSTERING;
+import static org.apache.cassandra.db.ConsistencyLevel.ONE;
+import static org.apache.cassandra.utils.Clock.Global.nanoTime;
+
+/**
+ * An abstract virtual table implementation that builds the resultset on 
demand.
+ */
+public abstract class AbstractLazyVirtualTable implements VirtualTable
+{
+    public enum OnTimeout { BEST_EFFORT, FAIL }
+
+    // in the special case where we know we have enough rows in the collector, 
throw this exception to terminate early
+    public static class InternalDoneException extends RuntimeException {}
+    // in the special case where we have timed out, throw this exception to 
terminate early
+    public static class InternalTimeoutException extends RuntimeException {}
+
+    public static class FilterRange<V>
+    {
+        final V min, max;
+        public FilterRange(V min, V max)
+        {
+            this.min = min;
+            this.max = max;
+        }
+    }
+
+    public interface PartitionsCollector
+    {
+        DataRange dataRange();
+        RowFilter rowFilter();
+        ColumnFilter columnFilter();
+        DataLimits limits();
+        long nowInSeconds();
+        long timestampMicros();
+        long deadlineNanos();
+        boolean isEmpty();
+
+        RowCollector row(Object... primaryKeys);
+        PartitionCollector partition(Object... partitionKeys);
+        UnfilteredPartitionIterator finish();
+
+        @Nullable Object[] singlePartitionKey();
+        <I, O> FilterRange<O> filters(String column, Function<I, O> translate, 
UnaryOperator<O> exclusiveStart, UnaryOperator<O> exclusiveEnd);
+    }
+
+    public interface PartitionCollector
+    {
+        void collect(Consumer<RowsCollector> addTo);
+    }
+
+    public interface RowsCollector
+    {
+        RowCollector add(Object... clusteringKeys);
+    }
+
+    public interface RowCollector
+    {
+        default void lazyCollect(Consumer<ColumnsCollector> addToIfNeeded) { 
eagerCollect(addToIfNeeded); }
+        void eagerCollect(Consumer<ColumnsCollector> addToNow);
+    }
+
+    public interface ColumnsCollector
+    {
+        /**
+         * equivalent to
+         * {@code
+         * if (value == null) add(columnName, null);
+         * else if (f1.apply(value) == null) add(columnName, f1.apply(value));
+         * else add(columnName, f2.apply(f1.apply(value)));
+         * }
+         */
+        <V1, V2> ColumnsCollector add(String columnName, V1 value, Function<? 
super V1, ? extends V2> f1, Function<? super V2, ?> f2);
+
+        default <V> ColumnsCollector add(String columnName, V value, 
Function<? super V, ?> transform)
+        {
+            return add(columnName, value, Function.identity(), transform);
+        }
+        default ColumnsCollector add(String columnName, Object value)
+        {
+            return add(columnName, value, Function.identity());
+        }
+    }
+
+    public static class SimplePartitionsCollector implements 
PartitionsCollector
+    {
+        final TableMetadata metadata;
+        final boolean isSorted;
+        final boolean isSortedByPartitionKey;
+
+        final Map<String, ColumnMetadata> columnLookup = new HashMap<>();
+        final NavigableMap<DecoratedKey, SimplePartition> partitions;
+
+        final DataRange dataRange;
+        final ColumnFilter columnFilter;
+        final RowFilter rowFilter;
+        final DataLimits limits;
+
+        final long startedAtNanos = Clock.Global.nanoTime();
+        final long deadlineNanos;
+
+        final long nowInSeconds = Clock.Global.nowInSeconds();
+        final long timestampMicros;
+
+        int totalRowCount;
+        int lastFilteredTotalRowCount, lastFilteredPartitionCount;
+
+        @Override public DataRange dataRange() { return dataRange; }
+        @Override public RowFilter rowFilter() { return rowFilter; }
+        @Override public ColumnFilter columnFilter() { return columnFilter; }
+        @Override public DataLimits limits() { return limits; }
+        @Override public long nowInSeconds() { return nowInSeconds; }
+        @Override public long timestampMicros() { return timestampMicros; }
+        @Override public long deadlineNanos() { return deadlineNanos; }
+        @Override public boolean isEmpty() { return totalRowCount == 0; }
+
+        public SimplePartitionsCollector(TableMetadata metadata, boolean 
isSorted, boolean isSortedByPartitionKey,
+                                         DataRange dataRange, ColumnFilter 
columnFilter, RowFilter rowFilter, DataLimits limits)
+        {
+            this.metadata = metadata;
+            this.isSorted = isSorted;
+            this.isSortedByPartitionKey = isSortedByPartitionKey;
+            this.dataRange = dataRange;
+            this.columnFilter = columnFilter;
+            this.rowFilter = rowFilter;
+            this.limits = limits;
+            this.timestampMicros = FBUtilities.timestampMicros();
+            this.deadlineNanos = startedAtNanos + 
DatabaseDescriptor.getReadRpcTimeout(TimeUnit.NANOSECONDS);
+            this.partitions = new TreeMap<>(dataRange.isReversed() ? 
DecoratedKey.comparator.reversed() : DecoratedKey.comparator);
+            for (ColumnMetadata cm : metadata.columns())
+                columnLookup.put(cm.name.toString(), cm);
+        }
+
+        public Object[] singlePartitionKey()
+        {
+            AbstractBounds<?> bounds = dataRange().keyRange();
+            if (!bounds.isStartInclusive() || !bounds.isEndInclusive() || 
!bounds.left.equals(bounds.right) || !(bounds.left instanceof DecoratedKey))
+                return null;
+
+            return composePartitionKeys((DecoratedKey) bounds.left, metadata);
+        }
+
+        @Override
+        public PartitionCollector partition(Object ... partitionKeys)
+        {
+            int pkSize = metadata.partitionKeyColumns().size();
+            if (pkSize != partitionKeys.length)
+                throw new IllegalArgumentException();
+
+            DecoratedKey partitionKey = decomposePartitionKeys(metadata, 
partitionKeys);
+            if (!dataRange.contains(partitionKey))
+                return dropCks -> {};
+
+            return partitions.computeIfAbsent(partitionKey, 
SimplePartition::new);
+        }
+
+        @Override
+        public UnfilteredPartitionIterator finish()
+        {
+            final Iterator<SimplePartition> partitions = 
this.partitions.values().iterator();
+            return new UnfilteredPartitionIterator()
+            {
+                @Override public TableMetadata metadata() { return metadata; }
+                @Override public void close() {}
+
+                @Override
+                public boolean hasNext()
+                {
+                    return partitions.hasNext();
+                }
+
+                @Override
+                public UnfilteredRowIterator next()
+                {
+                    SimplePartition partition = partitions.next();
+                    Iterator<Row> rows = partition.rows();
+
+                    return new UnfilteredRowIterator()
+                    {
+                        @Override public TableMetadata metadata() { return 
metadata; }
+                        @Override public boolean isReverseOrder() { return 
dataRange.isReversed(); }
+                        @Override public RegularAndStaticColumns columns() { 
return columnFilter.fetchedColumns(); }
+                        @Override public DecoratedKey partitionKey() { return 
partition.key; }
+
+                        @Override public Row staticRow() { return 
partition.staticRow(); }
+                        @Override public boolean hasNext() { return 
rows.hasNext(); }
+                        @Override public Unfiltered next() { return 
rows.next(); }
+
+                        @Override public void close() {}
+                        @Override public DeletionTime partitionLevelDeletion() 
{ return DeletionTime.LIVE; }
+                        @Override public EncodingStats stats() { return 
EncodingStats.NO_STATS; }
+                    };
+                }
+            };
+        }
+
+        @Override
+        @Nullable
+        public <I, O> FilterRange<O> filters(String columnName, Function<I, O> 
translate, UnaryOperator<O> exclusiveStart, UnaryOperator<O> exclusiveEnd)
+        {
+            ColumnMetadata column = columnLookup.get(columnName);
+            O min = null, max = null;
+            for (RowFilter.Expression expression : 
rowFilter().getExpressions())
+            {
+                if (!expression.column().equals(column))
+                    continue;
+
+                O bound = 
translate.apply((I)column.type.compose(expression.getIndexValue()));
+                switch (expression.operator())
+                {
+                    default: throw new InvalidRequestException("Operator " + 
expression.operator() + " not supported for txn_id");
+                    case EQ:  min = max = bound; break;
+                    case LTE: max = bound; break;
+                    case LT:  max = exclusiveEnd.apply(bound); break;
+                    case GTE: min = bound; break;
+                    case GT:  min = exclusiveStart.apply(bound); break;
+                }
+            }
+
+            return new FilterRange<>(min, max);
+        }
+
+        @Override
+        public RowCollector row(Object... primaryKeys)
+        {
+            int pkSize = metadata.partitionKeyColumns().size();
+            int ckSize = metadata.clusteringColumns().size();
+            if (pkSize + ckSize != primaryKeys.length)
+                throw new IllegalArgumentException();
+
+            Object[] partitionKeyValues = new Object[pkSize];
+            Object[]   clusteringValues = new Object[ckSize];
+
+            System.arraycopy(primaryKeys, 0, partitionKeyValues, 0, pkSize);
+            System.arraycopy(primaryKeys, pkSize, clusteringValues, 0, ckSize);
+
+            DecoratedKey partitionKey = decomposePartitionKeys(metadata, 
partitionKeyValues);
+            Clustering<?> clustering = decomposeClusterings(metadata, 
clusteringValues);
+
+            if (!dataRange.contains(partitionKey) || 
!dataRange.clusteringIndexFilter(partitionKey).selects(clustering))
+                return drop -> {};
+
+            return partitions.computeIfAbsent(partitionKey, 
SimplePartition::new).row(clustering);
+        }
+
+        private final class SimplePartition implements PartitionCollector, 
RowsCollector
+        {
+            private final DecoratedKey key;
+            // we assume no duplicate rows, and impose the condition lazily
+            private SimpleRow[] rows;
+            private int rowCount;
+            private SimpleRow staticRow;
+            private boolean dropRows;
+
+            private SimplePartition(DecoratedKey key)
+            {
+                this.key = key;
+                this.rows = new SimpleRow[1];
+            }
+
+            @Override
+            public void collect(Consumer<RowsCollector> addTo)
+            {
+                addTo.accept(this);
+            }
+
+            @Override
+            public RowCollector add(Object... clusteringKeys)
+            {
+                int ckSize = metadata.clusteringColumns().size();
+                if (ckSize != clusteringKeys.length)
+                    throw new IllegalArgumentException();
+
+                return row(decomposeClusterings(metadata, clusteringKeys));
+            }
+
+            RowCollector row(Clustering<?> clustering)
+            {
+                if (nanoTime() > deadlineNanos)
+                    throw new InternalTimeoutException();
+
+                if (dropRows || 
!dataRange.clusteringIndexFilter(key).selects(clustering))
+                    return drop -> {};
+
+                if (totalRowCount >= limits.count())
+                {
+                    boolean filter;
+                    if (!isSorted || !isSortedByPartitionKey || 
lastFilteredPartitionCount == partitions.size())
+                    {
+                        filter = totalRowCount / 2 >= Math.max(1024, 
limits.count());
+                    }
+                    else
+                    {
+                        int rowsAddedSinceLastFiltered = totalRowCount - 
lastFilteredTotalRowCount;
+                        int threshold = Math.max(32, Math.min(1024, 
lastFilteredTotalRowCount / 2));
+                        filter = lastFilteredTotalRowCount == 0 || 
rowsAddedSinceLastFiltered >= threshold;
+                    }
+
+                    if (filter)
+                    {
+                        // first filter within each partition
+                        for (SimplePartition partition : partitions.values())
+                        {
+                            int curCount = partition.rowCount;
+                            int newCount = Math.min(curCount, 
limits.perPartitionCount());
+                            newCount = 
partition.filterAndSortAndTruncate(newCount);
+                            totalRowCount -= curCount - newCount;
+                        }
+
+                        // then drop any partitions that completely fall 
outside our limit
+                        Iterator<SimplePartition> iter = 
partitions.descendingMap().values().iterator();
+                        SimplePartition last;
+                        while (true)
+                        {
+                            SimplePartition next = last = iter.next();
+                            if (totalRowCount - next.rowCount < limits.count())
+                                break;
+
+                            iter.remove();
+                            totalRowCount -= next.rowCount;
+                            if (next == this)
+                                dropRows = true;
+                        }
+
+                        // possibly truncate the last partition if it 
partially falls outside the limit
+                        int overflow = Math.max(0, totalRowCount - 
limits.count());
+                        int curCount = last.rowCount;
+                        int newCount = curCount - overflow;
+                        newCount = last.filterAndSortAndTruncate(newCount);
+                        totalRowCount -= curCount - newCount;
+                        lastFilteredTotalRowCount = totalRowCount;
+                        lastFilteredPartitionCount = partitions.size();
+
+                        if (isSortedByPartitionKey && totalRowCount - newCount 
>= limits.count())
+                            throw new InternalDoneException();
+
+                        if (isSorted && totalRowCount >= limits.count())
+                            throw new InternalDoneException();
+
+                        if (dropRows)
+                            return drop -> {};
+                    }
+                }
+
+                SimpleRow result = new SimpleRow(clustering);
+                if (clustering.kind() == STATIC_CLUSTERING)
+                {
+                    Invariants.require(staticRow == null);
+                    staticRow = result;
+                }
+                else
+                {
+                    totalRowCount++;
+                    if (rowCount == rows.length)
+                        rows = Arrays.copyOf(rows, Math.max(8, rowCount * 2));
+                    rows[rowCount++] = result;
+                }
+                return result;
+            }
+
+            void filterAndSort()
+            {
+                int newCount = 0;
+                for (int i = 0 ; i < rowCount; ++i)
+                {
+                    if (rows[i].rowFilterIncludes())
+                    {
+                        if (newCount != i)
+                            rows[newCount] = rows[i];
+                        newCount++;
+                    }
+                }
+                totalRowCount -= (rowCount - newCount);
+                Arrays.fill(rows, newCount, rowCount, null);
+                rowCount = newCount;
+                Arrays.sort(rows, 0, newCount, rowComparator());
+            }
+
+            int filterAndSortAndTruncate(int newCount)
+            {
+                Invariants.requireArgument(newCount <= rowCount);
+                filterAndSort();
+                if (rowCount < newCount)
+                    return rowCount;
+
+                Arrays.fill(rows, newCount, rowCount, null);
+                rowCount = newCount;
+                return newCount;
+            }
+
+            private Comparator<SimpleRow> rowComparator()
+            {
+                Comparator<Clusterable> cmp = dataRange.isReversed() ? 
metadata.comparator.reversed() : metadata.comparator;
+                return (a, b) -> cmp.compare(a.clustering, b.clustering);

Review Comment:
   I prefer not to in cases where we are already allocating plenty of objects, 
and where we don't need to eek out every scintilla of performance.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to