ifesdjeen commented on code in PR #3910:
URL: https://github.com/apache/cassandra/pull/3910#discussion_r1961851753
##########
src/java/org/apache/cassandra/io/util/RebufferingInputStream.java:
##########
@@ -275,6 +275,18 @@ public long readUnsignedVInt() throws IOException
return retval;
}
+ @Override
+ public long readLeastSignificantBytes(int bytes) throws IOException
+ {
+ if (buffer.remaining() < 8)
+ return super.readLeastSignificantBytes(bytes);
+
+ long retval = buffer.getLong(buffer.position());
Review Comment:
nit: should we add an assert that it is _exactly_ 8 bytes?
##########
src/java/org/apache/cassandra/service/accord/AccordJournal.java:
##########
@@ -621,22 +621,91 @@ public Builder(TxnId txnId, Load load)
{
super(txnId, load);
}
- public ByteBuffer asByteBuffer(RedundantBefore redundantBefore, int
userVersion) throws IOException
+ public ByteBuffer asByteBuffer(int userVersion) throws IOException
{
try (DataOutputBuffer out = new DataOutputBuffer())
{
- serialize(out, redundantBefore, userVersion);
+ serialize(out, userVersion);
return out.asNewBuffer();
}
}
- public void serialize(DataOutputPlus out, RedundantBefore
redundantBefore, int userVersion) throws IOException
+ public void serialize(DataOutputPlus out, int userVersion) throws
IOException
{
Invariants.require(mask == 0);
Invariants.require(flags != 0);
int flags = validateFlags(this.flags);
- Writer.serialize(construct(redundantBefore), flags, out,
userVersion);
+ serialize(flags, out, userVersion);
+ }
+
+ private void serialize(int flags, DataOutputPlus out, int userVersion)
throws IOException
Review Comment:
Should we maybe use this one instead of `serialize(Command command, int
flags, DataOutputPlus out, int userVersion)` too, so that we do not have this
logic twice? Essentially: apply the command to the builder, and then serialize.
##########
src/java/org/apache/cassandra/db/marshal/ByteBufferAccessor.java:
##########
@@ -315,6 +315,35 @@ public int putFloat(ByteBuffer dst, int offset, float
value)
return TypeSizes.FLOAT_SIZE;
}
+ @Override
+ public int putLeastSignificantBytes(ByteBuffer dst, int offset, long
register, int bytes)
+ {
+ if (dst.remaining() < Long.BYTES)
+ {
+ return ValueAccessor.putLeastSignificantBytes(this, dst, offset,
register, bytes);
+ }
+ else
+ {
+ int pos = dst.position() + offset;
+ dst.putLong(pos, register << (64 - (bytes * 8)));
Review Comment:
nit: add invariant for number of bytes?
##########
src/java/org/apache/cassandra/db/compaction/CompactionIterator.java:
##########
@@ -782,38 +780,41 @@ protected Row applyToRow(Row row)
class AccordCommandsForKeyPurger extends AbstractPurger
{
final CommandsForKeyAccessor accessor;
- final Int2ObjectHashMap<RedundantBefore> redundantBefores;
+ final AccordCompactionInfos compactionInfos;
+
+ AccordCompactionInfo info;
int storeId;
- TokenKey partitionKey;
+ TokenKey tokenKey;
AccordCommandsForKeyPurger(CommandsForKeyAccessor accessor,
Supplier<IAccordService> accordService)
{
this.accessor = accessor;
- this.redundantBefores =
accordService.get().getCompactionInfo().redundantBefores;
+ this.compactionInfos = new AccordCompactionInfos(null,
accordService.get().getCompactionInfo());
}
protected void beginPartition(UnfilteredRowIterator partition)
{
- ByteBuffer[] partitionKeyComponents =
accessor.splitPartitionKey(partition.partitionKey());
- storeId = accessor.getStoreId(partitionKeyComponents);
- partitionKey = accessor.getKey(partitionKeyComponents);
+ ByteBuffer key = partition.partitionKey().getKey();
+ storeId = accessor.getStoreId(key);
Review Comment:
nit: this is a static method now
##########
src/java/org/apache/cassandra/db/marshal/ByteBufferAccessor.java:
##########
@@ -315,6 +315,35 @@ public int putFloat(ByteBuffer dst, int offset, float
value)
return TypeSizes.FLOAT_SIZE;
}
+ @Override
+ public int putLeastSignificantBytes(ByteBuffer dst, int offset, long
register, int bytes)
+ {
+ if (dst.remaining() < Long.BYTES)
+ {
+ return ValueAccessor.putLeastSignificantBytes(this, dst, offset,
register, bytes);
+ }
+ else
+ {
+ int pos = dst.position() + offset;
+ dst.putLong(pos, register << (64 - (bytes * 8)));
+ }
+ return bytes;
+ }
+
+ @Override
+ public long getLeastSignificantBytes(ByteBuffer dst, int offset, int bytes)
+ {
+ if (dst.remaining() < Long.BYTES)
+ {
+ return ValueAccessor.getLeastSignificantBytes(this, dst, offset,
bytes);
+ }
+ else
+ {
+ int pos = dst.position() + offset;
+ return dst.getLong(pos) >>> (64 - (bytes * 8));
Review Comment:
nit: add invariant for a maximum number of bytes?
##########
test/unit/org/apache/cassandra/service/accord/serializers/TokenKeyTest.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord.serializers;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Test;
+
+import accord.utils.Gen;
+import accord.utils.Gens;
+import accord.utils.Invariants;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.marshal.ByteBufferAccessor;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
+import org.apache.cassandra.dht.RandomPartitioner;
+import org.apache.cassandra.dht.RandomPartitioner.BigIntegerToken;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.service.accord.api.TokenKey;
+import org.apache.cassandra.utils.AccordGenerators;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.CassandraGenerators;
+import org.assertj.core.api.Assertions;
+
+import static accord.utils.Property.qt;
+import static org.apache.cassandra.service.accord.api.TokenKey.serializer;
+import static org.apache.cassandra.utils.AccordGenerators.fromQT;
+import static org.apache.cassandra.utils.CassandraGenerators.partitioners;
+import static org.apache.cassandra.utils.CassandraGenerators.token;
+
+public class TokenKeyTest
+{
+ static
+ {
+ DatabaseDescriptor.clientInitialization();
+ // AccordRoutingKey$TokenKey reaches into DD to get partitioner, so
need to set that up...
+ DatabaseDescriptor.setPartitionerUnsafe(Murmur3Partitioner.instance);
+ }
+
+ @Test
+ public void serde()
+ {
+
qt().withSeed(0L).forAll(fromQT(partitioners().assuming(IPartitioner::accordSupported)).flatMap(partitioner
-> routingKeyGen(fromQT(CassandraGenerators.TABLE_ID_GEN),
fromQT(token(partitioner)), partitioner)))
Review Comment:
nit: un-hardcode the seed?
##########
src/java/org/apache/cassandra/service/accord/serializers/KeySerializers.java:
##########
@@ -518,12 +523,14 @@ public long serializedSize(Seekables<?, ?> t, int version)
}
}
+ // this serializer is designed to permits using the collection in its
serialized form with minimal in-memory state.
+ // it also saves some memory by avoiding duplicating prefixes (which
happens to also assist faster lookups)
public abstract static class AbstractKeysSerializer<K extends RoutableKey,
KS extends AbstractKeys<K>> implements IVersionedSerializer<KS>
{
- final IVersionedSerializer<K> keySerializer;
+ final AccordKeySerializer<K> keySerializer;
final IntFunction<K[]> allocate;
- public AbstractKeysSerializer(IVersionedSerializer<K> keySerializer,
IntFunction<K[]> allocate)
+ public AbstractKeysSerializer(AccordKeySerializer<K> keySerializer,
IntFunction<K[]> allocate)
Review Comment:
nit: on L553, we can use `skip` instead of `deserialize` now that skip is
available
##########
src/java/org/apache/cassandra/service/accord/TokenRange.java:
##########
@@ -111,10 +111,10 @@ public TokenRange newRange(RoutingKey start, RoutingKey
end)
public org.apache.cassandra.dht.Range<Token> toKeyspaceRange()
{
IPartitioner partitioner = DatabaseDescriptor.getPartitioner();
- AccordRoutingKey start = start();
- AccordRoutingKey end = end();
- Token left = start instanceof SentinelKey ?
partitioner.getMinimumToken() : start.token();
- Token right = end instanceof SentinelKey ?
partitioner.getMinimumToken() : end.token();
+ TokenKey start = start();
Review Comment:
Oh, nice. This has bothered me quite a bit, but I was not sure what
SentinelKey was at that moment.
##########
src/java/org/apache/cassandra/db/marshal/ValueAccessor.java:
##########
@@ -493,4 +510,72 @@ public static <L, R> boolean equals(L left,
ValueAccessor<L> leftAccessor, R rig
{
return compare(left, leftAccessor, right, rightAccessor) == 0;
}
+
+ public static <V> int putLeastSignificantBytes(ValueAccessor<V> accessor,
V dst, int offset, long register, int bytes)
+ {
+ switch (bytes)
+ {
+ case 0:
+ break;
+ case 1:
+ accessor.putByte(dst, offset, (byte)register);
+ break;
+ case 2:
+ accessor.putShort(dst, offset, (short)register);
+ break;
+ case 3:
+ accessor.putShort(dst, offset, (short)(register >>> 8));
+ accessor.putByte(dst, offset, (byte)register);
+ break;
+ case 4:
+ accessor.putInt(dst, offset, (int)register);
+ break;
+ case 5:
+ accessor.putInt(dst, offset, (int)(register >>> 8));
+ accessor.putByte(dst, offset, (byte)register);
+ break;
+ case 6:
+ accessor.putInt(dst, offset, (int)(register >>> 16));
+ accessor.putShort(dst, offset, (short)register);
+ break;
+ case 7:
+ accessor.putInt(dst, offset, (int)(register >>> 24));
+ accessor.putShort(dst, offset, (short)(register >> 8));
+ accessor.putByte(dst, offset, (byte)register);
+ break;
+ case 8:
+ accessor.putLong(dst, offset, register);
+ break;
+ default:
+ throw new IllegalArgumentException();
+ }
+ return bytes;
+ }
+
+ public static <V> long getLeastSignificantBytes(ValueAccessor<V> accessor,
V dst, int offset, int bytes)
+ {
+ switch (bytes)
+ {
+ case 0: return 0;
+ case 1: return accessor.getByte(dst, offset);
+ case 2: return accessor.getShort(dst, offset);
+ case 3:
+ return ((long)accessor.getShort(dst, offset) << 8)
+ | (long)accessor.getByte(dst, offset + 1);
Review Comment:
I _think_ we should have `+ 2` here.
##########
src/java/org/apache/cassandra/service/accord/serializers/KeySerializers.java:
##########
@@ -565,48 +572,313 @@ public long serializedSize(KS keys, int version)
}
}
- public abstract static class AbstractRangesSerializer<RS extends
AbstractRanges> implements IVersionedSerializer<RS>
+ // this serializer is designed to permits using the collection in its
serialized form with minimal in-memory state.
Review Comment:
nit: "to permit"
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]