holdenk commented on code in PR #55028:
URL: https://github.com/apache/spark/pull/55028#discussion_r3493366606


##########
common/network-common/src/main/java/org/apache/spark/network/crypto/GcmTransportCipher.java:
##########
@@ -116,23 +108,40 @@ static class GcmEncryptedMessage extends 
AbstractFileRegion {
         private final long encryptedCount;
 
         GcmEncryptedMessage(AesGcmHkdfStreaming aesGcmHkdfStreaming,
-                            Object plaintextMessage,
-                            ByteBuffer plaintextBuffer,
-                            ByteBuffer ciphertextBuffer) throws 
GeneralSecurityException {
+                            Object plaintextMessage) throws 
GeneralSecurityException {
             JavaUtils.checkArgument(
                     plaintextMessage instanceof ByteBuf || plaintextMessage 
instanceof FileRegion,
                     "Unrecognized message type: %s", 
plaintextMessage.getClass().getName());
             this.plaintextMessage = plaintextMessage;
-            this.plaintextBuffer = plaintextBuffer;
-            this.ciphertextBuffer = ciphertextBuffer;
+            this.plaintextBuffer =
+                
ByteBuffer.allocate(aesGcmHkdfStreaming.getPlaintextSegmentSize());
+            this.ciphertextBuffer =
+                
ByteBuffer.allocate(aesGcmHkdfStreaming.getCiphertextSegmentSize());
             // If the ciphertext buffer cannot be fully written the target, 
transferTo may
             // return with it containing some unwritten data. The initial call 
we'll explicitly
             // set its limit to 0 to indicate the first call to transferTo.
             this.ciphertextBuffer.limit(0);
 
             this.bytesToRead = getReadableBytes();
-            this.encryptedCount =
-                    LENGTH_HEADER_BYTES + 
aesGcmHkdfStreaming.expectedCiphertextSize(bytesToRead);
+            // Tink's expectedCiphertextSize(P) internally adds 
getCiphertextOffset() (the 24-byte
+            // streaming header) to P before computing the segment count:
+            //   fullSegments = (P + getCiphertextOffset()) / 
plaintextSegmentSize
+            // This formula counts the header as occupying capacity in the 
first segment, so the
+            // effective plaintext capacity of segment 0 is 
(plaintextSegmentSize -
+            // getCiphertextOffset()) = 32728 bytes rather than 32752.
+            //
+            // However, transferTo() writes the streaming header separately 
(via headerByteBuffer)
+            // and passes all P bytes to encryptSegment() calls. For P in 
(32728, 32752], Tink's
+            // formula predicts two ciphertext segments but transferTo() 
produces only one,
+            // inflating encryptedCount by TAG_SIZE_IN_BYTES (16 bytes). The 
receiver then waits
+            // indefinitely for 16 bytes that were never written, causing a 
shuffle fetch stall.
+            //
+            // Fix: subtract getCiphertextOffset() before calling 
expectedCiphertextSize(), then
+            // add getHeaderLength() explicitly to account for the 
separately-written header.
+            this.encryptedCount = LENGTH_HEADER_BYTES
+                    + aesGcmHkdfStreaming.getHeaderLength()
+                    + aesGcmHkdfStreaming.expectedCiphertextSize(
+                            bytesToRead - 
aesGcmHkdfStreaming.getCiphertextOffset());

Review Comment:
   what is the legnth header bytes for then? I am confused why we're adding two 
different headers and its cryptography so I want to be more sure.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to