tjake commented on code in PR #1954:
URL: https://github.com/apache/cassandra/pull/1954#discussion_r1039024376
##########
src/java/org/apache/cassandra/db/Mutation.java:
##########
@@ -393,34 +388,130 @@ public static class MutationSerializer implements
IVersionedSerializer<Mutation>
{
public void serialize(Mutation mutation, DataOutputPlus out, int
version) throws IOException
{
+ serialization(mutation,
version).serialize(PartitionUpdate.serializer, mutation, out, version);
+ }
+
+ /**
+ * Called early during request processing to prevent {@link
#serialization(Mutation, int)} from
+ * being called concurrently.
+ * See {@link
org.apache.cassandra.service.StorageProxy#sendToHintedEndpoints(Mutation,
WriteEndpoints, WriteHandler, Verb.AckedRequest)}.
+ */
+ @SuppressWarnings("JavadocReference")
+ public void prepareSerializedBuffer(Mutation mutation, int version)
+ {
+ serialization(mutation, version);
+ }
+
+ /**
+ * Retrieve the cached serialization of this mutation, or compute and
cache said serialization if it doesn't
+ * exist yet. Note that this method is _not_ synchronized even though
it may (and will often) be called
+ * concurrently. Concurrent calls are still safe, however: the only
risk is that, if the value is not cached yet,
+ * multiple concurrent calls may compute it multiple times instead of
just once. This is ok, as in practice
+ * we make sure this doesn't happen in the hot path by forcing the
initial caching in
+ * {@link
org.apache.cassandra.service.StorageProxy#sendToHintedEndpoints(Mutation,
WriteEndpoints, WriteHandler, Verb.AckedRequest)}
+ * via {@link #prepareSerializedBuffer(Mutation, int)}, which is the only
caller that passes
+ * {@code isPrepare==true}.
+ */
+ @SuppressWarnings("JavadocReference")
+ private Serialization serialization(Mutation mutation, int version)
+ {
+ int versionIndex = MessagingService.getVersionIndex(version);
+ // Retrieves the cached version, or build+cache it if it's not
cached already.
+ Serialization serialization =
mutation.serializations[versionIndex];
+ if (serialization == null)
+ {
+ // We need to use a capacity-limited DOB here.
+ // If a mutation consists of one PartitionUpdate with one
column that exceeds the
+ // "cacheable-mutation-size-limit", a capacity-limited DOB can
handle that case and
+ // throw a BufferCapacityExceededException. "Huge" serialized
mutations can have a
+ // bad impact on G1 GC, if the cached serialized mutation
results in a
+ // "humongous object" and also frequent re-allocations of the
scratch buffer(s).
+ // I.e. large cached mutation objects cause GC pressure.
+ try (DataOutputBuffer dob =
DataOutputBuffer.limitedScratchBuffer(CACHEABLE_MUTATION_SIZE_LIMIT))
+ {
+ if (!serializeInternal(PartitionUpdate.serializer,
mutation, dob, version,true))
+ {
+ serialization = new NonCacheableSerialization();
+ }
+ else
+ {
+ serialization = new
CachedSerialization(dob.toByteArray());
+ }
+ }
+ catch (DataOutputBuffer.BufferCapacityExceededException tooBig)
Review Comment:
Yeah, I'm reworking this, good call out
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]