worryg0d commented on code in PR #1773:
URL:
https://github.com/apache/cassandra-gocql-driver/pull/1773#discussion_r1766774653
##########
frame.go:
##########
@@ -2070,3 +2085,251 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
f.writeBytes(v)
}
}
+
+// prepareModernLayout rewrites f.buf into the framed layout used by protocol
+// V5 and later. The buffer is cut into chunks of at most maxPayloadSize bytes
+// and each chunk is wrapped by newCompressedFrame (when a compressor is set on
+// the framer) or newUncompressedFrame. If the whole buffer fits into a single
+// chunk, that chunk is flagged as self-contained; otherwise every chunk is
+// emitted with the flag cleared. On success f.buf holds the concatenation of
+// all encoded frames.
+//
+// It panics when the framer's protocol version is below V5 and returns the
+// first error reported while encoding a chunk.
+func (f *framer) prepareModernLayout() error {
+	// The framed layout does not exist before V5, so getting here with an
+	// older version is treated as a programming error.
+	if f.proto < protoVersion5 {
+		panic("Modern layout is not supported with version V4 or less")
+	}
+
+	// wrap encodes one chunk, compressing it when a compressor is configured.
+	wrap := func(chunk []byte, selfContained bool) ([]byte, error) {
+		if f.compres != nil {
+			return newCompressedFrame(chunk, selfContained, f.compres)
+		}
+		return newUncompressedFrame(chunk, selfContained)
+	}
+
+	var out []byte
+	fitsInOneFrame := true
+
+	// Peel off full-size chunks while more than one frame's worth remains.
+	for len(f.buf) > maxPayloadSize {
+		encoded, err := wrap(f.buf[:maxPayloadSize], false)
+		if err != nil {
+			return err
+		}
+
+		out = append(out, encoded...)
+		f.buf = f.buf[maxPayloadSize:]
+		fitsInOneFrame = false
+	}
+
+	// Whatever is left (possibly the entire buffer) becomes the final frame.
+	encoded, err := wrap(f.buf, fitsInOneFrame)
+	if err != nil {
+		return err
+	}
+
+	f.buf = append(out, encoded...)
+
+	return nil
+}
+
+// readUncompressedFrame reads a single uncompressed frame from r and returns
+// its payload together with the self-contained flag taken from the header.
+//
+// As decoded here, the frame consists of a 6-byte header (3 bytes holding a
+// 17-bit payload length plus the self-contained bit, followed by a 3-byte
+// CRC24 of those bytes), the payload itself, and a 4-byte little-endian CRC32
+// of the payload. An error is returned if any read fails or if either
+// checksum does not match the value carried in the frame.
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+	const (
+		headerSize       = 6
+		payloadLenMask   = 0x1FFFF
+		selfContainedBit = 1 << 17
+		crc24Mask        = 0xFFFFFF
+	)
+
+	// The extra trailing byte stays zero; it lets the 3-byte CRC24 at offset 3
+	// be decoded with a regular 4-byte Uint32 read.
+	var scratch [headerSize + 1]byte
+
+	if _, err := io.ReadFull(r, scratch[:headerSize]); err != nil {
+		return nil, false, fmt.Errorf("gocql: failed to read uncompressed frame, err: %w", err)
+	}
+
+	// Validate the header before trusting the length it advertises.
+	wantCRC24 := KoopmanChecksum(scratch[:3])
+	gotCRC24 := binary.LittleEndian.Uint32(scratch[3:]) & crc24Mask
+	if wantCRC24 != gotCRC24 {
+		return nil, false, fmt.Errorf("gocql: header crc24 mismatch, computed: %d, got: %d", wantCRC24, gotCRC24)
+	}
+
+	// This 4-byte read also picks up the first CRC byte; the masks below only
+	// look at the low 18 bits, so it is ignored.
+	fields := binary.LittleEndian.Uint32(scratch[:4])
+	selfContained := (fields & selfContainedBit) != 0
+
+	payload := make([]byte, int(fields&payloadLenMask))
+	if _, err := io.ReadFull(r, payload); err != nil {
+		return nil, false, fmt.Errorf("gocql: failed to read uncompressed frame payload, err: %w", err)
+	}
+
+	// The header bytes are no longer needed, so reuse the scratch space for
+	// the trailing CRC32.
+	trailer := scratch[:4]
+	if _, err := io.ReadFull(r, trailer); err != nil {
+		return nil, false, fmt.Errorf("gocql: failed to read payload crc32, err: %w", err)
+	}
+
+	wantCRC32 := ChecksumIEEE(payload)
+	gotCRC32 := binary.LittleEndian.Uint32(trailer)
+	if wantCRC32 != gotCRC32 {
+		return nil, false, fmt.Errorf("gocql: payload crc32 mismatch, computed: %d, got: %d", wantCRC32, gotCRC32)
+	}
+
+	return payload, selfContained, nil
+}
+
+// maxPayloadSize is the largest payload, in bytes, accepted for a single
+// frame: 128 KiB minus one (131071), which equals the 0x1FFFF mask applied to
+// the payload length in the frame header.
+const maxPayloadSize = 128*1024 - 1
+
+// newUncompressedFrame wraps payload in an uncompressed frame and returns the
+// encoded bytes.
+//
+// The result is laid out as: 3 bytes carrying the payload length (low 17
+// bits) and the self-contained flag (bit 17) in little-endian order, 3 bytes
+// of CRC24 over those first 3 bytes, the payload, and finally a 4-byte
+// little-endian CRC32 of the payload. An error is returned when payload is
+// longer than maxPayloadSize.
+func newUncompressedFrame(payload []byte, isSelfContained bool) ([]byte, error) {
+	const (
+		headerSize       = 6
+		crc32Size        = 4
+		selfContainedBit = 1 << 17
+	)
+
+	n := len(payload)
+	if n > maxPayloadSize {
+		return nil, fmt.Errorf("payload length (%d) exceeds maximum size of 128 KiB", n)
+	}
+
+	// Allocate the whole frame up front and fill it in place.
+	frame := make([]byte, headerSize+n+crc32Size)
+
+	// Length and flag share one little-endian integer spread over 3 bytes.
+	lenAndFlag := uint32(n) & 0x1FFFF
+	if isSelfContained {
+		lenAndFlag |= selfContainedBit
+	}
+	frame[0] = byte(lenAndFlag)
+	frame[1] = byte(lenAndFlag >> 8)
+	frame[2] = byte(lenAndFlag >> 16)
+
+	// The header checksum covers only the 3 length/flag bytes written above.
+	headerCRC := KoopmanChecksum(frame[:3])
+	frame[3] = byte(headerCRC)
+	frame[4] = byte(headerCRC >> 8)
+	frame[5] = byte(headerCRC >> 16)
+
+	copy(frame[headerSize:], payload)
+
+	// The payload checksum goes right after the payload.
+	binary.LittleEndian.PutUint32(frame[headerSize+n:], ChecksumIEEE(payload))
+
+	return frame, nil
+}
+
+func newCompressedFrame(uncompressedPayload []byte, isSelfContained bool,
compressor Compressor) ([]byte, error) {
+ uncompressedLen := len(uncompressedPayload)
+ if uncompressedLen > maxPayloadSize {
+ return nil, fmt.Errorf("uncompressed compressed payload length
exceedes max size of frame payload %d/%d", uncompressedLen, maxPayloadSize)
+ }
+
+ compressedPayload, err := compressor.Encode(uncompressedPayload)
+ if err != nil {
+ return nil, err
+ }
+
+ // Skip the first 4 bytes because the size of the uncompressed payload
is written in the frame header, not in the
+ // body of the compressed envelope
+ compressedPayload = compressedPayload[4:]
+
+ compressedLen := len(compressedPayload)
+
+ // Compression is not worth it
+ if uncompressedLen < compressedLen {
+ // native_protocol_v5.spec
+ // 2.2
+ // An uncompressed length of 0 signals that the compressed
payload
+ // should be used as-is and not decompressed.
+ compressedPayload = uncompressedPayload
Review Comment:
I agree, I added a test that covers this case as well.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]