yifan-c commented on a change in pull request #868:
URL: https://github.com/apache/cassandra/pull/868#discussion_r567181504



##########
File path: 
test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
##########
@@ -177,6 +178,10 @@ public void run() throws Throwable
                             cluster.get(n).shutdown().get();
                             cluster.get(n).setVersion(version);
                             cluster.get(n).startup();
+                            // If using in-jvm Dtest networking, update the 
messaging versions
+                            // so that message filters are able to 
serialize/deserialize correctly.
+                            if (!cluster.get(n).config().has(Feature.NETWORK))
+                                cluster.updateMessagingVersions();

Review comment:
       Looks like `updateMessagingVersions()` is already called during 
`cluster.get(n).startup()`. 
   
   ```
   
AbstractCluster.Wrapper#startup(org.apache.cassandra.distributed.api.ICluster)
   ```
   
   Do we need to invoke it twice?

##########
File path: src/java/org/apache/cassandra/net/Message.java
##########
@@ -1285,10 +1298,18 @@ private int extractParamsSizePre40(ByteBuffer buf, int 
readerIndex, int readerLi
 
         private <T> int payloadSize(Message<T> message, int version)
         {
-            long payloadSize = message.payload != null && message.payload != 
NoPayload.noPayload
-                             ? 
message.verb().serializer().serializedSize(message.payload, version)
-                             : 0;
-            return Ints.checkedCast(payloadSize);
+            if (message.payload != null && message.payload != 
NoPayload.noPayload)
+            {
+                IVersionedAsymmetricSerializer<T,T> payloadSerializer = 
message.verb().serializer();
+                if (null == payloadSerializer) // per-response serializers are 
only required for in-jvm dtests for message filtering
+                    payloadSerializer = 
instance().callbacks.get(message.header.id, 
message.header.from).responseVerb.serializer();
+                long payloadSize = 
payloadSerializer.serializedSize(message.payload, version);
+                return Ints.checkedCast(payloadSize);
+            }
+            else
+            {
+                return 0;
+            }

Review comment:
       Can we also pass the payloadSerializer down to this method?

##########
File path: 
test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTest.java
##########
@@ -20,13 +20,48 @@
 
 import org.junit.Test;
 
+import org.apache.cassandra.distributed.UpgradeableCluster;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.distributed.shared.Versions;
 import static org.apache.cassandra.distributed.shared.AssertUtils.*;
 
 public class UpgradeTest extends UpgradeTestBase
 {
+    public static void truncateWriteReadTest(UpgradeableCluster cluster, int 
instanceNum)

Review comment:
       What does the `truncateWriteReadTest` test? Is it for testing the 
mix-mode messaging filter?

##########
File path: src/java/org/apache/cassandra/net/Message.java
##########
@@ -914,9 +922,14 @@ private Header extractHeaderPre40(ByteBuffer buf, long 
currentTimeNanos, int ver
 
             if (message.payload != null && message.payload != 
NoPayload.noPayload)
             {
+                IVersionedAsymmetricSerializer<T,T> payloadSerializer = 
message.header.verb.serializer();
+                // per-response serializers are only required for in-jvm 
dtests for message filtering
+                if (null == payloadSerializer)
+                    payloadSerializer = 
instance().callbacks.get(message.header.id, 
message.header.from).responseVerb.serializer();
+

Review comment:
       Can we also pass the payloadSerializer down to this method? 

##########
File path: src/java/org/apache/cassandra/net/RequestCallbacks.java
##########
@@ -116,7 +116,9 @@ public void 
addWithExpiration(AbstractWriteResponseHandler<?> cb,
         assert previous == null : format("Callback already exists for id 
%d/%s! (%s)", message.id(), to.endpoint(), previous);
     }
 
-    <T> IVersionedAsymmetricSerializer<?, T> responseSerializer(long id, 
InetAddressAndPort peer)
+    @Deprecated // for 3.0 compatibility purposes only

Review comment:
       👍 to adding the `@Deprecated`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to