jbellis commented on code in PR #2673: URL: https://github.com/apache/cassandra/pull/2673#discussion_r1357145878
########## src/java/org/apache/cassandra/index/sai/disk/v1/vector/OnHeapGraph.java: ########## @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.index.sai.disk.v1.vector; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.cliffc.high_scale_lib.NonBlockingHashMapLong; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.github.jbellis.jvector.disk.CompressedVectors; +import io.github.jbellis.jvector.disk.OnDiskGraphIndex; +import io.github.jbellis.jvector.graph.GraphIndexBuilder; +import io.github.jbellis.jvector.graph.GraphSearcher; +import io.github.jbellis.jvector.pq.ProductQuantization; +import io.github.jbellis.jvector.util.Bits; +import 
io.github.jbellis.jvector.vector.VectorEncoding; +import io.github.jbellis.jvector.vector.VectorSimilarityFunction; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.VectorType; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.index.sai.IndexContext; +import org.apache.cassandra.index.sai.disk.format.IndexComponent; +import org.apache.cassandra.index.sai.disk.format.IndexDescriptor; +import org.apache.cassandra.index.sai.disk.io.IndexFileUtils; +import org.apache.cassandra.index.sai.disk.v1.IndexWriterConfig; +import org.apache.cassandra.index.sai.disk.v1.segment.SegmentMetadata; +import org.apache.cassandra.io.util.SequentialWriter; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.lucene.util.StringHelper; + +public class OnHeapGraph<T> +{ + private static final Logger logger = LoggerFactory.getLogger(OnHeapGraph.class); + + private final RamAwareVectorValues vectorValues; + private final GraphIndexBuilder<float[]> builder; + private final VectorType<?> vectorType; + private final VectorSimilarityFunction similarityFunction; + private final Map<float[], VectorPostings<T>> postingsMap; + private final NonBlockingHashMapLong<VectorPostings<T>> postingsByOrdinal; + private final AtomicInteger nextOrdinal = new AtomicInteger(); + private volatile boolean hasDeletions; + + /** + * @param termComparator the vector type + * @param indexWriterConfig + * + * Will create a concurrent object. + */ + public OnHeapGraph(AbstractType<?> termComparator, IndexWriterConfig indexWriterConfig) + { + this(termComparator, indexWriterConfig, true); + } + + /** + * @param termComparator the vector type + * @param indexWriterConfig the {@link IndexWriterConfig} for the graph + * @param concurrent should be true for memtables, false for compaction. Concurrent allows us to search + * while building the graph; non-concurrent allows us to avoid synchronization costs. 
+ */ + @SuppressWarnings("unchecked") + public OnHeapGraph(AbstractType<?> termComparator, IndexWriterConfig indexWriterConfig, boolean concurrent) + { + this.vectorType = (VectorType<?>) termComparator; + vectorValues = concurrent + ? new ConcurrentVectorValues(((VectorType<?>) termComparator).dimension) + : new CompactionVectorValues(((VectorType<Float>) termComparator)); + similarityFunction = indexWriterConfig.getSimilarityFunction(); + // We need to be able to inexpensively distinguish different vectors, with a slower path + // that identifies vectors that are equal but not the same reference. A comparison + // based Map (which only needs to look at vector elements until a difference is found) + // is thus a better option than hash-based (which has to look at all elements to compute the hash). + postingsMap = new ConcurrentSkipListMap<>(Arrays::compare); + postingsByOrdinal = new NonBlockingHashMapLong<>(); + + builder = new GraphIndexBuilder<>(vectorValues, + VectorEncoding.FLOAT32, + similarityFunction, + indexWriterConfig.getMaximumNodeConnections(), + indexWriterConfig.getConstructionBeamWidth(), + 1.2f, + 1.4f); + } + + public int size() + { + return vectorValues.size(); + } + + public boolean isEmpty() + { + return postingsMap.values().stream().allMatch(VectorPostings::isEmpty); + } + + /** + * @return the incremental bytes used by adding the given vector to the index + */ + public long add(ByteBuffer term, T key, InvalidVectorBehavior behavior) + { + assert term != null && term.remaining() != 0; + + var vector = vectorType.composeAsFloat(term); + if (behavior == InvalidVectorBehavior.IGNORE) + { + try + { + validateIndexable(vector, similarityFunction); + } + catch (InvalidRequestException e) + { + logger.trace("Ignoring invalid vector during index build against existing data: {}", vector, e); + return 0; + } + } + else + { + assert behavior == InvalidVectorBehavior.FAIL; + validateIndexable(vector, similarityFunction); + } + + var bytesUsed = new 
AtomicLong(); + var newVector = new AtomicBoolean(); + // if the vector is already in the graph, all that happens is that the postings list is updated Review Comment: this has a pretty nasty race condition when multiple rows are added concurrently with the same vector. fix is in vsearch 9e3ca22c -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

