joao-r-reis commented on code in PR #1822:
URL: 
https://github.com/apache/cassandra-gocql-driver/pull/1822#discussion_r1806337865


##########
conn.go:
##########
@@ -1756,6 +1888,41 @@ func (c *Conn) awaitSchemaAgreement(ctx context.Context) 
(err error) {
        return fmt.Errorf("gocql: cluster schema versions not consistent: %+v", 
schemas)
 }
 
+// recvLastsFrames reads proto v5 frames from Conn.r and writes decoded 
payload to dst.
+// It reads data until the bytesToRead is reached.
+// If Conn.compressor is not nil, it processes Compressed Format frames.
+func (c *Conn) recvLastsFrames(dst *bytes.Buffer, bytesToRead int) error {
+       var read int
+       var segment []byte
+       var err error
+       for read != bytesToRead {
+               // Read frame based on compression
+               if c.compressor != nil {
+                       segment, _, err = readCompressedFrame(c.r, c.compressor)
+               } else {
+                       segment, _, err = readUncompressedFrame(c.r)
+               }
+               if err != nil {
+                       return fmt.Errorf("gocql: failed to read non 
self-contained frame: %w", err)
+               }
+
+               // Write the segment to the destination writer
+               n, _ := dst.Write(segment)
+               read += n

Review Comment:
   We're assuming here that this segment contains a single partial envelope, 
since we copy the entire payload into the buffer without any checks. I think 
the assumption is correct, but it wouldn't hurt to check that we're not adding 
more bytes to the buffer than we expect (`bytesToRead`). If the check fails, we 
can return a custom protocol error so `go.serve` logs it and closes the 
connection.



##########
conn.go:
##########
@@ -1756,6 +1888,41 @@ func (c *Conn) awaitSchemaAgreement(ctx context.Context) 
(err error) {
        return fmt.Errorf("gocql: cluster schema versions not consistent: %+v", 
schemas)
 }
 
+// recvLastsFrames reads proto v5 frames from Conn.r and writes decoded 
payload to dst.
+// It reads data until the bytesToRead is reached.
+// If Conn.compressor is not nil, it processes Compressed Format frames.
+func (c *Conn) recvLastsFrames(dst *bytes.Buffer, bytesToRead int) error {
+       var read int
+       var segment []byte
+       var err error
+       for read != bytesToRead {
+               // Read frame based on compression
+               if c.compressor != nil {
+                       segment, _, err = readCompressedFrame(c.r, c.compressor)

Review Comment:
   NIT: rename `segment` to `payload` or `segmentPayload` or even `frame` to 
clarify that this byte slice only contains the payload (frame in old notation / 
envelope in new notation) and not necessarily the entire segment



##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
                f.writeBytes(v)
        }
 }
+
+func (f *framer) prepareModernLayout() error {
+       // Ensure protocol version is V5 or higher
+       if f.proto < protoVersion5 {
+               panic("Modern layout is not supported with version V4 or less")
+       }
+
+       selfContained := true
+
+       var (
+               adjustedBuf []byte
+               tempBuf     []byte
+               err         error
+       )
+
+       // Process the buffer in chunks if it exceeds the max payload size
+       for len(f.buf) > maxPayloadSize {
+               if f.compres != nil {
+                       tempBuf, err = 
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+               } else {
+                       tempBuf, err = 
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+               }
+               if err != nil {
+                       return err
+               }
+
+               adjustedBuf = append(adjustedBuf, tempBuf...)
+               f.buf = f.buf[maxPayloadSize:]
+               selfContained = false
+       }
+
+       // Process the remaining buffer
+       if f.compres != nil {
+               tempBuf, err = newCompressedFrame(f.buf, selfContained, 
f.compres)
+       } else {
+               tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+       }
+       if err != nil {
+               return err
+       }
+
+       adjustedBuf = append(adjustedBuf, tempBuf...)
+       f.buf = adjustedBuf
+
+       return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+       const headerSize = 6
+       header := [headerSize + 1]byte{}
+
+       // Read the frame header
+       if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read 
uncompressed frame, err: %w", err)
+       }
+
+       // Compute and verify the header CRC24
+       computedHeaderCRC24 := KoopmanChecksum(header[:3])
+       readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
+       if computedHeaderCRC24 != readHeaderCRC24 {
+               return nil, false, fmt.Errorf("gocql: crc24 mismatch in frame 
header, computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24)
+       }
+
+       // Extract the payload length and self-contained flag
+       headerInt := binary.LittleEndian.Uint32(header[:4])
+       payloadLen := int(headerInt & 0x1FFFF)
+       isSelfContained := (headerInt & (1 << 17)) != 0
+
+       // Read the payload
+       payload := make([]byte, payloadLen)
+       if _, err := io.ReadFull(r, payload); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read 
uncompressed frame payload, err: %w", err)
+       }
+
+       // Read and verify the payload CRC32
+       if _, err := io.ReadFull(r, header[:4]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read payload 
crc32, err: %w", err)
+       }
+
+       computedPayloadCRC32 := ChecksumIEEE(payload)
+       readPayloadCRC32 := binary.LittleEndian.Uint32(header[:4])
+       if computedPayloadCRC32 != readPayloadCRC32 {
+               return nil, false, fmt.Errorf("gocql: payload crc32 mismatch, 
computed: %d, got: %d", computedPayloadCRC32, readPayloadCRC32)
+       }
+
+       return payload, isSelfContained, nil
+}
+
+func newUncompressedFrame(payload []byte, isSelfContained bool) ([]byte, 
error) {

Review Comment:
   rename to `newUncompressedSegment`



##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
                f.writeBytes(v)
        }
 }
+
+func (f *framer) prepareModernLayout() error {
+       // Ensure protocol version is V5 or higher
+       if f.proto < protoVersion5 {
+               panic("Modern layout is not supported with version V4 or less")
+       }
+
+       selfContained := true
+
+       var (
+               adjustedBuf []byte
+               tempBuf     []byte
+               err         error
+       )
+
+       // Process the buffer in chunks if it exceeds the max payload size
+       for len(f.buf) > maxPayloadSize {
+               if f.compres != nil {
+                       tempBuf, err = 
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+               } else {
+                       tempBuf, err = 
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+               }
+               if err != nil {
+                       return err
+               }
+
+               adjustedBuf = append(adjustedBuf, tempBuf...)
+               f.buf = f.buf[maxPayloadSize:]
+               selfContained = false
+       }
+
+       // Process the remaining buffer
+       if f.compres != nil {
+               tempBuf, err = newCompressedFrame(f.buf, selfContained, 
f.compres)
+       } else {
+               tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+       }
+       if err != nil {
+               return err
+       }
+
+       adjustedBuf = append(adjustedBuf, tempBuf...)
+       f.buf = adjustedBuf
+
+       return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+       const headerSize = 6
+       header := [headerSize + 1]byte{}
+
+       // Read the frame header
+       if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read 
uncompressed frame, err: %w", err)
+       }
+
+       // Compute and verify the header CRC24
+       computedHeaderCRC24 := KoopmanChecksum(header[:3])
+       readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
+       if computedHeaderCRC24 != readHeaderCRC24 {
+               return nil, false, fmt.Errorf("gocql: crc24 mismatch in frame 
header, computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24)
+       }
+
+       // Extract the payload length and self-contained flag
+       headerInt := binary.LittleEndian.Uint32(header[:4])
+       payloadLen := int(headerInt & 0x1FFFF)
+       isSelfContained := (headerInt & (1 << 17)) != 0
+
+       // Read the payload
+       payload := make([]byte, payloadLen)
+       if _, err := io.ReadFull(r, payload); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read 
uncompressed frame payload, err: %w", err)
+       }
+
+       // Read and verify the payload CRC32
+       if _, err := io.ReadFull(r, header[:4]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read payload 
crc32, err: %w", err)
+       }
+
+       computedPayloadCRC32 := ChecksumIEEE(payload)
+       readPayloadCRC32 := binary.LittleEndian.Uint32(header[:4])
+       if computedPayloadCRC32 != readPayloadCRC32 {
+               return nil, false, fmt.Errorf("gocql: payload crc32 mismatch, 
computed: %d, got: %d", computedPayloadCRC32, readPayloadCRC32)
+       }
+
+       return payload, isSelfContained, nil
+}
+
+func newUncompressedFrame(payload []byte, isSelfContained bool) ([]byte, 
error) {
+       const (
+               headerSize       = 6
+               selfContainedBit = 1 << 17
+       )
+
+       payloadLen := len(payload)
+       if payloadLen > maxPayloadSize {
+               return nil, fmt.Errorf("payload length (%d) exceeds maximum 
size of 128 KiB", payloadLen)
+       }
+
+       // Create the frame
+       frameSize := headerSize + payloadLen + 4 // 4 bytes for CRC32
+       frame := make([]byte, frameSize)
+
+       // First 3 bytes: payload length and self-contained flag
+       headerInt := uint32(payloadLen) & 0x1FFFF
+       if isSelfContained {
+               headerInt |= selfContainedBit // Set the self-contained flag
+       }
+
+       // Encode the first 3 bytes as a single little-endian integer
+       frame[0] = byte(headerInt)
+       frame[1] = byte(headerInt >> 8)
+       frame[2] = byte(headerInt >> 16)
+
+       // Calculate CRC24 for the first 3 bytes of the header
+       crc := KoopmanChecksum(frame[:3])
+
+       // Encode CRC24 into the next 3 bytes of the header
+       frame[3] = byte(crc)
+       frame[4] = byte(crc >> 8)
+       frame[5] = byte(crc >> 16)
+
+       copy(frame[headerSize:], payload) // Copy the payload to the frame
+
+       // Calculate CRC32 for the payload
+       payloadCRC32 := ChecksumIEEE(payload)
+       binary.LittleEndian.PutUint32(frame[headerSize+payloadLen:], 
payloadCRC32)
+
+       return frame, nil
+}
+
+func newCompressedFrame(uncompressedPayload []byte, isSelfContained bool, 
compressor Compressor) ([]byte, error) {

Review Comment:
   rename to `newCompressedSegment`



##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
                f.writeBytes(v)
        }
 }
+
+func (f *framer) prepareModernLayout() error {
+       // Ensure protocol version is V5 or higher
+       if f.proto < protoVersion5 {
+               panic("Modern layout is not supported with version V4 or less")
+       }
+
+       selfContained := true
+
+       var (
+               adjustedBuf []byte
+               tempBuf     []byte
+               err         error
+       )
+
+       // Process the buffer in chunks if it exceeds the max payload size
+       for len(f.buf) > maxPayloadSize {
+               if f.compres != nil {
+                       tempBuf, err = 
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+               } else {
+                       tempBuf, err = 
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+               }
+               if err != nil {
+                       return err
+               }
+
+               adjustedBuf = append(adjustedBuf, tempBuf...)
+               f.buf = f.buf[maxPayloadSize:]
+               selfContained = false
+       }
+
+       // Process the remaining buffer
+       if f.compres != nil {
+               tempBuf, err = newCompressedFrame(f.buf, selfContained, 
f.compres)
+       } else {
+               tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+       }
+       if err != nil {
+               return err
+       }
+
+       adjustedBuf = append(adjustedBuf, tempBuf...)
+       f.buf = adjustedBuf
+
+       return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+       const headerSize = 6
+       header := [headerSize + 1]byte{}
+
+       // Read the frame header
+       if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read 
uncompressed frame, err: %w", err)
+       }
+
+       // Compute and verify the header CRC24
+       computedHeaderCRC24 := KoopmanChecksum(header[:3])
+       readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
+       if computedHeaderCRC24 != readHeaderCRC24 {
+               return nil, false, fmt.Errorf("gocql: crc24 mismatch in frame 
header, computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24)
+       }
+
+       // Extract the payload length and self-contained flag
+       headerInt := binary.LittleEndian.Uint32(header[:4])

Review Comment:
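   The header is only 3 bytes, but we're passing a 4-byte slice, which is 
confusing: the fourth byte is the first CRC24 byte and only drops out because 
of the masks applied afterwards. Reading the 3 bytes explicitly is clearer:
   
   `headerInt := uint32(header[0]) | uint32(header[1])<<8 | uint32(header[2])<<16`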
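
   For illustration, a small self-contained sketch of decoding the 3 header 
bytes this way. The function name `parseUncompressedHeader` is hypothetical; 
the masks (`0x1FFFF` for the length, bit 17 for the self-contained flag) are 
the ones already used in the diff.

```go
package main

import "fmt"

// parseUncompressedHeader decodes the 3 little-endian header bytes:
// bits 0-16 hold the payload length, bit 17 is the self-contained flag.
func parseUncompressedHeader(header [3]byte) (payloadLen int, selfContained bool) {
	headerInt := uint32(header[0]) | uint32(header[1])<<8 | uint32(header[2])<<16
	payloadLen = int(headerInt & 0x1FFFF)
	selfContained = headerInt&(1<<17) != 0
	return payloadLen, selfContained
}

func main() {
	// Bytes 05 00 02 form the integer 0x020005: length 5, flag set.
	n, sc := parseUncompressedHeader([3]byte{0x05, 0x00, 0x02})
	fmt.Println(n, sc) // prints "5 true"
}
```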



##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
                f.writeBytes(v)
        }
 }
+
+func (f *framer) prepareModernLayout() error {
+       // Ensure protocol version is V5 or higher
+       if f.proto < protoVersion5 {
+               panic("Modern layout is not supported with version V4 or less")
+       }
+
+       selfContained := true
+
+       var (
+               adjustedBuf []byte
+               tempBuf     []byte
+               err         error
+       )
+
+       // Process the buffer in chunks if it exceeds the max payload size
+       for len(f.buf) > maxPayloadSize {
+               if f.compres != nil {
+                       tempBuf, err = 
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+               } else {
+                       tempBuf, err = 
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+               }
+               if err != nil {
+                       return err
+               }
+
+               adjustedBuf = append(adjustedBuf, tempBuf...)
+               f.buf = f.buf[maxPayloadSize:]
+               selfContained = false
+       }
+
+       // Process the remaining buffer
+       if f.compres != nil {
+               tempBuf, err = newCompressedFrame(f.buf, selfContained, 
f.compres)
+       } else {
+               tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+       }
+       if err != nil {
+               return err
+       }
+
+       adjustedBuf = append(adjustedBuf, tempBuf...)
+       f.buf = adjustedBuf
+
+       return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+       const headerSize = 6
+       header := [headerSize + 1]byte{}
+
+       // Read the frame header
+       if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read 
uncompressed frame, err: %w", err)
+       }
+
+       // Compute and verify the header CRC24
+       computedHeaderCRC24 := KoopmanChecksum(header[:3])

Review Comment:
   NIT: `KoopmanChecksum(header[:headerSize])` (this assumes `headerSize` is 
changed to `3`, as suggested in another comment on this function)



##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
                f.writeBytes(v)
        }
 }
+
+func (f *framer) prepareModernLayout() error {
+       // Ensure protocol version is V5 or higher
+       if f.proto < protoVersion5 {
+               panic("Modern layout is not supported with version V4 or less")
+       }
+
+       selfContained := true
+
+       var (
+               adjustedBuf []byte
+               tempBuf     []byte
+               err         error
+       )
+
+       // Process the buffer in chunks if it exceeds the max payload size
+       for len(f.buf) > maxPayloadSize {
+               if f.compres != nil {
+                       tempBuf, err = 
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+               } else {
+                       tempBuf, err = 
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+               }
+               if err != nil {
+                       return err
+               }
+
+               adjustedBuf = append(adjustedBuf, tempBuf...)
+               f.buf = f.buf[maxPayloadSize:]
+               selfContained = false
+       }
+
+       // Process the remaining buffer
+       if f.compres != nil {
+               tempBuf, err = newCompressedFrame(f.buf, selfContained, 
f.compres)
+       } else {
+               tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+       }
+       if err != nil {
+               return err
+       }
+
+       adjustedBuf = append(adjustedBuf, tempBuf...)
+       f.buf = adjustedBuf
+
+       return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+       const headerSize = 6
+       header := [headerSize + 1]byte{}
+
+       // Read the frame header
+       if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read 
uncompressed frame, err: %w", err)
+       }
+
+       // Compute and verify the header CRC24
+       computedHeaderCRC24 := KoopmanChecksum(header[:3])
+       readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
+       if computedHeaderCRC24 != readHeaderCRC24 {
+               return nil, false, fmt.Errorf("gocql: crc24 mismatch in frame 
header, computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24)
+       }
+
+       // Extract the payload length and self-contained flag
+       headerInt := binary.LittleEndian.Uint32(header[:4])
+       payloadLen := int(headerInt & 0x1FFFF)
+       isSelfContained := (headerInt & (1 << 17)) != 0
+
+       // Read the payload
+       payload := make([]byte, payloadLen)
+       if _, err := io.ReadFull(r, payload); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read 
uncompressed frame payload, err: %w", err)
+       }
+
+       // Read and verify the payload CRC32
+       if _, err := io.ReadFull(r, header[:4]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read payload 
crc32, err: %w", err)
+       }
+
+       computedPayloadCRC32 := ChecksumIEEE(payload)
+       readPayloadCRC32 := binary.LittleEndian.Uint32(header[:4])
+       if computedPayloadCRC32 != readPayloadCRC32 {
+               return nil, false, fmt.Errorf("gocql: payload crc32 mismatch, 
computed: %d, got: %d", computedPayloadCRC32, readPayloadCRC32)
+       }
+
+       return payload, isSelfContained, nil
+}
+
+func newUncompressedFrame(payload []byte, isSelfContained bool) ([]byte, 
error) {
+       const (
+               headerSize       = 6
+               selfContainedBit = 1 << 17
+       )
+
+       payloadLen := len(payload)
+       if payloadLen > maxPayloadSize {
+               return nil, fmt.Errorf("payload length (%d) exceeds maximum 
size of 128 KiB", payloadLen)
+       }
+
+       // Create the frame
+       frameSize := headerSize + payloadLen + 4 // 4 bytes for CRC32
+       frame := make([]byte, frameSize)
+
+       // First 3 bytes: payload length and self-contained flag
+       headerInt := uint32(payloadLen) & 0x1FFFF
+       if isSelfContained {
+               headerInt |= selfContainedBit // Set the self-contained flag
+       }
+
+       // Encode the first 3 bytes as a single little-endian integer
+       frame[0] = byte(headerInt)
+       frame[1] = byte(headerInt >> 8)
+       frame[2] = byte(headerInt >> 16)
+
+       // Calculate CRC24 for the first 3 bytes of the header
+       crc := KoopmanChecksum(frame[:3])
+
+       // Encode CRC24 into the next 3 bytes of the header
+       frame[3] = byte(crc)
+       frame[4] = byte(crc >> 8)
+       frame[5] = byte(crc >> 16)
+
+       copy(frame[headerSize:], payload) // Copy the payload to the frame
+
+       // Calculate CRC32 for the payload
+       payloadCRC32 := ChecksumIEEE(payload)
+       binary.LittleEndian.PutUint32(frame[headerSize+payloadLen:], 
payloadCRC32)
+
+       return frame, nil
+}
+
+func newCompressedFrame(uncompressedPayload []byte, isSelfContained bool, 
compressor Compressor) ([]byte, error) {
+       uncompressedLen := len(uncompressedPayload)
+       if uncompressedLen > maxPayloadSize {
+               return nil, fmt.Errorf("uncompressed compressed payload length 
exceedes max size of frame payload %d/%d", uncompressedLen, maxPayloadSize)
+       }
+
+       compressedPayload, err := compressor.Encode(uncompressedPayload)
+       if err != nil {
+               return nil, err
+       }
+
+       // Skip the first 4 bytes because the size of the uncompressed payload 
is written in the frame header, not in the
+       // body of the compressed envelope
+       compressedPayload = compressedPayload[4:]
+
+       compressedLen := len(compressedPayload)
+
+       // Compression is not worth it
+       if uncompressedLen < compressedLen {
+               // native_protocol_v5.spec
+               // 2.2
+               //  An uncompressed length of 0 signals that the compressed 
payload
+               //  should be used as-is and not decompressed.
+               compressedPayload = uncompressedPayload
+               compressedLen = uncompressedLen
+               uncompressedLen = 0
+       }
+
+       // Combine compressed and uncompressed lengths and set the 
self-contained flag if needed
+       combined := uint64(compressedLen) | uint64(uncompressedLen)<<17
+       if isSelfContained {
+               combined |= 1 << 34
+       }
+
+       var headerBuf [8]byte
+
+       // Write the combined value into the header buffer
+       binary.LittleEndian.PutUint64(headerBuf[:], combined)
+
+       // Create a buffer with enough capacity to hold the header, compressed 
payload, and checksums
+       buf := bytes.NewBuffer(make([]byte, 0, 8+compressedLen+4))
+
+       // Write the first 5 bytes of the header (compressed and uncompressed 
sizes)
+       buf.Write(headerBuf[:5])
+
+       // Compute and write the CRC24 checksum of the first 5 bytes
+       headerChecksum := KoopmanChecksum(headerBuf[:5])
+       binary.LittleEndian.PutUint32(headerBuf[:], headerChecksum)
+       buf.Write(headerBuf[:3])
+       buf.Write(compressedPayload)
+
+       // Compute and write the CRC32 checksum of the payload
+       payloadChecksum := ChecksumIEEE(compressedPayload)
+       binary.LittleEndian.PutUint32(headerBuf[:], payloadChecksum)
+       buf.Write(headerBuf[:4])
+
+       return buf.Bytes(), nil
+}
+
+func readCompressedFrame(r io.Reader, compressor Compressor) ([]byte, bool, 
error) {
+       var (
+               headerBuf [8]byte
+               err       error
+       )
+
+       if _, err = io.ReadFull(r, headerBuf[:]); err != nil {
+               return nil, false, err
+       }
+
+       // Reading checksum from frame header
+       readHeaderChecksum := uint32(headerBuf[5]) | uint32(headerBuf[6])<<8 | 
uint32(headerBuf[7])<<16
+       if computedHeaderChecksum := KoopmanChecksum(headerBuf[:5]); 
computedHeaderChecksum != readHeaderChecksum {
+               return nil, false, fmt.Errorf("gocql: crc24 mismatch in frame 
header, read: %d, computed: %d", readHeaderChecksum, computedHeaderChecksum)
+       }
+
+       // First 17 bits - payload size after compression
+       compressedLen := uint32(headerBuf[0]) | uint32(headerBuf[1])<<8 | 
uint32(headerBuf[2]&0x1)<<16
+
+       // The next 17 bits - payload size before compression
+       uncompressedLen := (uint32(headerBuf[2]) >> 1) | 
uint32(headerBuf[3])<<7 | uint32(headerBuf[4]&0b11)<<15
+
+       // Self-contained flag
+       selfContained := (headerBuf[4] & 0b100) != 0
+
+       compressedPayload := make([]byte, compressedLen)
+       if _, err = io.ReadFull(r, compressedPayload); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read compressed 
frame payload, err: %w", err)
+       }
+
+       if _, err = io.ReadFull(r, headerBuf[:4]); err != nil {

Review Comment:
   NIT: `io.ReadFull(r, headerBuf[:crc32Size])`, i.e. use a named `crc32Size` 
constant instead of the literal `4`



##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
                f.writeBytes(v)
        }
 }
+
+func (f *framer) prepareModernLayout() error {
+       // Ensure protocol version is V5 or higher
+       if f.proto < protoVersion5 {
+               panic("Modern layout is not supported with version V4 or less")
+       }
+
+       selfContained := true
+
+       var (
+               adjustedBuf []byte
+               tempBuf     []byte
+               err         error
+       )
+
+       // Process the buffer in chunks if it exceeds the max payload size
+       for len(f.buf) > maxPayloadSize {
+               if f.compres != nil {
+                       tempBuf, err = 
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+               } else {
+                       tempBuf, err = 
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+               }
+               if err != nil {
+                       return err
+               }
+
+               adjustedBuf = append(adjustedBuf, tempBuf...)
+               f.buf = f.buf[maxPayloadSize:]
+               selfContained = false
+       }
+
+       // Process the remaining buffer
+       if f.compres != nil {
+               tempBuf, err = newCompressedFrame(f.buf, selfContained, 
f.compres)
+       } else {
+               tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+       }
+       if err != nil {
+               return err
+       }
+
+       adjustedBuf = append(adjustedBuf, tempBuf...)
+       f.buf = adjustedBuf
+
+       return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+       const headerSize = 6

Review Comment:
   NIT: I'd set this to `3` and add a separate `crc24Size` constant for the 
header checksum
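
   Roughly what I have in mind, as a sketch only. `crc32Size` and the helper 
`uncompressedSegmentSize` are illustrative names and are not in the PR.

```go
package main

import "fmt"

const (
	headerSize = 3 // payload length (17 bits) plus the self-contained flag
	crc24Size  = 3 // CRC24 of the header bytes
	crc32Size  = 4 // CRC32 of the payload
)

// uncompressedSegmentSize returns the total number of bytes on the wire for an
// uncompressed segment carrying payloadLen bytes of payload.
func uncompressedSegmentSize(payloadLen int) int {
	return headerSize + crc24Size + payloadLen + crc32Size
}

func main() {
	fmt.Println(uncompressedSegmentSize(5)) // prints "15"
}
```

   With separate constants, slices such as `header[:headerSize]` and 
`header[headerSize:headerSize+crc24Size]` say what they cover without magic 
numbers.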



##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
                f.writeBytes(v)
        }
 }
+
+func (f *framer) prepareModernLayout() error {
+       // Ensure protocol version is V5 or higher
+       if f.proto < protoVersion5 {
+               panic("Modern layout is not supported with version V4 or less")
+       }
+
+       selfContained := true
+
+       var (
+               adjustedBuf []byte
+               tempBuf     []byte
+               err         error
+       )
+
+       // Process the buffer in chunks if it exceeds the max payload size
+       for len(f.buf) > maxPayloadSize {
+               if f.compres != nil {
+                       tempBuf, err = 
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+               } else {
+                       tempBuf, err = 
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+               }
+               if err != nil {
+                       return err
+               }
+
+               adjustedBuf = append(adjustedBuf, tempBuf...)
+               f.buf = f.buf[maxPayloadSize:]
+               selfContained = false
+       }
+
+       // Process the remaining buffer
+       if f.compres != nil {
+               tempBuf, err = newCompressedFrame(f.buf, selfContained, 
f.compres)
+       } else {
+               tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+       }
+       if err != nil {
+               return err
+       }
+
+       adjustedBuf = append(adjustedBuf, tempBuf...)
+       f.buf = adjustedBuf
+
+       return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+       const headerSize = 6
+       header := [headerSize + 1]byte{}
+
+       // Read the frame header
+       if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read 
uncompressed frame, err: %w", err)
+       }
+
+       // Compute and verify the header CRC24
+       computedHeaderCRC24 := KoopmanChecksum(header[:3])
+       readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
+       if computedHeaderCRC24 != readHeaderCRC24 {
+               return nil, false, fmt.Errorf("gocql: crc24 mismatch in frame 
header, computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24)
+       }
+
+       // Extract the payload length and self-contained flag
+       headerInt := binary.LittleEndian.Uint32(header[:4])
+       payloadLen := int(headerInt & 0x1FFFF)
+       isSelfContained := (headerInt & (1 << 17)) != 0
+
+       // Read the payload
+       payload := make([]byte, payloadLen)
+       if _, err := io.ReadFull(r, payload); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read 
uncompressed frame payload, err: %w", err)
+       }
+
+       // Read and verify the payload CRC32
+       if _, err := io.ReadFull(r, header[:4]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read payload 
crc32, err: %w", err)
+       }
+
+       computedPayloadCRC32 := ChecksumIEEE(payload)
+       readPayloadCRC32 := binary.LittleEndian.Uint32(header[:4])
+       if computedPayloadCRC32 != readPayloadCRC32 {
+               return nil, false, fmt.Errorf("gocql: payload crc32 mismatch, 
computed: %d, got: %d", computedPayloadCRC32, readPayloadCRC32)
+       }
+
+       return payload, isSelfContained, nil
+}
+
+func newUncompressedFrame(payload []byte, isSelfContained bool) ([]byte, 
error) {
+       const (
+               headerSize       = 6
+               selfContainedBit = 1 << 17
+       )
+
+       payloadLen := len(payload)
+       if payloadLen > maxPayloadSize {
+               return nil, fmt.Errorf("payload length (%d) exceeds maximum 
size of 128 KiB", payloadLen)
+       }
+
+       // Create the frame
+       frameSize := headerSize + payloadLen + 4 // 4 bytes for CRC32
+       frame := make([]byte, frameSize)
+
+       // First 3 bytes: payload length and self-contained flag
+       headerInt := uint32(payloadLen) & 0x1FFFF
+       if isSelfContained {
+               headerInt |= selfContainedBit // Set the self-contained flag
+       }
+
+       // Encode the first 3 bytes as a single little-endian integer
+       frame[0] = byte(headerInt)
+       frame[1] = byte(headerInt >> 8)
+       frame[2] = byte(headerInt >> 16)
+
+       // Calculate CRC24 for the first 3 bytes of the header
+       crc := KoopmanChecksum(frame[:3])
+
+       // Encode CRC24 into the next 3 bytes of the header
+       frame[3] = byte(crc)
+       frame[4] = byte(crc >> 8)
+       frame[5] = byte(crc >> 16)
+
+       copy(frame[headerSize:], payload) // Copy the payload to the frame
+
+       // Calculate CRC32 for the payload
+       payloadCRC32 := ChecksumIEEE(payload)
+       binary.LittleEndian.PutUint32(frame[headerSize+payloadLen:], 
payloadCRC32)
+
+       return frame, nil
+}
+
+func newCompressedFrame(uncompressedPayload []byte, isSelfContained bool, 
compressor Compressor) ([]byte, error) {
+       uncompressedLen := len(uncompressedPayload)
+       if uncompressedLen > maxPayloadSize {
+               return nil, fmt.Errorf("uncompressed compressed payload length 
exceedes max size of frame payload %d/%d", uncompressedLen, maxPayloadSize)
+       }
+
+       compressedPayload, err := compressor.Encode(uncompressedPayload)
+       if err != nil {
+               return nil, err
+       }
+
+       // Skip the first 4 bytes because the size of the uncompressed payload 
is written in the frame header, not in the
+       // body of the compressed envelope
+       compressedPayload = compressedPayload[4:]
+
+       compressedLen := len(compressedPayload)
+
+       // Compression is not worth it
+       if uncompressedLen < compressedLen {
+               // native_protocol_v5.spec
+               // 2.2
+               //  An uncompressed length of 0 signals that the compressed 
payload
+               //  should be used as-is and not decompressed.
+               compressedPayload = uncompressedPayload
+               compressedLen = uncompressedLen
+               uncompressedLen = 0
+       }
+
+       // Combine compressed and uncompressed lengths and set the 
self-contained flag if needed
+       combined := uint64(compressedLen) | uint64(uncompressedLen)<<17
+       if isSelfContained {
+               combined |= 1 << 34
+       }
+
+       var headerBuf [8]byte
+
+       // Write the combined value into the header buffer
+       binary.LittleEndian.PutUint64(headerBuf[:], combined)
+
+       // Create a buffer with enough capacity to hold the header, compressed 
payload, and checksums
+       buf := bytes.NewBuffer(make([]byte, 0, 8+compressedLen+4))
+
+       // Write the first 5 bytes of the header (compressed and uncompressed 
sizes)
+       buf.Write(headerBuf[:5])
+
+       // Compute and write the CRC24 checksum of the first 5 bytes
+       headerChecksum := KoopmanChecksum(headerBuf[:5])
+       binary.LittleEndian.PutUint32(headerBuf[:], headerChecksum)
+       buf.Write(headerBuf[:3])

Review Comment:
   Hmm this works but it is a bit confusing (`PutUint32` but then we only write 
24 bits). How about:
   ```
   // LittleEndian 3 bytes
   headerBuf[0] = byte(headerChecksum)
   headerBuf[1] = byte(headerChecksum >> 8)
   headerBuf[2] = byte(headerChecksum >> 16)
   buf.Write(headerBuf[:3])
   ```



##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
                f.writeBytes(v)
        }
 }
+
+func (f *framer) prepareModernLayout() error {
+       // Ensure protocol version is V5 or higher
+       if f.proto < protoVersion5 {
+               panic("Modern layout is not supported with version V4 or less")
+       }
+
+       selfContained := true
+
+       var (
+               adjustedBuf []byte
+               tempBuf     []byte
+               err         error
+       )
+
+       // Process the buffer in chunks if it exceeds the max payload size
+       for len(f.buf) > maxPayloadSize {
+               if f.compres != nil {
+                       tempBuf, err = 
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+               } else {
+                       tempBuf, err = 
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+               }
+               if err != nil {
+                       return err
+               }
+
+               adjustedBuf = append(adjustedBuf, tempBuf...)
+               f.buf = f.buf[maxPayloadSize:]
+               selfContained = false
+       }
+
+       // Process the remaining buffer
+       if f.compres != nil {
+               tempBuf, err = newCompressedFrame(f.buf, selfContained, 
f.compres)
+       } else {
+               tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+       }
+       if err != nil {
+               return err
+       }
+
+       adjustedBuf = append(adjustedBuf, tempBuf...)
+       f.buf = adjustedBuf
+
+       return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+       const headerSize = 6
+       header := [headerSize + 1]byte{}
+
+       // Read the frame header
+       if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read 
uncompressed frame, err: %w", err)
+       }
+
+       // Compute and verify the header CRC24
+       computedHeaderCRC24 := KoopmanChecksum(header[:3])
+       readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
+       if computedHeaderCRC24 != readHeaderCRC24 {
+               return nil, false, fmt.Errorf("gocql: crc24 mismatch in frame 
header, computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24)
+       }
+
+       // Extract the payload length and self-contained flag
+       headerInt := binary.LittleEndian.Uint32(header[:4])
+       payloadLen := int(headerInt & 0x1FFFF)
+       isSelfContained := (headerInt & (1 << 17)) != 0
+
+       // Read the payload
+       payload := make([]byte, payloadLen)
+       if _, err := io.ReadFull(r, payload); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read 
uncompressed frame payload, err: %w", err)
+       }
+
+       // Read and verify the payload CRC32
+       if _, err := io.ReadFull(r, header[:4]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read payload 
crc32, err: %w", err)
+       }
+
+       computedPayloadCRC32 := ChecksumIEEE(payload)
+       readPayloadCRC32 := binary.LittleEndian.Uint32(header[:4])
+       if computedPayloadCRC32 != readPayloadCRC32 {
+               return nil, false, fmt.Errorf("gocql: payload crc32 mismatch, 
computed: %d, got: %d", computedPayloadCRC32, readPayloadCRC32)
+       }
+
+       return payload, isSelfContained, nil
+}
+
+func newUncompressedFrame(payload []byte, isSelfContained bool) ([]byte, 
error) {
+       const (
+               headerSize       = 6
+               selfContainedBit = 1 << 17
+       )
+
+       payloadLen := len(payload)
+       if payloadLen > maxPayloadSize {
+               return nil, fmt.Errorf("payload length (%d) exceeds maximum 
size of 128 KiB", payloadLen)
+       }
+
+       // Create the frame
+       frameSize := headerSize + payloadLen + 4 // 4 bytes for CRC32
+       frame := make([]byte, frameSize)
+
+       // First 3 bytes: payload length and self-contained flag
+       headerInt := uint32(payloadLen) & 0x1FFFF

Review Comment:
   I don't think the `& 0x1FFFF` part is necessary since we already checked 
that this value is within the acceptable payload length.
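
   A minimal sketch of the header encoding without the mask. It assumes `maxPayloadSize` is `1<<17 - 1` (128 KiB minus one byte), which is not visible in this hunk. `encodeHeaderInt` is a hypothetical helper name used only for illustration.

```go
package main

import "fmt"

// Assumption: the largest payload length that fits in 17 bits.
const maxPayloadSize = 1<<17 - 1

// Bit 17 of the header integer marks a self-contained segment.
const selfContainedBit = 1 << 17

// encodeHeaderInt is a hypothetical helper. It relies on the bounds check
// alone: any accepted length already fits in the low 17 bits, so masking
// with 0x1FFFF would not change the value.
func encodeHeaderInt(payloadLen int, isSelfContained bool) (uint32, error) {
	if payloadLen < 0 || payloadLen > maxPayloadSize {
		return 0, fmt.Errorf("payload length (%d) exceeds maximum size of %d bytes", payloadLen, maxPayloadSize)
	}
	headerInt := uint32(payloadLen) // no mask needed after the check above
	if isSelfContained {
		headerInt |= selfContainedBit
	}
	return headerInt, nil
}
```

   If `maxPayloadSize` were ever raised above `0x1FFFF`, the mask would silently truncate the length, so the explicit error is the safer guard either way.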



##########
conn.go:
##########
@@ -777,6 +801,47 @@ func (c *Conn) handleTimeout() {
        }
 }
 
+func (c *Conn) recvProtoV5Frame(ctx context.Context) error {
+       var (
+               payload         []byte
+               isSelfContained bool
+               err             error
+       )
+
+       // Read frame based on compression
+       if c.compressor != nil {
+               payload, isSelfContained, err = readCompressedFrame(c.r, 
c.compressor)

Review Comment:
   rename `payload` to `frame`



##########
crc.go:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package gocql
+
+import (
+       "hash/crc32"
+)
+
+var (
+       // Initial CRC32 bytes: 0xFA, 0x2D, 0x55, 0xCA
+       initialCRC32Bytes = []byte{0xfa, 0x2d, 0x55, 0xca}
+)
+
+// ChecksumIEEE calculates the CRC32 checksum of the given byte slice.
+func ChecksumIEEE(b []byte) uint32 {

Review Comment:
   NIT: I think it would be better for readability to reference `Crc32` and 
`Crc24` in the name of these two functions instead of `IEEE` and `Koopman`



##########
conn.go:
##########
@@ -1756,6 +1888,41 @@ func (c *Conn) awaitSchemaAgreement(ctx context.Context) 
(err error) {
        return fmt.Errorf("gocql: cluster schema versions not consistent: %+v", 
schemas)
 }
 
+// recvLastsFrames reads proto v5 frames from Conn.r and writes decoded 
payload to dst.
+// It reads data until the bytesToRead is reached.
+// If Conn.compressor is not nil, it processes Compressed Format frames.
+func (c *Conn) recvLastsFrames(dst *bytes.Buffer, bytesToRead int) error {
+       var read int
+       var segment []byte
+       var err error
+       for read != bytesToRead {
+               // Read frame based on compression
+               if c.compressor != nil {
+                       segment, _, err = readCompressedFrame(c.r, c.compressor)
+               } else {
+                       segment, _, err = readUncompressedFrame(c.r)
+               }
+               if err != nil {
+                       return fmt.Errorf("gocql: failed to read non 
self-contained frame: %w", err)
+               }
+
+               // Write the segment to the destination writer
+               n, _ := dst.Write(segment)
+               read += n
+       }
+
+       return nil
+}
+
+func (c *Conn) processAllEnvelopesInFrame(ctx context.Context, r 
*bytes.Reader) error {

Review Comment:
   rename to `processAllFramesInSegment`



##########
conn.go:
##########
@@ -1756,6 +1888,41 @@ func (c *Conn) awaitSchemaAgreement(ctx context.Context) 
(err error) {
        return fmt.Errorf("gocql: cluster schema versions not consistent: %+v", 
schemas)
 }
 
+// recvLastsFrames reads proto v5 frames from Conn.r and writes decoded 
payload to dst.
+// It reads data until the bytesToRead is reached.
+// If Conn.compressor is not nil, it processes Compressed Format frames.
+func (c *Conn) recvLastsFrames(dst *bytes.Buffer, bytesToRead int) error {

Review Comment:
   NIT: rename to `recvPartialFrames`
   Or if you prefer `recvLastFrames` then just fix the typo



##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
                f.writeBytes(v)
        }
 }
+
+func (f *framer) prepareModernLayout() error {
+       // Ensure protocol version is V5 or higher
+       if f.proto < protoVersion5 {
+               panic("Modern layout is not supported with version V4 or less")
+       }
+
+       selfContained := true
+
+       var (
+               adjustedBuf []byte
+               tempBuf     []byte
+               err         error
+       )
+
+       // Process the buffer in chunks if it exceeds the max payload size
+       for len(f.buf) > maxPayloadSize {
+               if f.compres != nil {
+                       tempBuf, err = 
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+               } else {
+                       tempBuf, err = 
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+               }
+               if err != nil {
+                       return err
+               }
+
+               adjustedBuf = append(adjustedBuf, tempBuf...)
+               f.buf = f.buf[maxPayloadSize:]
+               selfContained = false
+       }
+
+       // Process the remaining buffer
+       if f.compres != nil {
+               tempBuf, err = newCompressedFrame(f.buf, selfContained, 
f.compres)
+       } else {
+               tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+       }
+       if err != nil {
+               return err
+       }
+
+       adjustedBuf = append(adjustedBuf, tempBuf...)
+       f.buf = adjustedBuf
+
+       return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+       const headerSize = 6
+       header := [headerSize + 1]byte{}
+
+       // Read the frame header
+       if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read 
uncompressed frame, err: %w", err)
+       }
+
+       // Compute and verify the header CRC24
+       computedHeaderCRC24 := KoopmanChecksum(header[:3])
+       readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF

Review Comment:
   Replace with `readHeaderCRC24 := uint32(header[3]) | uint32(header[4])<<8 | 
uint32(header[5])<<16`
   so that we can declare the header as a `6` byte array instead of `7`
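
   To illustrate why both forms agree, here is a small sketch with hypothetical helpers `putUint24` and `uint24`, next to the padded approach from the diff. The helper names are not part of the PR.

```go
package main

import "encoding/binary"

// putUint24 writes the low 24 bits of v into b[0:3] in little-endian order.
func putUint24(b []byte, v uint32) {
	_ = b[2] // bounds check hint
	b[0] = byte(v)
	b[1] = byte(v >> 8)
	b[2] = byte(v >> 16)
}

// uint24 reads a little-endian 24-bit value from b[0:3].
func uint24(b []byte) uint32 {
	_ = b[2] // bounds check hint
	return uint32(b[0]) | uint32(b[1])<<8 | uint32(b[2])<<16
}

// crc24FromPadded mirrors the approach in the diff: Uint32 needs four bytes,
// so the header array has to carry a seventh byte that is then masked away.
func crc24FromPadded(header *[7]byte) uint32 {
	return binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
}
```

   With `uint24(header[3:])` the header can be declared as `[6]byte` and read with `io.ReadFull(r, header[:])`.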
   



##########
conn.go:
##########
@@ -1756,6 +1888,41 @@ func (c *Conn) awaitSchemaAgreement(ctx context.Context) 
(err error) {
        return fmt.Errorf("gocql: cluster schema versions not consistent: %+v", 
schemas)
 }
 
+// recvLastsFrames reads proto v5 frames from Conn.r and writes decoded 
payload to dst.

Review Comment:
   NIT: `reads proto v5 segments from Conn.r and writes decoded partial frames 
to dst.`



##########
conn.go:
##########
@@ -694,7 +718,7 @@ func (c *Conn) recv(ctx context.Context) error {
        } else if head.stream == -1 {
                // TODO: handle cassandra event frames, we shouldnt get any 
currently
                framer := newFramer(c.compressor, c.version)
-               if err := framer.readFrame(c, &head); err != nil {
+               if err := framer.readFrame(r, &head); err != nil {
                        return err
                }
                go c.session.handleEvent(framer)

Review Comment:
   Line 730:
   ```
   if err := framer.readFrame(c, &head); err != nil {
                        return err
   }
   ```
   This should be changed to `r` as well right? 
   
   I don't like that `Conn` implements `io.Reader` and also has a 
`*bufio.Reader` field, it makes it very easy to use one when we actually wanted 
the other... 
   
   I think the same applies to `readUncompressedFrame` and 
`readCompressedFrame`: they use `c.r` directly, but they should be using `c` 
so that the additional logic from `Conn.Read()` is applied, right?
   
   I think we should create a type (maybe `connReader`?) that wraps the 
`*bufio.Reader` object and then move the implementation of `Conn.Read()` to 
that new type so we ensure that we only have one object that implements 
`io.Reader`. This way `c.r` becomes a `*connReader` instead of a 
`*bufio.Reader`, `Conn` no longer implements `io.Reader`, and every time we want 
to actually read from the socket we have to use the `c.r` field instead of `c` 
directly.



##########
conn.go:
##########
@@ -1756,6 +1888,41 @@ func (c *Conn) awaitSchemaAgreement(ctx context.Context) 
(err error) {
        return fmt.Errorf("gocql: cluster schema versions not consistent: %+v", 
schemas)
 }
 
+// recvLastsFrames reads proto v5 frames from Conn.r and writes decoded 
payload to dst.
+// It reads data until the bytesToRead is reached.
+// If Conn.compressor is not nil, it processes Compressed Format frames.
+func (c *Conn) recvLastsFrames(dst *bytes.Buffer, bytesToRead int) error {
+       var read int
+       var segment []byte
+       var err error
+       for read != bytesToRead {
+               // Read frame based on compression
+               if c.compressor != nil {
+                       segment, _, err = readCompressedFrame(c.r, c.compressor)
+               } else {
+                       segment, _, err = readUncompressedFrame(c.r)

Review Comment:
   We could check the self contained flag to make sure it is false and if it's 
not then return a custom "protocol error" so `go.serve` logs it and closes the 
connection.



##########
conn.go:
##########
@@ -777,6 +801,47 @@ func (c *Conn) handleTimeout() {
        }
 }
 
+func (c *Conn) recvProtoV5Frame(ctx context.Context) error {

Review Comment:
   rename `recvProtoV5Frame` to `recvSegment` (I'd say not to include the 
protocol version in the name since there will eventually be a protocol v6 that 
will very likely follow this format)



##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
                f.writeBytes(v)
        }
 }
+
+func (f *framer) prepareModernLayout() error {
+       // Ensure protocol version is V5 or higher
+       if f.proto < protoVersion5 {
+               panic("Modern layout is not supported with version V4 or less")
+       }
+
+       selfContained := true
+
+       var (
+               adjustedBuf []byte
+               tempBuf     []byte
+               err         error
+       )
+
+       // Process the buffer in chunks if it exceeds the max payload size
+       for len(f.buf) > maxPayloadSize {
+               if f.compres != nil {
+                       tempBuf, err = 
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+               } else {
+                       tempBuf, err = 
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+               }
+               if err != nil {
+                       return err
+               }
+
+               adjustedBuf = append(adjustedBuf, tempBuf...)
+               f.buf = f.buf[maxPayloadSize:]
+               selfContained = false
+       }
+
+       // Process the remaining buffer
+       if f.compres != nil {
+               tempBuf, err = newCompressedFrame(f.buf, selfContained, 
f.compres)
+       } else {
+               tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+       }
+       if err != nil {
+               return err
+       }
+
+       adjustedBuf = append(adjustedBuf, tempBuf...)
+       f.buf = adjustedBuf
+
+       return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+       const headerSize = 6
+       header := [headerSize + 1]byte{}
+
+       // Read the frame header
+       if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read 
uncompressed frame, err: %w", err)
+       }
+
+       // Compute and verify the header CRC24
+       computedHeaderCRC24 := KoopmanChecksum(header[:3])
+       readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
+       if computedHeaderCRC24 != readHeaderCRC24 {
+               return nil, false, fmt.Errorf("gocql: crc24 mismatch in frame 
header, computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24)
+       }
+
+       // Extract the payload length and self-contained flag
+       headerInt := binary.LittleEndian.Uint32(header[:4])
+       payloadLen := int(headerInt & 0x1FFFF)

Review Comment:
   NIT: make `0x1FFFF` a constant (max segment payload size)
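
   For example (a sketch with hypothetical constant and helper names, which also covers the `crc32Size` nit elsewhere in this review):

```go
package main

const (
	// Low 17 bits of the header integer: the segment payload length.
	segmentPayloadLenMask = 0x1FFFF
	// Bit 17 of the header integer: the self-contained flag.
	selfContainedFlag = 1 << 17
	// Sizes of the uncompressed segment header and the trailing CRC32.
	uncompressedHeaderSize = 6
	crc32Size              = 4
)

// decodeUncompressedHeader extracts the payload length and the
// self-contained flag from the first three header bytes (little-endian).
func decodeUncompressedHeader(header [3]byte) (payloadLen int, isSelfContained bool) {
	headerInt := uint32(header[0]) | uint32(header[1])<<8 | uint32(header[2])<<16
	payloadLen = int(headerInt & segmentPayloadLenMask)
	isSelfContained = headerInt&selfContainedFlag != 0
	return payloadLen, isSelfContained
}

// segmentSize returns the total size on the wire of an uncompressed segment.
func segmentSize(payloadLen int) int {
	return uncompressedHeaderSize + payloadLen + crc32Size
}
```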



##########
conn.go:
##########
@@ -777,6 +801,47 @@ func (c *Conn) handleTimeout() {
        }
 }
 
+func (c *Conn) recvProtoV5Frame(ctx context.Context) error {
+       var (
+               payload         []byte
+               isSelfContained bool
+               err             error
+       )
+
+       // Read frame based on compression
+       if c.compressor != nil {
+               payload, isSelfContained, err = readCompressedFrame(c.r, 
c.compressor)
+       } else {
+               payload, isSelfContained, err = readUncompressedFrame(c.r)
+       }
+       if err != nil {
+               return err
+       }
+
+       if isSelfContained {
+               return c.processAllEnvelopesInFrame(ctx, 
bytes.NewReader(payload))
+       }
+
+       head, err := readHeader(bytes.NewReader(payload), c.headerBuf[:])
+       if err != nil {
+               return err
+       }
+
+       const envelopeHeaderLength = 9

Review Comment:
   rename to `frameHeaderLength`



##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
                f.writeBytes(v)
        }
 }
+
+func (f *framer) prepareModernLayout() error {
+       // Ensure protocol version is V5 or higher
+       if f.proto < protoVersion5 {
+               panic("Modern layout is not supported with version V4 or less")
+       }
+
+       selfContained := true
+
+       var (
+               adjustedBuf []byte
+               tempBuf     []byte
+               err         error
+       )
+
+       // Process the buffer in chunks if it exceeds the max payload size
+       for len(f.buf) > maxPayloadSize {
+               if f.compres != nil {
+                       tempBuf, err = 
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+               } else {
+                       tempBuf, err = 
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+               }
+               if err != nil {
+                       return err
+               }
+
+               adjustedBuf = append(adjustedBuf, tempBuf...)
+               f.buf = f.buf[maxPayloadSize:]
+               selfContained = false
+       }
+
+       // Process the remaining buffer
+       if f.compres != nil {
+               tempBuf, err = newCompressedFrame(f.buf, selfContained, 
f.compres)
+       } else {
+               tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+       }
+       if err != nil {
+               return err
+       }
+
+       adjustedBuf = append(adjustedBuf, tempBuf...)
+       f.buf = adjustedBuf
+
+       return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+       const headerSize = 6
+       header := [headerSize + 1]byte{}
+
+       // Read the frame header
+       if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read 
uncompressed frame, err: %w", err)
+       }
+
+       // Compute and verify the header CRC24
+       computedHeaderCRC24 := KoopmanChecksum(header[:3])
+       readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
+       if computedHeaderCRC24 != readHeaderCRC24 {
+               return nil, false, fmt.Errorf("gocql: crc24 mismatch in frame 
header, computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24)
+       }
+
+       // Extract the payload length and self-contained flag
+       headerInt := binary.LittleEndian.Uint32(header[:4])
+       payloadLen := int(headerInt & 0x1FFFF)
+       isSelfContained := (headerInt & (1 << 17)) != 0
+
+       // Read the payload
+       payload := make([]byte, payloadLen)
+       if _, err := io.ReadFull(r, payload); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read 
uncompressed frame payload, err: %w", err)
+       }
+
+       // Read and verify the payload CRC32
+       if _, err := io.ReadFull(r, header[:4]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read payload 
crc32, err: %w", err)
+       }
+
+       computedPayloadCRC32 := ChecksumIEEE(payload)
+       readPayloadCRC32 := binary.LittleEndian.Uint32(header[:4])
+       if computedPayloadCRC32 != readPayloadCRC32 {
+               return nil, false, fmt.Errorf("gocql: payload crc32 mismatch, 
computed: %d, got: %d", computedPayloadCRC32, readPayloadCRC32)
+       }
+
+       return payload, isSelfContained, nil
+}
+
+func newUncompressedFrame(payload []byte, isSelfContained bool) ([]byte, 
error) {
+       const (
+               headerSize       = 6
+               selfContainedBit = 1 << 17
+       )
+
+       payloadLen := len(payload)
+       if payloadLen > maxPayloadSize {
+               return nil, fmt.Errorf("payload length (%d) exceeds maximum 
size of 128 KiB", payloadLen)
+       }
+
+       // Create the frame
+       frameSize := headerSize + payloadLen + 4 // 4 bytes for CRC32
+       frame := make([]byte, frameSize)
+
+       // First 3 bytes: payload length and self-contained flag
+       headerInt := uint32(payloadLen) & 0x1FFFF
+       if isSelfContained {
+               headerInt |= selfContainedBit // Set the self-contained flag
+       }
+
+       // Encode the first 3 bytes as a single little-endian integer
+       frame[0] = byte(headerInt)
+       frame[1] = byte(headerInt >> 8)
+       frame[2] = byte(headerInt >> 16)
+
+       // Calculate CRC24 for the first 3 bytes of the header
+       crc := KoopmanChecksum(frame[:3])
+
+       // Encode CRC24 into the next 3 bytes of the header
+       frame[3] = byte(crc)
+       frame[4] = byte(crc >> 8)
+       frame[5] = byte(crc >> 16)
+
+       copy(frame[headerSize:], payload) // Copy the payload to the frame
+
+       // Calculate CRC32 for the payload
+       payloadCRC32 := ChecksumIEEE(payload)
+       binary.LittleEndian.PutUint32(frame[headerSize+payloadLen:], 
payloadCRC32)
+
+       return frame, nil
+}
+
+func newCompressedFrame(uncompressedPayload []byte, isSelfContained bool, 
compressor Compressor) ([]byte, error) {
+       uncompressedLen := len(uncompressedPayload)
+       if uncompressedLen > maxPayloadSize {
+               return nil, fmt.Errorf("uncompressed compressed payload length 
exceedes max size of frame payload %d/%d", uncompressedLen, maxPayloadSize)
+       }
+
+       compressedPayload, err := compressor.Encode(uncompressedPayload)
+       if err != nil {
+               return nil, err
+       }
+
+       // Skip the first 4 bytes because the size of the uncompressed payload 
is written in the frame header, not in the
+       // body of the compressed envelope
+       compressedPayload = compressedPayload[4:]
+
+       compressedLen := len(compressedPayload)
+
+       // Compression is not worth it
+       if uncompressedLen < compressedLen {
+               // native_protocol_v5.spec
+               // 2.2
+               //  An uncompressed length of 0 signals that the compressed 
payload
+               //  should be used as-is and not decompressed.
+               compressedPayload = uncompressedPayload
+               compressedLen = uncompressedLen
+               uncompressedLen = 0
+       }
+
+       // Combine compressed and uncompressed lengths and set the 
self-contained flag if needed
+       combined := uint64(compressedLen) | uint64(uncompressedLen)<<17
+       if isSelfContained {
+               combined |= 1 << 34
+       }
+
+       var headerBuf [8]byte
+
+       // Write the combined value into the header buffer
+       binary.LittleEndian.PutUint64(headerBuf[:], combined)
+
+       // Create a buffer with enough capacity to hold the header, compressed 
payload, and checksums
+       buf := bytes.NewBuffer(make([]byte, 0, 8+compressedLen+4))
+
+       // Write the first 5 bytes of the header (compressed and uncompressed 
sizes)
+       buf.Write(headerBuf[:5])
+
+       // Compute and write the CRC24 checksum of the first 5 bytes
+       headerChecksum := KoopmanChecksum(headerBuf[:5])
+       binary.LittleEndian.PutUint32(headerBuf[:], headerChecksum)
+       buf.Write(headerBuf[:3])
+       buf.Write(compressedPayload)
+
+       // Compute and write the CRC32 checksum of the payload
+       payloadChecksum := ChecksumIEEE(compressedPayload)
+       binary.LittleEndian.PutUint32(headerBuf[:], payloadChecksum)
+       buf.Write(headerBuf[:4])
+
+       return buf.Bytes(), nil
+}
+
+func readCompressedFrame(r io.Reader, compressor Compressor) ([]byte, bool, 
error) {

Review Comment:
   rename to `readCompressedSegment`



##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
                f.writeBytes(v)
        }
 }
+
+func (f *framer) prepareModernLayout() error {
+       // Ensure protocol version is V5 or higher
+       if f.proto < protoVersion5 {
+               panic("Modern layout is not supported with version V4 or less")
+       }
+
+       selfContained := true
+
+       var (
+               adjustedBuf []byte
+               tempBuf     []byte
+               err         error
+       )
+
+       // Process the buffer in chunks if it exceeds the max payload size
+       for len(f.buf) > maxPayloadSize {
+               if f.compres != nil {
+                       tempBuf, err = 
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+               } else {
+                       tempBuf, err = 
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+               }
+               if err != nil {
+                       return err
+               }
+
+               adjustedBuf = append(adjustedBuf, tempBuf...)
+               f.buf = f.buf[maxPayloadSize:]
+               selfContained = false
+       }
+
+       // Process the remaining buffer
+       if f.compres != nil {
+               tempBuf, err = newCompressedFrame(f.buf, selfContained, 
f.compres)
+       } else {
+               tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+       }
+       if err != nil {
+               return err
+       }
+
+       adjustedBuf = append(adjustedBuf, tempBuf...)
+       f.buf = adjustedBuf
+
+       return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+       const headerSize = 6
+       header := [headerSize + 1]byte{}
+
+       // Read the frame header
+       if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read 
uncompressed frame, err: %w", err)
+       }
+
+       // Compute and verify the header CRC24
+       computedHeaderCRC24 := KoopmanChecksum(header[:3])
+       readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
+       if computedHeaderCRC24 != readHeaderCRC24 {
+               return nil, false, fmt.Errorf("gocql: crc24 mismatch in frame 
header, computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24)
+       }
+
+       // Extract the payload length and self-contained flag
+       headerInt := binary.LittleEndian.Uint32(header[:4])
+       payloadLen := int(headerInt & 0x1FFFF)
+       isSelfContained := (headerInt & (1 << 17)) != 0
+
+       // Read the payload
+       payload := make([]byte, payloadLen)
+       if _, err := io.ReadFull(r, payload); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read 
uncompressed frame payload, err: %w", err)
+       }
+
+       // Read and verify the payload CRC32
+       if _, err := io.ReadFull(r, header[:4]); err != nil {

Review Comment:
   NIT: make `4` a constant (crc32 length), `io.ReadFull(r, header[:crc32Size])`



##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
                f.writeBytes(v)
        }
 }
+
+func (f *framer) prepareModernLayout() error {
+       // Ensure protocol version is V5 or higher
+       if f.proto < protoVersion5 {
+               panic("Modern layout is not supported with version V4 or less")
+       }
+
+       selfContained := true
+
+       var (
+               adjustedBuf []byte
+               tempBuf     []byte
+               err         error
+       )
+
+       // Process the buffer in chunks if it exceeds the max payload size
+       for len(f.buf) > maxPayloadSize {
+               if f.compres != nil {
+                       tempBuf, err = 
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+               } else {
+                       tempBuf, err = 
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+               }
+               if err != nil {
+                       return err
+               }
+
+               adjustedBuf = append(adjustedBuf, tempBuf...)
+               f.buf = f.buf[maxPayloadSize:]
+               selfContained = false
+       }
+
+       // Process the remaining buffer
+       if f.compres != nil {
+               tempBuf, err = newCompressedFrame(f.buf, selfContained, 
f.compres)
+       } else {
+               tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+       }
+       if err != nil {
+               return err
+       }
+
+       adjustedBuf = append(adjustedBuf, tempBuf...)
+       f.buf = adjustedBuf
+
+       return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+       const headerSize = 6
+       header := [headerSize + 1]byte{}
+
+       // Read the frame header
+       if _, err := io.ReadFull(r, header[:headerSize]); err != nil {

Review Comment:
   Use `io.ReadFull(r, header[:])` after changing the array size from `7` to `6` (see comment below)
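
   One caveat worth sketching: with an exactly 6-byte array, `binary.LittleEndian.Uint32(header[3:])` would panic because only 3 bytes remain, so the CRC24 has to be decoded byte by byte. A minimal sketch, assuming hypothetical names `lengthSize` and `readHeader` that are not in the PR:

   ```go
   package main

   import (
   	"bytes"
   	"fmt"
   	"io"
   )

   const (
   	lengthSize = 3 // 17-bit payload length plus the self-contained flag
   	crc24Size  = 3 // CRC24 of the preceding lengthSize bytes
   	headerSize = lengthSize + crc24Size
   )

   // readHeader (hypothetical helper) reads exactly headerSize bytes and
   // splits them into fields without needing a spare seventh byte.
   func readHeader(r io.Reader) (payloadLen int, selfContained bool, crc24 uint32, err error) {
   	var header [headerSize]byte
   	if _, err = io.ReadFull(r, header[:]); err != nil {
   		return 0, false, 0, fmt.Errorf("failed to read frame header: %w", err)
   	}
   	// Both 3-byte fields are little-endian and assembled manually.
   	headerInt := uint32(header[0]) | uint32(header[1])<<8 | uint32(header[2])<<16
   	crc24 = uint32(header[3]) | uint32(header[4])<<8 | uint32(header[5])<<16
   	return int(headerInt & 0x1FFFF), headerInt&(1<<17) != 0, crc24, nil
   }

   func main() {
   	// Length 5 with the self-contained bit set, followed by a dummy CRC24.
   	raw := []byte{0x05, 0x00, 0x02, 0xAA, 0xBB, 0xCC}
   	n, sc, crc, err := readHeader(bytes.NewReader(raw))
   	fmt.Println(n, sc, crc == 0xCCBBAA, err)
   }
   ```

   The sketch skips checksum verification; it only shows the buffer sizing and the decode.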



##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
                f.writeBytes(v)
        }
 }
+
+func (f *framer) prepareModernLayout() error {
+       // Ensure protocol version is V5 or higher
+       if f.proto < protoVersion5 {
+               panic("Modern layout is not supported with version V4 or less")
+       }
+
+       selfContained := true
+
+       var (
+               adjustedBuf []byte
+               tempBuf     []byte
+               err         error
+       )
+
+       // Process the buffer in chunks if it exceeds the max payload size
+       for len(f.buf) > maxPayloadSize {
+               if f.compres != nil {
+                       tempBuf, err = 
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+               } else {
+                       tempBuf, err = 
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+               }
+               if err != nil {
+                       return err
+               }
+
+               adjustedBuf = append(adjustedBuf, tempBuf...)
+               f.buf = f.buf[maxPayloadSize:]
+               selfContained = false
+       }
+
+       // Process the remaining buffer
+       if f.compres != nil {
+               tempBuf, err = newCompressedFrame(f.buf, selfContained, 
f.compres)
+       } else {
+               tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+       }
+       if err != nil {
+               return err
+       }
+
+       adjustedBuf = append(adjustedBuf, tempBuf...)
+       f.buf = adjustedBuf
+
+       return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+       const headerSize = 6
+       header := [headerSize + 1]byte{}
+
+       // Read the frame header
+       if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read 
uncompressed frame, err: %w", err)
+       }
+
+       // Compute and verify the header CRC24
+       computedHeaderCRC24 := KoopmanChecksum(header[:3])
+       readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
+       if computedHeaderCRC24 != readHeaderCRC24 {
+               return nil, false, fmt.Errorf("gocql: crc24 mismatch in frame 
header, computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24)
+       }
+
+       // Extract the payload length and self-contained flag
+       headerInt := binary.LittleEndian.Uint32(header[:4])
+       payloadLen := int(headerInt & 0x1FFFF)
+       isSelfContained := (headerInt & (1 << 17)) != 0
+
+       // Read the payload
+       payload := make([]byte, payloadLen)
+       if _, err := io.ReadFull(r, payload); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read 
uncompressed frame payload, err: %w", err)
+       }
+
+       // Read and verify the payload CRC32
+       if _, err := io.ReadFull(r, header[:4]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read payload 
crc32, err: %w", err)
+       }
+
+       computedPayloadCRC32 := ChecksumIEEE(payload)
+       readPayloadCRC32 := binary.LittleEndian.Uint32(header[:4])
+       if computedPayloadCRC32 != readPayloadCRC32 {
+               return nil, false, fmt.Errorf("gocql: payload crc32 mismatch, 
computed: %d, got: %d", computedPayloadCRC32, readPayloadCRC32)
+       }
+
+       return payload, isSelfContained, nil
+}
+
+func newUncompressedFrame(payload []byte, isSelfContained bool) ([]byte, 
error) {
+       const (
+               headerSize       = 6
+               selfContainedBit = 1 << 17
+       )
+
+       payloadLen := len(payload)
+       if payloadLen > maxPayloadSize {
+               return nil, fmt.Errorf("payload length (%d) exceeds maximum 
size of 128 KiB", payloadLen)

Review Comment:
   NIT: use the `maxPayloadSize` constant in the error string instead of hardcoding `128 KiB`
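
   A minimal sketch of the idea, assuming `maxPayloadSize` equals the largest value of the 17-bit length field (the PR defines its own constant, whose value may differ) and using a hypothetical helper `checkPayloadLen`:

   ```go
   package main

   import "fmt"

   // maxPayloadSize is an assumption for this sketch: the largest value that
   // fits in the 17-bit length field masked with 0x1FFFF in the diff.
   const maxPayloadSize = 1<<17 - 1

   // checkPayloadLen (hypothetical helper) formats the limit from the constant,
   // so the message cannot drift from the actual check.
   func checkPayloadLen(payloadLen int) error {
   	if payloadLen > maxPayloadSize {
   		return fmt.Errorf("gocql: payload length (%d) exceeds maximum size of %d bytes",
   			payloadLen, maxPayloadSize)
   	}
   	return nil
   }

   func main() {
   	fmt.Println(checkPayloadLen(10))
   	fmt.Println(checkPayloadLen(maxPayloadSize + 1))
   }
   ```

   This also matches the message style `newCompressedFrame` already uses, which prints `maxPayloadSize` via `%d`.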



##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
                f.writeBytes(v)
        }
 }
+
+func (f *framer) prepareModernLayout() error {
+       // Ensure protocol version is V5 or higher
+       if f.proto < protoVersion5 {
+               panic("Modern layout is not supported with version V4 or less")
+       }
+
+       selfContained := true
+
+       var (
+               adjustedBuf []byte
+               tempBuf     []byte
+               err         error
+       )
+
+       // Process the buffer in chunks if it exceeds the max payload size
+       for len(f.buf) > maxPayloadSize {
+               if f.compres != nil {
+                       tempBuf, err = 
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+               } else {
+                       tempBuf, err = 
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+               }
+               if err != nil {
+                       return err
+               }
+
+               adjustedBuf = append(adjustedBuf, tempBuf...)
+               f.buf = f.buf[maxPayloadSize:]
+               selfContained = false
+       }
+
+       // Process the remaining buffer
+       if f.compres != nil {
+               tempBuf, err = newCompressedFrame(f.buf, selfContained, 
f.compres)
+       } else {
+               tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+       }
+       if err != nil {
+               return err
+       }
+
+       adjustedBuf = append(adjustedBuf, tempBuf...)
+       f.buf = adjustedBuf
+
+       return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+       const headerSize = 6
+       header := [headerSize + 1]byte{}

Review Comment:
   NIT: `headerBuf := [headerSize + crc24Size]byte{}`



##########
go.mod:
##########
@@ -20,11 +20,14 @@ module github.com/gocql/gocql
 require (
        github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 // 
indirect
        github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // 
indirect
+       github.com/gocql/gocql/lz4 v0.0.0-20240925165811-953e0df999ca
        github.com/golang/snappy v0.0.3
        github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed
        github.com/kr/pretty v0.1.0 // indirect
-       github.com/stretchr/testify v1.3.0 // indirect
+       github.com/stretchr/testify v1.9.0
        gopkg.in/inf.v0 v0.9.1
 )
 
 go 1.13
+
+replace github.com/gocql/gocql/lz4 => ./lz4

Review Comment:
   This `replace` directive doesn't seem intended?



##########
conn.go:
##########
@@ -1086,7 +1155,29 @@ func (c *Conn) exec(ctx context.Context, req 
frameBuilder, tracer Tracer) (*fram
                return nil, err
        }
 
-       n, err := c.w.writeContext(ctx, framer.buf)
+       var n int
+
+       if c.version > protoVersion4 && startupCompleted {
+               err = framer.prepareModernLayout()
+               if err != nil {

Review Comment:
   Do we need this `if` here? Can't we just check below before calling 
`c.w.writeContext` and let that error handling code do this for us?
   ```
   if err == nil {
       n, err = c.w.writeContext(ctx, framer.buf)
   }
   if err != nil {
       // existing error handling
   }
   ```



##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
                f.writeBytes(v)
        }
 }
+
+func (f *framer) prepareModernLayout() error {
+       // Ensure protocol version is V5 or higher
+       if f.proto < protoVersion5 {
+               panic("Modern layout is not supported with version V4 or less")
+       }
+
+       selfContained := true
+
+       var (
+               adjustedBuf []byte
+               tempBuf     []byte
+               err         error
+       )
+
+       // Process the buffer in chunks if it exceeds the max payload size
+       for len(f.buf) > maxPayloadSize {
+               if f.compres != nil {
+                       tempBuf, err = 
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+               } else {
+                       tempBuf, err = 
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+               }
+               if err != nil {
+                       return err
+               }
+
+               adjustedBuf = append(adjustedBuf, tempBuf...)
+               f.buf = f.buf[maxPayloadSize:]
+               selfContained = false
+       }
+
+       // Process the remaining buffer
+       if f.compres != nil {
+               tempBuf, err = newCompressedFrame(f.buf, selfContained, 
f.compres)
+       } else {
+               tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+       }
+       if err != nil {
+               return err
+       }
+
+       adjustedBuf = append(adjustedBuf, tempBuf...)
+       f.buf = adjustedBuf
+
+       return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+       const headerSize = 6
+       header := [headerSize + 1]byte{}
+
+       // Read the frame header
+       if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read 
uncompressed frame, err: %w", err)
+       }
+
+       // Compute and verify the header CRC24
+       computedHeaderCRC24 := KoopmanChecksum(header[:3])
+       readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
+       if computedHeaderCRC24 != readHeaderCRC24 {
+               return nil, false, fmt.Errorf("gocql: crc24 mismatch in frame 
header, computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24)
+       }
+
+       // Extract the payload length and self-contained flag
+       headerInt := binary.LittleEndian.Uint32(header[:4])
+       payloadLen := int(headerInt & 0x1FFFF)
+       isSelfContained := (headerInt & (1 << 17)) != 0
+
+       // Read the payload
+       payload := make([]byte, payloadLen)
+       if _, err := io.ReadFull(r, payload); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read 
uncompressed frame payload, err: %w", err)
+       }
+
+       // Read and verify the payload CRC32
+       if _, err := io.ReadFull(r, header[:4]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read payload 
crc32, err: %w", err)
+       }
+
+       computedPayloadCRC32 := ChecksumIEEE(payload)
+       readPayloadCRC32 := binary.LittleEndian.Uint32(header[:4])
+       if computedPayloadCRC32 != readPayloadCRC32 {
+               return nil, false, fmt.Errorf("gocql: payload crc32 mismatch, 
computed: %d, got: %d", computedPayloadCRC32, readPayloadCRC32)
+       }
+
+       return payload, isSelfContained, nil
+}
+
+func newUncompressedFrame(payload []byte, isSelfContained bool) ([]byte, 
error) {
+       const (
+               headerSize       = 6
+               selfContainedBit = 1 << 17
+       )
+
+       payloadLen := len(payload)
+       if payloadLen > maxPayloadSize {
+               return nil, fmt.Errorf("payload length (%d) exceeds maximum 
size of 128 KiB", payloadLen)
+       }
+
+       // Create the frame
+       frameSize := headerSize + payloadLen + 4 // 4 bytes for CRC32
+       frame := make([]byte, frameSize)
+
+       // First 3 bytes: payload length and self-contained flag
+       headerInt := uint32(payloadLen) & 0x1FFFF
+       if isSelfContained {
+               headerInt |= selfContainedBit // Set the self-contained flag
+       }
+
+       // Encode the first 3 bytes as a single little-endian integer
+       frame[0] = byte(headerInt)
+       frame[1] = byte(headerInt >> 8)
+       frame[2] = byte(headerInt >> 16)
+
+       // Calculate CRC24 for the first 3 bytes of the header
+       crc := KoopmanChecksum(frame[:3])
+
+       // Encode CRC24 into the next 3 bytes of the header
+       frame[3] = byte(crc)
+       frame[4] = byte(crc >> 8)
+       frame[5] = byte(crc >> 16)
+
+       copy(frame[headerSize:], payload) // Copy the payload to the frame
+
+       // Calculate CRC32 for the payload
+       payloadCRC32 := ChecksumIEEE(payload)
+       binary.LittleEndian.PutUint32(frame[headerSize+payloadLen:], 
payloadCRC32)
+
+       return frame, nil
+}
+
+func newCompressedFrame(uncompressedPayload []byte, isSelfContained bool, 
compressor Compressor) ([]byte, error) {
+       uncompressedLen := len(uncompressedPayload)
+       if uncompressedLen > maxPayloadSize {
+               return nil, fmt.Errorf("uncompressed compressed payload length 
exceedes max size of frame payload %d/%d", uncompressedLen, maxPayloadSize)
+       }
+
+       compressedPayload, err := compressor.Encode(uncompressedPayload)
+       if err != nil {
+               return nil, err
+       }
+
+       // Skip the first 4 bytes because the size of the uncompressed payload 
is written in the frame header, not in the
+       // body of the compressed envelope
+       compressedPayload = compressedPayload[4:]
+
+       compressedLen := len(compressedPayload)
+
+       // Compression is not worth it
+       if uncompressedLen < compressedLen {
+               // native_protocol_v5.spec
+               // 2.2
+               //  An uncompressed length of 0 signals that the compressed 
payload
+               //  should be used as-is and not decompressed.
+               compressedPayload = uncompressedPayload
+               compressedLen = uncompressedLen
+               uncompressedLen = 0
+       }
+
+       // Combine compressed and uncompressed lengths and set the 
self-contained flag if needed
+       combined := uint64(compressedLen) | uint64(uncompressedLen)<<17
+       if isSelfContained {
+               combined |= 1 << 34
+       }
+
+       var headerBuf [8]byte
+
+       // Write the combined value into the header buffer
+       binary.LittleEndian.PutUint64(headerBuf[:], combined)
+
+       // Create a buffer with enough capacity to hold the header, compressed 
payload, and checksums
+       buf := bytes.NewBuffer(make([]byte, 0, 8+compressedLen+4))
+
+       // Write the first 5 bytes of the header (compressed and uncompressed 
sizes)
+       buf.Write(headerBuf[:5])
+
+       // Compute and write the CRC24 checksum of the first 5 bytes
+       headerChecksum := KoopmanChecksum(headerBuf[:5])
+       binary.LittleEndian.PutUint32(headerBuf[:], headerChecksum)
+       buf.Write(headerBuf[:3])
+       buf.Write(compressedPayload)
+
+       // Compute and write the CRC32 checksum of the payload
+       payloadChecksum := ChecksumIEEE(compressedPayload)
+       binary.LittleEndian.PutUint32(headerBuf[:], payloadChecksum)
+       buf.Write(headerBuf[:4])
+
+       return buf.Bytes(), nil
+}
+
+func readCompressedFrame(r io.Reader, compressor Compressor) ([]byte, bool, 
error) {
+       var (
+               headerBuf [8]byte
+               err       error
+       )
+
+       if _, err = io.ReadFull(r, headerBuf[:]); err != nil {
+               return nil, false, err
+       }
+
+       // Reading checksum from frame header
+       readHeaderChecksum := uint32(headerBuf[5]) | uint32(headerBuf[6])<<8 | 
uint32(headerBuf[7])<<16
+       if computedHeaderChecksum := KoopmanChecksum(headerBuf[:5]); 
computedHeaderChecksum != readHeaderChecksum {
+               return nil, false, fmt.Errorf("gocql: crc24 mismatch in frame 
header, read: %d, computed: %d", readHeaderChecksum, computedHeaderChecksum)
+       }
+
+       // First 17 bits - payload size after compression
+       compressedLen := uint32(headerBuf[0]) | uint32(headerBuf[1])<<8 | 
uint32(headerBuf[2]&0x1)<<16
+
+       // The next 17 bits - payload size before compression
+       uncompressedLen := (uint32(headerBuf[2]) >> 1) | 
uint32(headerBuf[3])<<7 | uint32(headerBuf[4]&0b11)<<15
+
+       // Self-contained flag
+       selfContained := (headerBuf[4] & 0b100) != 0
+
+       compressedPayload := make([]byte, compressedLen)
+       if _, err = io.ReadFull(r, compressedPayload); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read compressed 
frame payload, err: %w", err)
+       }
+
+       if _, err = io.ReadFull(r, headerBuf[:4]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read payload 
crc32, err: %w", err)
+       }
+
+       // Ensuring if payload checksum matches
+       readPayloadChecksum := binary.LittleEndian.Uint32(headerBuf[:4])

Review Comment:
   NIT: `binary.LittleEndian.Uint32(headerBuf[:crc32Size])`



##########
go.mod:
##########
@@ -20,11 +20,14 @@ module github.com/gocql/gocql
 require (
        github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 // 
indirect
        github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // 
indirect
+       github.com/gocql/gocql/lz4 v0.0.0-20240925165811-953e0df999ca

Review Comment:
   This `require` entry for the `lz4` module doesn't seem intended?



##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
                f.writeBytes(v)
        }
 }
+
+func (f *framer) prepareModernLayout() error {
+       // Ensure protocol version is V5 or higher
+       if f.proto < protoVersion5 {
+               panic("Modern layout is not supported with version V4 or less")
+       }
+
+       selfContained := true
+
+       var (
+               adjustedBuf []byte
+               tempBuf     []byte
+               err         error
+       )
+
+       // Process the buffer in chunks if it exceeds the max payload size
+       for len(f.buf) > maxPayloadSize {
+               if f.compres != nil {
+                       tempBuf, err = 
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+               } else {
+                       tempBuf, err = 
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+               }
+               if err != nil {
+                       return err
+               }
+
+               adjustedBuf = append(adjustedBuf, tempBuf...)
+               f.buf = f.buf[maxPayloadSize:]
+               selfContained = false
+       }
+
+       // Process the remaining buffer
+       if f.compres != nil {
+               tempBuf, err = newCompressedFrame(f.buf, selfContained, 
f.compres)
+       } else {
+               tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+       }
+       if err != nil {
+               return err
+       }
+
+       adjustedBuf = append(adjustedBuf, tempBuf...)
+       f.buf = adjustedBuf
+
+       return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+       const headerSize = 6
+       header := [headerSize + 1]byte{}
+
+       // Read the frame header
+       if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read 
uncompressed frame, err: %w", err)
+       }
+
+       // Compute and verify the header CRC24
+       computedHeaderCRC24 := KoopmanChecksum(header[:3])
+       readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
+       if computedHeaderCRC24 != readHeaderCRC24 {
+               return nil, false, fmt.Errorf("gocql: crc24 mismatch in frame 
header, computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24)
+       }
+
+       // Extract the payload length and self-contained flag
+       headerInt := binary.LittleEndian.Uint32(header[:4])
+       payloadLen := int(headerInt & 0x1FFFF)
+       isSelfContained := (headerInt & (1 << 17)) != 0
+
+       // Read the payload
+       payload := make([]byte, payloadLen)
+       if _, err := io.ReadFull(r, payload); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read 
uncompressed frame payload, err: %w", err)
+       }
+
+       // Read and verify the payload CRC32
+       if _, err := io.ReadFull(r, header[:4]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read payload 
crc32, err: %w", err)
+       }
+
+       computedPayloadCRC32 := ChecksumIEEE(payload)
+       readPayloadCRC32 := binary.LittleEndian.Uint32(header[:4])
+       if computedPayloadCRC32 != readPayloadCRC32 {
+               return nil, false, fmt.Errorf("gocql: payload crc32 mismatch, 
computed: %d, got: %d", computedPayloadCRC32, readPayloadCRC32)
+       }
+
+       return payload, isSelfContained, nil
+}
+
+func newUncompressedFrame(payload []byte, isSelfContained bool) ([]byte, 
error) {
+       const (
+               headerSize       = 6
+               selfContainedBit = 1 << 17
+       )
+
+       payloadLen := len(payload)
+       if payloadLen > maxPayloadSize {
+               return nil, fmt.Errorf("payload length (%d) exceeds maximum 
size of 128 KiB", payloadLen)
+       }
+
+       // Create the frame
+       frameSize := headerSize + payloadLen + 4 // 4 bytes for CRC32
+       frame := make([]byte, frameSize)
+
+       // First 3 bytes: payload length and self-contained flag
+       headerInt := uint32(payloadLen) & 0x1FFFF
+       if isSelfContained {
+               headerInt |= selfContainedBit // Set the self-contained flag
+       }
+
+       // Encode the first 3 bytes as a single little-endian integer
+       frame[0] = byte(headerInt)
+       frame[1] = byte(headerInt >> 8)
+       frame[2] = byte(headerInt >> 16)
+
+       // Calculate CRC24 for the first 3 bytes of the header
+       crc := KoopmanChecksum(frame[:3])
+
+       // Encode CRC24 into the next 3 bytes of the header
+       frame[3] = byte(crc)
+       frame[4] = byte(crc >> 8)
+       frame[5] = byte(crc >> 16)
+
+       copy(frame[headerSize:], payload) // Copy the payload to the frame
+
+       // Calculate CRC32 for the payload
+       payloadCRC32 := ChecksumIEEE(payload)
+       binary.LittleEndian.PutUint32(frame[headerSize+payloadLen:], 
payloadCRC32)
+
+       return frame, nil
+}
+
+func newCompressedFrame(uncompressedPayload []byte, isSelfContained bool, 
compressor Compressor) ([]byte, error) {
+       uncompressedLen := len(uncompressedPayload)
+       if uncompressedLen > maxPayloadSize {
+               return nil, fmt.Errorf("uncompressed compressed payload length 
exceedes max size of frame payload %d/%d", uncompressedLen, maxPayloadSize)
+       }
+
+       compressedPayload, err := compressor.Encode(uncompressedPayload)
+       if err != nil {
+               return nil, err
+       }
+
+       // Skip the first 4 bytes because the size of the uncompressed payload 
is written in the frame header, not in the
+       // body of the compressed envelope
+       compressedPayload = compressedPayload[4:]
+
+       compressedLen := len(compressedPayload)
+
+       // Compression is not worth it
+       if uncompressedLen < compressedLen {
+               // native_protocol_v5.spec
+               // 2.2
+               //  An uncompressed length of 0 signals that the compressed 
payload
+               //  should be used as-is and not decompressed.
+               compressedPayload = uncompressedPayload
+               compressedLen = uncompressedLen
+               uncompressedLen = 0
+       }
+
+       // Combine compressed and uncompressed lengths and set the 
self-contained flag if needed
+       combined := uint64(compressedLen) | uint64(uncompressedLen)<<17
+       if isSelfContained {
+               combined |= 1 << 34
+       }
+
+       var headerBuf [8]byte
+
+       // Write the combined value into the header buffer
+       binary.LittleEndian.PutUint64(headerBuf[:], combined)
+
+       // Create a buffer with enough capacity to hold the header, compressed 
payload, and checksums
+       buf := bytes.NewBuffer(make([]byte, 0, 8+compressedLen+4))
+
+       // Write the first 5 bytes of the header (compressed and uncompressed 
sizes)
+       buf.Write(headerBuf[:5])
+
+       // Compute and write the CRC24 checksum of the first 5 bytes
+       headerChecksum := KoopmanChecksum(headerBuf[:5])
+       binary.LittleEndian.PutUint32(headerBuf[:], headerChecksum)
+       buf.Write(headerBuf[:3])
+       buf.Write(compressedPayload)
+
+       // Compute and write the CRC32 checksum of the payload
+       payloadChecksum := ChecksumIEEE(compressedPayload)
+       binary.LittleEndian.PutUint32(headerBuf[:], payloadChecksum)
+       buf.Write(headerBuf[:4])
+
+       return buf.Bytes(), nil
+}
+
+func readCompressedFrame(r io.Reader, compressor Compressor) ([]byte, bool, 
error) {
+       var (

Review Comment:
   NIT: add constants: `headerSize = 5`, `crc24Size = 3`
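
   For illustration, a sketch of how the compressed header could be packed and unpacked with these constants. The bit layout follows `newCompressedFrame` in the diff; the helper names `packHeader` and `unpackHeader` are hypothetical:

   ```go
   package main

   import (
   	"encoding/binary"
   	"fmt"
   )

   const (
   	headerSize = 5 // two 17-bit lengths, the self-contained flag, padding bits
   	crc24Size  = 3 // CRC24 of the first headerSize bytes
   )

   // packHeader (hypothetical) mirrors the layout in newCompressedFrame:
   // bits 0-16 compressed length, bits 17-33 uncompressed length, bit 34 flag.
   func packHeader(compressedLen, uncompressedLen int, selfContained bool) [headerSize]byte {
   	combined := uint64(compressedLen) | uint64(uncompressedLen)<<17
   	if selfContained {
   		combined |= 1 << 34
   	}
   	var tmp [8]byte
   	binary.LittleEndian.PutUint64(tmp[:], combined)
   	var out [headerSize]byte
   	copy(out[:], tmp[:headerSize])
   	return out
   }

   // unpackHeader (hypothetical) reverses packHeader with masks and shifts
   // on a single 64-bit value instead of per-byte arithmetic.
   func unpackHeader(h [headerSize]byte) (compressedLen, uncompressedLen int, selfContained bool) {
   	var tmp [8]byte
   	copy(tmp[:], h[:])
   	combined := binary.LittleEndian.Uint64(tmp[:])
   	return int(combined & 0x1FFFF), int((combined >> 17) & 0x1FFFF), combined&(1<<34) != 0
   }

   func main() {
   	h := packHeader(1000, 4096, true)
   	c, u, sc := unpackHeader(h)
   	// The on-wire header is headerSize bytes followed by crc24Size bytes.
   	fmt.Println(len(h)+crc24Size, c, u, sc)
   }
   ```

   With the constants in place, `headerBuf[:5]` and `headerBuf[:3]` in the diff could become `headerBuf[:headerSize]` and `headerBuf[:crc24Size]`.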



##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
                f.writeBytes(v)
        }
 }
+
+func (f *framer) prepareModernLayout() error {
+       // Ensure protocol version is V5 or higher
+       if f.proto < protoVersion5 {
+               panic("Modern layout is not supported with version V4 or less")
+       }
+
+       selfContained := true
+
+       var (
+               adjustedBuf []byte
+               tempBuf     []byte
+               err         error
+       )
+
+       // Process the buffer in chunks if it exceeds the max payload size
+       for len(f.buf) > maxPayloadSize {
+               if f.compres != nil {
+                       tempBuf, err = 
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+               } else {
+                       tempBuf, err = 
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+               }
+               if err != nil {
+                       return err
+               }
+
+               adjustedBuf = append(adjustedBuf, tempBuf...)
+               f.buf = f.buf[maxPayloadSize:]
+               selfContained = false
+       }
+
+       // Process the remaining buffer
+       if f.compres != nil {
+               tempBuf, err = newCompressedFrame(f.buf, selfContained, 
f.compres)
+       } else {
+               tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+       }
+       if err != nil {
+               return err
+       }
+
+       adjustedBuf = append(adjustedBuf, tempBuf...)
+       f.buf = adjustedBuf
+
+       return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+       const headerSize = 6
+       header := [headerSize + 1]byte{}
+
+       // Read the frame header
+       if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read 
uncompressed frame, err: %w", err)
+       }
+
+       // Compute and verify the header CRC24
+       computedHeaderCRC24 := KoopmanChecksum(header[:3])
+       readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
+       if computedHeaderCRC24 != readHeaderCRC24 {
+               return nil, false, fmt.Errorf("gocql: crc24 mismatch in frame 
header, computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24)
+       }
+
+       // Extract the payload length and self-contained flag
+       headerInt := binary.LittleEndian.Uint32(header[:4])
+       payloadLen := int(headerInt & 0x1FFFF)
+       isSelfContained := (headerInt & (1 << 17)) != 0
+
+       // Read the payload
+       payload := make([]byte, payloadLen)
+       if _, err := io.ReadFull(r, payload); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read 
uncompressed frame payload, err: %w", err)
+       }
+
+       // Read and verify the payload CRC32
+       if _, err := io.ReadFull(r, header[:4]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read payload 
crc32, err: %w", err)
+       }
+
+       computedPayloadCRC32 := ChecksumIEEE(payload)
+       readPayloadCRC32 := binary.LittleEndian.Uint32(header[:4])
+       if computedPayloadCRC32 != readPayloadCRC32 {
+               return nil, false, fmt.Errorf("gocql: payload crc32 mismatch, 
computed: %d, got: %d", computedPayloadCRC32, readPayloadCRC32)
+       }
+
+       return payload, isSelfContained, nil
+}
+
+func newUncompressedFrame(payload []byte, isSelfContained bool) ([]byte, 
error) {
+       const (
+               headerSize       = 6
+               selfContainedBit = 1 << 17
+       )
+
+       payloadLen := len(payload)
+       if payloadLen > maxPayloadSize {
+               return nil, fmt.Errorf("payload length (%d) exceeds maximum 
size of 128 KiB", payloadLen)
+       }
+
+       // Create the frame
+       frameSize := headerSize + payloadLen + 4 // 4 bytes for CRC32
+       frame := make([]byte, frameSize)
+
+       // First 3 bytes: payload length and self-contained flag
+       headerInt := uint32(payloadLen) & 0x1FFFF
+       if isSelfContained {
+               headerInt |= selfContainedBit // Set the self-contained flag
+       }
+
+       // Encode the first 3 bytes as a single little-endian integer
+       frame[0] = byte(headerInt)
+       frame[1] = byte(headerInt >> 8)
+       frame[2] = byte(headerInt >> 16)
+
+       // Calculate CRC24 for the first 3 bytes of the header
+       crc := KoopmanChecksum(frame[:3])
+
+       // Encode CRC24 into the next 3 bytes of the header
+       frame[3] = byte(crc)
+       frame[4] = byte(crc >> 8)
+       frame[5] = byte(crc >> 16)
+
+       copy(frame[headerSize:], payload) // Copy the payload to the frame
+
+       // Calculate CRC32 for the payload
+       payloadCRC32 := ChecksumIEEE(payload)
+       binary.LittleEndian.PutUint32(frame[headerSize+payloadLen:], 
payloadCRC32)
+
+       return frame, nil
+}
+
+func newCompressedFrame(uncompressedPayload []byte, isSelfContained bool, 
compressor Compressor) ([]byte, error) {
+       uncompressedLen := len(uncompressedPayload)
+       if uncompressedLen > maxPayloadSize {
+               return nil, fmt.Errorf("uncompressed compressed payload length 
exceedes max size of frame payload %d/%d", uncompressedLen, maxPayloadSize)
+       }
+
+       compressedPayload, err := compressor.Encode(uncompressedPayload)
+       if err != nil {
+               return nil, err
+       }
+
+       // Skip the first 4 bytes because the size of the uncompressed payload 
is written in the frame header, not in the
+       // body of the compressed envelope
+       compressedPayload = compressedPayload[4:]
+
+       compressedLen := len(compressedPayload)
+
+       // Compression is not worth it
+       if uncompressedLen < compressedLen {
+               // native_protocol_v5.spec
+               // 2.2
+               //  An uncompressed length of 0 signals that the compressed 
payload
+               //  should be used as-is and not decompressed.
+               compressedPayload = uncompressedPayload
+               compressedLen = uncompressedLen
+               uncompressedLen = 0
+       }
+
+       // Combine compressed and uncompressed lengths and set the 
self-contained flag if needed
+       combined := uint64(compressedLen) | uint64(uncompressedLen)<<17
+       if isSelfContained {
+               combined |= 1 << 34
+       }
+
+       var headerBuf [8]byte
+
+       // Write the combined value into the header buffer
+       binary.LittleEndian.PutUint64(headerBuf[:], combined)
+
+       // Create a buffer with enough capacity to hold the header, compressed 
payload, and checksums
+       buf := bytes.NewBuffer(make([]byte, 0, 8+compressedLen+4))
+
+       // Write the first 5 bytes of the header (compressed and uncompressed 
sizes)
+       buf.Write(headerBuf[:5])
+
+       // Compute and write the CRC24 checksum of the first 5 bytes
+       headerChecksum := KoopmanChecksum(headerBuf[:5])
+       binary.LittleEndian.PutUint32(headerBuf[:], headerChecksum)
+       buf.Write(headerBuf[:3])
+       buf.Write(compressedPayload)
+
+       // Compute and write the CRC32 checksum of the payload
+       payloadChecksum := ChecksumIEEE(compressedPayload)
+       binary.LittleEndian.PutUint32(headerBuf[:], payloadChecksum)
+       buf.Write(headerBuf[:4])
+
+       return buf.Bytes(), nil
+}
+
+func readCompressedFrame(r io.Reader, compressor Compressor) ([]byte, bool, 
error) {
+       var (
+               headerBuf [8]byte
+               err       error
+       )
+
+       if _, err = io.ReadFull(r, headerBuf[:]); err != nil {
+               return nil, false, err
+       }
+
+       // Reading checksum from frame header
+       readHeaderChecksum := uint32(headerBuf[5]) | uint32(headerBuf[6])<<8 | 
uint32(headerBuf[7])<<16
+       if computedHeaderChecksum := KoopmanChecksum(headerBuf[:5]); 
computedHeaderChecksum != readHeaderChecksum {

Review Comment:
   NIT: `KoopmanChecksum(headerBuf[:headerSize]);`
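
   A rough sketch of the idea, in case it helps. The constant names 
   `compressedHeaderSize` and `crc24Size` and both helper functions are 
   hypothetical (not in the PR); the bit layout is the one 
   `newCompressedFrame` writes in this diff. The checksum computation is left 
   out on purpose, only the slicing by named constants is shown.

   ```go
   package main

   import (
   	"encoding/binary"
   	"fmt"
   )

   // Hypothetical names, not taken from the PR.
   const (
   	compressedHeaderSize = 5 // bytes holding both lengths and the self-contained flag
   	crc24Size            = 3 // bytes holding the header CRC24
   )

   // splitCompressedHeader separates the checksummed header bytes from the
   // CRC24 value stored right after them (little-endian, 3 bytes).
   func splitCompressedHeader(buf [compressedHeaderSize + crc24Size]byte) (header []byte, storedCRC24 uint32) {
   	header = buf[:compressedHeaderSize]
   	crc := buf[compressedHeaderSize:]
   	storedCRC24 = uint32(crc[0]) | uint32(crc[1])<<8 | uint32(crc[2])<<16
   	return header, storedCRC24
   }

   // decodeCompressedHeader unpacks the layout written by newCompressedFrame:
   // bits 0-16 compressed length, bits 17-33 uncompressed length, bit 34 flag.
   func decodeCompressedHeader(header []byte) (compressedLen, uncompressedLen int, selfContained bool) {
   	var tmp [8]byte
   	copy(tmp[:], header)
   	v := binary.LittleEndian.Uint64(tmp[:])
   	compressedLen = int(v & 0x1FFFF)
   	uncompressedLen = int((v >> 17) & 0x1FFFF)
   	selfContained = v&(1<<34) != 0
   	return compressedLen, uncompressedLen, selfContained
   }

   func main() {
   	// Pack a header the same way the diff does, followed by a dummy CRC24.
   	combined := uint64(100) | uint64(250)<<17 | 1<<34
   	var buf [compressedHeaderSize + crc24Size]byte
   	binary.LittleEndian.PutUint64(buf[:], combined)
   	buf[5], buf[6], buf[7] = 0xAA, 0xBB, 0xCC

   	header, crc := splitCompressedHeader(buf)
   	c, u, sc := decodeCompressedHeader(header)
   	fmt.Println(c, u, sc, crc == 0xCCBBAA)
   }
   ```

   With the sizes named once, the `[:5]`, `[5]`, `[6]`, `[7]` literals in 
   `readCompressedFrame` and `newCompressedFrame` would all derive from the 
   same two constants.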



##########
conn_test.go:
##########
@@ -1300,3 +1301,96 @@ func (srv *TestServer) readFrame(conn net.Conn) 
(*framer, error) {
 
        return framer, nil
 }
+
+func TestConnProcessAllEnvelopesInSingleFrame(t *testing.T) {

Review Comment:
   Suggested rename: `TestConnProcessAllFramesInSingleSegment`



##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
                f.writeBytes(v)
        }
 }
+
+func (f *framer) prepareModernLayout() error {
+       // Ensure protocol version is V5 or higher
+       if f.proto < protoVersion5 {
+               panic("Modern layout is not supported with version V4 or less")
+       }
+
+       selfContained := true
+
+       var (
+               adjustedBuf []byte
+               tempBuf     []byte
+               err         error
+       )
+
+       // Process the buffer in chunks if it exceeds the max payload size
+       for len(f.buf) > maxPayloadSize {
+               if f.compres != nil {
+                       tempBuf, err = 
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+               } else {
+                       tempBuf, err = 
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+               }
+               if err != nil {
+                       return err
+               }
+
+               adjustedBuf = append(adjustedBuf, tempBuf...)
+               f.buf = f.buf[maxPayloadSize:]
+               selfContained = false
+       }
+
+       // Process the remaining buffer
+       if f.compres != nil {
+               tempBuf, err = newCompressedFrame(f.buf, selfContained, 
f.compres)
+       } else {
+               tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+       }
+       if err != nil {
+               return err
+       }
+
+       adjustedBuf = append(adjustedBuf, tempBuf...)
+       f.buf = adjustedBuf
+
+       return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+       const headerSize = 6
+       header := [headerSize + 1]byte{}
+
+       // Read the frame header
+       if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read 
uncompressed frame, err: %w", err)
+       }
+
+       // Compute and verify the header CRC24
+       computedHeaderCRC24 := KoopmanChecksum(header[:3])
+       readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
+       if computedHeaderCRC24 != readHeaderCRC24 {
+               return nil, false, fmt.Errorf("gocql: crc24 mismatch in frame 
header, computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24)
+       }
+
+       // Extract the payload length and self-contained flag
+       headerInt := binary.LittleEndian.Uint32(header[:4])
+       payloadLen := int(headerInt & 0x1FFFF)
+       isSelfContained := (headerInt & (1 << 17)) != 0
+
+       // Read the payload
+       payload := make([]byte, payloadLen)
+       if _, err := io.ReadFull(r, payload); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read 
uncompressed frame payload, err: %w", err)
+       }
+
+       // Read and verify the payload CRC32
+       if _, err := io.ReadFull(r, header[:4]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read payload 
crc32, err: %w", err)
+       }
+
+       computedPayloadCRC32 := ChecksumIEEE(payload)
+       readPayloadCRC32 := binary.LittleEndian.Uint32(header[:4])

Review Comment:
   NIT: `binary.LittleEndian.Uint32(header[:crc32Size])`
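
   A minimal sketch of the trailer check with a named size, assuming a 
   hypothetical `crc32Size` constant and a hypothetical `verifyTrailer` helper 
   (neither is in the PR). The checksum is passed in as a function so the 
   sketch does not depend on the PR's `ChecksumIEEE`; `main` uses the standard 
   library CRC32 purely as a stand-in, which may not match what the PR 
   computes.

   ```go
   package main

   import (
   	"encoding/binary"
   	"errors"
   	"fmt"
   	"hash/crc32"
   )

   // crc32Size is a hypothetical name for the 4-byte payload checksum trailer.
   const crc32Size = 4

   // verifyTrailer compares the little-endian CRC32 stored in trailer against
   // the checksum computed over payload by the supplied function.
   func verifyTrailer(payload, trailer []byte, checksum func([]byte) uint32) error {
   	if len(trailer) != crc32Size {
   		return errors.New("trailer must be exactly crc32Size bytes")
   	}
   	stored := binary.LittleEndian.Uint32(trailer[:crc32Size])
   	if computed := checksum(payload); computed != stored {
   		return fmt.Errorf("payload crc32 mismatch, computed: %d, got: %d", computed, stored)
   	}
   	return nil
   }

   func main() {
   	payload := []byte("hello")

   	// Stand-in checksum: the standard library CRC32 (an assumption).
   	var trailer [crc32Size]byte
   	binary.LittleEndian.PutUint32(trailer[:], crc32.ChecksumIEEE(payload))

   	fmt.Println(verifyTrailer(payload, trailer[:], crc32.ChecksumIEEE))
   }
   ```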



##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
                f.writeBytes(v)
        }
 }
+
+func (f *framer) prepareModernLayout() error {
+       // Ensure protocol version is V5 or higher
+       if f.proto < protoVersion5 {
+               panic("Modern layout is not supported with version V4 or less")
+       }
+
+       selfContained := true
+
+       var (
+               adjustedBuf []byte
+               tempBuf     []byte
+               err         error
+       )
+
+       // Process the buffer in chunks if it exceeds the max payload size
+       for len(f.buf) > maxPayloadSize {
+               if f.compres != nil {
+                       tempBuf, err = 
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+               } else {
+                       tempBuf, err = 
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+               }
+               if err != nil {
+                       return err
+               }
+
+               adjustedBuf = append(adjustedBuf, tempBuf...)
+               f.buf = f.buf[maxPayloadSize:]
+               selfContained = false
+       }
+
+       // Process the remaining buffer
+       if f.compres != nil {
+               tempBuf, err = newCompressedFrame(f.buf, selfContained, 
f.compres)
+       } else {
+               tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+       }
+       if err != nil {
+               return err
+       }
+
+       adjustedBuf = append(adjustedBuf, tempBuf...)
+       f.buf = adjustedBuf
+
+       return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+       const headerSize = 6
+       header := [headerSize + 1]byte{}
+
+       // Read the frame header
+       if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read 
uncompressed frame, err: %w", err)
+       }
+
+       // Compute and verify the header CRC24
+       computedHeaderCRC24 := KoopmanChecksum(header[:3])
+       readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
+       if computedHeaderCRC24 != readHeaderCRC24 {
+               return nil, false, fmt.Errorf("gocql: crc24 mismatch in frame 
header, computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24)
+       }
+
+       // Extract the payload length and self-contained flag
+       headerInt := binary.LittleEndian.Uint32(header[:4])
+       payloadLen := int(headerInt & 0x1FFFF)
+       isSelfContained := (headerInt & (1 << 17)) != 0
+
+       // Read the payload
+       payload := make([]byte, payloadLen)
+       if _, err := io.ReadFull(r, payload); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read 
uncompressed frame payload, err: %w", err)
+       }
+
+       // Read and verify the payload CRC32
+       if _, err := io.ReadFull(r, header[:4]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read payload 
crc32, err: %w", err)
+       }
+
+       computedPayloadCRC32 := ChecksumIEEE(payload)
+       readPayloadCRC32 := binary.LittleEndian.Uint32(header[:4])
+       if computedPayloadCRC32 != readPayloadCRC32 {
+               return nil, false, fmt.Errorf("gocql: payload crc32 mismatch, 
computed: %d, got: %d", computedPayloadCRC32, readPayloadCRC32)
+       }
+
+       return payload, isSelfContained, nil
+}
+
+func newUncompressedFrame(payload []byte, isSelfContained bool) ([]byte, 
error) {
+       const (
+               headerSize       = 6
+               selfContainedBit = 1 << 17
+       )
+
+       payloadLen := len(payload)
+       if payloadLen > maxPayloadSize {
+               return nil, fmt.Errorf("payload length (%d) exceeds maximum 
size of 128 KiB", payloadLen)
+       }
+
+       // Create the frame
+       frameSize := headerSize + payloadLen + 4 // 4 bytes for CRC32
+       frame := make([]byte, frameSize)
+
+       // First 3 bytes: payload length and self-contained flag
+       headerInt := uint32(payloadLen) & 0x1FFFF
+       if isSelfContained {
+               headerInt |= selfContainedBit // Set the self-contained flag
+       }
+
+       // Encode the first 3 bytes as a single little-endian integer
+       frame[0] = byte(headerInt)
+       frame[1] = byte(headerInt >> 8)
+       frame[2] = byte(headerInt >> 16)
+
+       // Calculate CRC24 for the first 3 bytes of the header
+       crc := KoopmanChecksum(frame[:3])
+
+       // Encode CRC24 into the next 3 bytes of the header
+       frame[3] = byte(crc)
+       frame[4] = byte(crc >> 8)
+       frame[5] = byte(crc >> 16)
+
+       copy(frame[headerSize:], payload) // Copy the payload to the frame
+
+       // Calculate CRC32 for the payload
+       payloadCRC32 := ChecksumIEEE(payload)
+       binary.LittleEndian.PutUint32(frame[headerSize+payloadLen:], 
payloadCRC32)
+
+       return frame, nil
+}
+
+func newCompressedFrame(uncompressedPayload []byte, isSelfContained bool, 
compressor Compressor) ([]byte, error) {
+       uncompressedLen := len(uncompressedPayload)
+       if uncompressedLen > maxPayloadSize {
+               return nil, fmt.Errorf("uncompressed compressed payload length 
exceeds max size of frame payload %d/%d", uncompressedLen, maxPayloadSize)
+       }
+
+       compressedPayload, err := compressor.Encode(uncompressedPayload)
+       if err != nil {
+               return nil, err
+       }
+
+       // Skip the first 4 bytes because the size of the uncompressed payload 
is written in the frame header, not in the
+       // body of the compressed envelope
+       compressedPayload = compressedPayload[4:]
+
+       compressedLen := len(compressedPayload)
+
+       // Compression is not worth it
+       if uncompressedLen < compressedLen {
+               // native_protocol_v5.spec
+               // 2.2
+               //  An uncompressed length of 0 signals that the compressed 
payload
+               //  should be used as-is and not decompressed.
+               compressedPayload = uncompressedPayload
+               compressedLen = uncompressedLen
+               uncompressedLen = 0
+       }
+
+       // Combine compressed and uncompressed lengths and set the 
self-contained flag if needed
+       combined := uint64(compressedLen) | uint64(uncompressedLen)<<17
+       if isSelfContained {
+               combined |= 1 << 34
+       }
+
+       var headerBuf [8]byte
+
+       // Write the combined value into the header buffer
+       binary.LittleEndian.PutUint64(headerBuf[:], combined)
+
+       // Create a buffer with enough capacity to hold the header, compressed 
payload, and checksums
+       buf := bytes.NewBuffer(make([]byte, 0, 8+compressedLen+4))
+
+       // Write the first 5 bytes of the header (compressed and uncompressed 
sizes)
+       buf.Write(headerBuf[:5])

Review Comment:
   NIT: `buf.Write(headerBuf[:headerSize])`



##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
                f.writeBytes(v)
        }
 }
+
+func (f *framer) prepareModernLayout() error {
+       // Ensure protocol version is V5 or higher
+       if f.proto < protoVersion5 {
+               panic("Modern layout is not supported with version V4 or less")
+       }
+
+       selfContained := true
+
+       var (
+               adjustedBuf []byte
+               tempBuf     []byte
+               err         error
+       )
+
+       // Process the buffer in chunks if it exceeds the max payload size
+       for len(f.buf) > maxPayloadSize {
+               if f.compres != nil {
+                       tempBuf, err = 
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+               } else {
+                       tempBuf, err = 
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+               }
+               if err != nil {
+                       return err
+               }
+
+               adjustedBuf = append(adjustedBuf, tempBuf...)
+               f.buf = f.buf[maxPayloadSize:]
+               selfContained = false
+       }
+
+       // Process the remaining buffer
+       if f.compres != nil {
+               tempBuf, err = newCompressedFrame(f.buf, selfContained, 
f.compres)
+       } else {
+               tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+       }
+       if err != nil {
+               return err
+       }
+
+       adjustedBuf = append(adjustedBuf, tempBuf...)
+       f.buf = adjustedBuf
+
+       return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+       const headerSize = 6
+       header := [headerSize + 1]byte{}
+
+       // Read the frame header
+       if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read 
uncompressed frame, err: %w", err)
+       }
+
+       // Compute and verify the header CRC24
+       computedHeaderCRC24 := KoopmanChecksum(header[:3])
+       readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
+       if computedHeaderCRC24 != readHeaderCRC24 {
+               return nil, false, fmt.Errorf("gocql: crc24 mismatch in frame 
header, computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24)
+       }
+
+       // Extract the payload length and self-contained flag
+       headerInt := binary.LittleEndian.Uint32(header[:4])
+       payloadLen := int(headerInt & 0x1FFFF)
+       isSelfContained := (headerInt & (1 << 17)) != 0
+
+       // Read the payload
+       payload := make([]byte, payloadLen)
+       if _, err := io.ReadFull(r, payload); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read 
uncompressed frame payload, err: %w", err)
+       }
+
+       // Read and verify the payload CRC32
+       if _, err := io.ReadFull(r, header[:4]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read payload 
crc32, err: %w", err)
+       }
+
+       computedPayloadCRC32 := ChecksumIEEE(payload)
+       readPayloadCRC32 := binary.LittleEndian.Uint32(header[:4])
+       if computedPayloadCRC32 != readPayloadCRC32 {
+               return nil, false, fmt.Errorf("gocql: payload crc32 mismatch, 
computed: %d, got: %d", computedPayloadCRC32, readPayloadCRC32)
+       }
+
+       return payload, isSelfContained, nil
+}
+
+func newUncompressedFrame(payload []byte, isSelfContained bool) ([]byte, 
error) {
+       const (
+               headerSize       = 6
+               selfContainedBit = 1 << 17
+       )
+
+       payloadLen := len(payload)
+       if payloadLen > maxPayloadSize {
+               return nil, fmt.Errorf("payload length (%d) exceeds maximum 
size of 128 KiB", payloadLen)
+       }
+
+       // Create the frame
+       frameSize := headerSize + payloadLen + 4 // 4 bytes for CRC32
+       frame := make([]byte, frameSize)

Review Comment:
   ```
   // Create the frame
   segmentSize := headerSize + payloadLen + crc32Size
   segment := make([]byte, segmentSize)
   ```
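
   For context, this is roughly how the whole function could read with the 
   segment naming and a `crc32Size` constant. It is only a sketch: 
   `newSegment`, the constant values, and passing the two checksum functions 
   as parameters are assumptions made to keep the example self-contained, and 
   `maxPayloadSize` is assumed to be the largest value that fits in the 17-bit 
   length field. The layout itself follows `newUncompressedFrame` in this 
   diff.

   ```go
   package main

   import (
   	"encoding/binary"
   	"fmt"
   )

   // Hypothetical constants; the PR may define these differently.
   const (
   	headerSize       = 6         // 3 bytes length+flag, 3 bytes CRC24
   	crc32Size        = 4         // payload checksum trailer
   	maxPayloadSize   = 1<<17 - 1 // assumed: 17-bit length field
   	selfContainedBit = 1 << 17
   )

   // newSegment wraps payload in an uncompressed segment. The checksum
   // functions are injected so the sketch does not assume their algorithms.
   func newSegment(payload []byte, selfContained bool, crc24, crc32 func([]byte) uint32) ([]byte, error) {
   	payloadLen := len(payload)
   	if payloadLen > maxPayloadSize {
   		return nil, fmt.Errorf("payload length (%d) exceeds maximum size %d", payloadLen, maxPayloadSize)
   	}

   	// Create the segment
   	segmentSize := headerSize + payloadLen + crc32Size
   	segment := make([]byte, segmentSize)

   	// First 3 bytes: payload length and self-contained flag, little-endian
   	headerInt := uint32(payloadLen)
   	if selfContained {
   		headerInt |= selfContainedBit
   	}
   	segment[0] = byte(headerInt)
   	segment[1] = byte(headerInt >> 8)
   	segment[2] = byte(headerInt >> 16)

   	// Next 3 bytes: CRC24 of the first 3 bytes
   	crc := crc24(segment[:3])
   	segment[3] = byte(crc)
   	segment[4] = byte(crc >> 8)
   	segment[5] = byte(crc >> 16)

   	// Payload followed by its CRC32
   	copy(segment[headerSize:], payload)
   	binary.LittleEndian.PutUint32(segment[headerSize+payloadLen:], crc32(payload))

   	return segment, nil
   }

   func main() {
   	// Dummy checksum functions keep the sketch runnable on its own.
   	zero := func([]byte) uint32 { return 0 }
   	seg, err := newSegment([]byte("abc"), true, zero, zero)
   	fmt.Println(len(seg), err)
   }
   ```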



##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
                f.writeBytes(v)
        }
 }
+
+func (f *framer) prepareModernLayout() error {
+       // Ensure protocol version is V5 or higher
+       if f.proto < protoVersion5 {
+               panic("Modern layout is not supported with version V4 or less")
+       }
+
+       selfContained := true
+
+       var (
+               adjustedBuf []byte
+               tempBuf     []byte
+               err         error
+       )
+
+       // Process the buffer in chunks if it exceeds the max payload size
+       for len(f.buf) > maxPayloadSize {
+               if f.compres != nil {
+                       tempBuf, err = 
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+               } else {
+                       tempBuf, err = 
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+               }
+               if err != nil {
+                       return err
+               }
+
+               adjustedBuf = append(adjustedBuf, tempBuf...)
+               f.buf = f.buf[maxPayloadSize:]
+               selfContained = false
+       }
+
+       // Process the remaining buffer
+       if f.compres != nil {
+               tempBuf, err = newCompressedFrame(f.buf, selfContained, 
f.compres)
+       } else {
+               tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+       }
+       if err != nil {
+               return err
+       }
+
+       adjustedBuf = append(adjustedBuf, tempBuf...)
+       f.buf = adjustedBuf
+
+       return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+       const headerSize = 6
+       header := [headerSize + 1]byte{}
+
+       // Read the frame header
+       if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read 
uncompressed frame, err: %w", err)
+       }
+
+       // Compute and verify the header CRC24
+       computedHeaderCRC24 := KoopmanChecksum(header[:3])
+       readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
+       if computedHeaderCRC24 != readHeaderCRC24 {
+               return nil, false, fmt.Errorf("gocql: crc24 mismatch in frame 
header, computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24)
+       }
+
+       // Extract the payload length and self-contained flag
+       headerInt := binary.LittleEndian.Uint32(header[:4])
+       payloadLen := int(headerInt & 0x1FFFF)
+       isSelfContained := (headerInt & (1 << 17)) != 0
+
+       // Read the payload
+       payload := make([]byte, payloadLen)
+       if _, err := io.ReadFull(r, payload); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read 
uncompressed frame payload, err: %w", err)
+       }
+
+       // Read and verify the payload CRC32
+       if _, err := io.ReadFull(r, header[:4]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read payload 
crc32, err: %w", err)
+       }
+
+       computedPayloadCRC32 := ChecksumIEEE(payload)
+       readPayloadCRC32 := binary.LittleEndian.Uint32(header[:4])
+       if computedPayloadCRC32 != readPayloadCRC32 {
+               return nil, false, fmt.Errorf("gocql: payload crc32 mismatch, 
computed: %d, got: %d", computedPayloadCRC32, readPayloadCRC32)
+       }
+
+       return payload, isSelfContained, nil
+}
+
+func newUncompressedFrame(payload []byte, isSelfContained bool) ([]byte, 
error) {
+       const (
+               headerSize       = 6
+               selfContainedBit = 1 << 17
+       )
+
+       payloadLen := len(payload)
+       if payloadLen > maxPayloadSize {
+               return nil, fmt.Errorf("payload length (%d) exceeds maximum 
size of 128 KiB", payloadLen)
+       }
+
+       // Create the frame
+       frameSize := headerSize + payloadLen + 4 // 4 bytes for CRC32
+       frame := make([]byte, frameSize)
+
+       // First 3 bytes: payload length and self-contained flag
+       headerInt := uint32(payloadLen) & 0x1FFFF
+       if isSelfContained {
+               headerInt |= selfContainedBit // Set the self-contained flag
+       }
+
+       // Encode the first 3 bytes as a single little-endian integer
+       frame[0] = byte(headerInt)
+       frame[1] = byte(headerInt >> 8)
+       frame[2] = byte(headerInt >> 16)
+
+       // Calculate CRC24 for the first 3 bytes of the header
+       crc := KoopmanChecksum(frame[:3])
+
+       // Encode CRC24 into the next 3 bytes of the header
+       frame[3] = byte(crc)
+       frame[4] = byte(crc >> 8)
+       frame[5] = byte(crc >> 16)
+
+       copy(frame[headerSize:], payload) // Copy the payload to the frame
+
+       // Calculate CRC32 for the payload
+       payloadCRC32 := ChecksumIEEE(payload)
+       binary.LittleEndian.PutUint32(frame[headerSize+payloadLen:], 
payloadCRC32)
+
+       return frame, nil
+}
+
+func newCompressedFrame(uncompressedPayload []byte, isSelfContained bool, 
compressor Compressor) ([]byte, error) {
+       uncompressedLen := len(uncompressedPayload)
+       if uncompressedLen > maxPayloadSize {
+               return nil, fmt.Errorf("uncompressed compressed payload length 
exceeds max size of frame payload %d/%d", uncompressedLen, maxPayloadSize)
+       }
+
+       compressedPayload, err := compressor.Encode(uncompressedPayload)
+       if err != nil {
+               return nil, err
+       }
+
+       // Skip the first 4 bytes because the size of the uncompressed payload 
is written in the frame header, not in the
+       // body of the compressed envelope

Review Comment:
   ```
   // Skip the first 4 bytes because the size of the uncompressed payload is
   // written in the segment header, not in the body of the compressed frame
   ```



##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
                f.writeBytes(v)
        }
 }
+
+func (f *framer) prepareModernLayout() error {
+       // Ensure protocol version is V5 or higher
+       if f.proto < protoVersion5 {
+               panic("Modern layout is not supported with version V4 or less")
+       }
+
+       selfContained := true
+
+       var (
+               adjustedBuf []byte
+               tempBuf     []byte
+               err         error
+       )
+
+       // Process the buffer in chunks if it exceeds the max payload size
+       for len(f.buf) > maxPayloadSize {
+               if f.compres != nil {
+                       tempBuf, err = 
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+               } else {
+                       tempBuf, err = 
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+               }
+               if err != nil {
+                       return err
+               }
+
+               adjustedBuf = append(adjustedBuf, tempBuf...)
+               f.buf = f.buf[maxPayloadSize:]
+               selfContained = false
+       }
+
+       // Process the remaining buffer
+       if f.compres != nil {
+               tempBuf, err = newCompressedFrame(f.buf, selfContained, 
f.compres)
+       } else {
+               tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+       }
+       if err != nil {
+               return err
+       }
+
+       adjustedBuf = append(adjustedBuf, tempBuf...)
+       f.buf = adjustedBuf
+
+       return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+       const headerSize = 6
+       header := [headerSize + 1]byte{}
+
+       // Read the frame header
+       if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read 
uncompressed frame, err: %w", err)
+       }
+
+       // Compute and verify the header CRC24
+       computedHeaderCRC24 := KoopmanChecksum(header[:3])
+       readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
+       if computedHeaderCRC24 != readHeaderCRC24 {
+               return nil, false, fmt.Errorf("gocql: crc24 mismatch in frame 
header, computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24)
+       }
+
+       // Extract the payload length and self-contained flag
+       headerInt := binary.LittleEndian.Uint32(header[:4])
+       payloadLen := int(headerInt & 0x1FFFF)
+       isSelfContained := (headerInt & (1 << 17)) != 0
+
+       // Read the payload
+       payload := make([]byte, payloadLen)
+       if _, err := io.ReadFull(r, payload); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read 
uncompressed frame payload, err: %w", err)
+       }
+
+       // Read and verify the payload CRC32
+       if _, err := io.ReadFull(r, header[:4]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read payload 
crc32, err: %w", err)
+       }
+
+       computedPayloadCRC32 := ChecksumIEEE(payload)
+       readPayloadCRC32 := binary.LittleEndian.Uint32(header[:4])
+       if computedPayloadCRC32 != readPayloadCRC32 {
+               return nil, false, fmt.Errorf("gocql: payload crc32 mismatch, 
computed: %d, got: %d", computedPayloadCRC32, readPayloadCRC32)
+       }
+
+       return payload, isSelfContained, nil
+}
+
+func newUncompressedFrame(payload []byte, isSelfContained bool) ([]byte, 
error) {
+       const (
+               headerSize       = 6
+               selfContainedBit = 1 << 17
+       )
+
+       payloadLen := len(payload)
+       if payloadLen > maxPayloadSize {
+               return nil, fmt.Errorf("payload length (%d) exceeds maximum 
size of 128 KiB", payloadLen)
+       }
+
+       // Create the frame
+       frameSize := headerSize + payloadLen + 4 // 4 bytes for CRC32
+       frame := make([]byte, frameSize)
+
+       // First 3 bytes: payload length and self-contained flag
+       headerInt := uint32(payloadLen) & 0x1FFFF
+       if isSelfContained {
+               headerInt |= selfContainedBit // Set the self-contained flag
+       }
+
+       // Encode the first 3 bytes as a single little-endian integer
+       frame[0] = byte(headerInt)
+       frame[1] = byte(headerInt >> 8)
+       frame[2] = byte(headerInt >> 16)
+
+       // Calculate CRC24 for the first 3 bytes of the header
+       crc := KoopmanChecksum(frame[:3])
+
+       // Encode CRC24 into the next 3 bytes of the header
+       frame[3] = byte(crc)
+       frame[4] = byte(crc >> 8)
+       frame[5] = byte(crc >> 16)
+
+       copy(frame[headerSize:], payload) // Copy the payload to the frame
+
+       // Calculate CRC32 for the payload
+       payloadCRC32 := ChecksumIEEE(payload)
+       binary.LittleEndian.PutUint32(frame[headerSize+payloadLen:], 
payloadCRC32)
+
+       return frame, nil
+}
+
+func newCompressedFrame(uncompressedPayload []byte, isSelfContained bool, 
compressor Compressor) ([]byte, error) {
+       uncompressedLen := len(uncompressedPayload)
+       if uncompressedLen > maxPayloadSize {
+               return nil, fmt.Errorf("uncompressed compressed payload length 
exceeds max size of frame payload %d/%d", uncompressedLen, maxPayloadSize)
+       }
+
+       compressedPayload, err := compressor.Encode(uncompressedPayload)
+       if err != nil {
+               return nil, err
+       }
+
+       // Skip the first 4 bytes because the size of the uncompressed payload 
is written in the frame header, not in the
+       // body of the compressed envelope
+       compressedPayload = compressedPayload[4:]
+
+       compressedLen := len(compressedPayload)
+
+       // Compression is not worth it
+       if uncompressedLen < compressedLen {
+               // native_protocol_v5.spec
+               // 2.2
+               //  An uncompressed length of 0 signals that the compressed 
payload
+               //  should be used as-is and not decompressed.
+               compressedPayload = uncompressedPayload
+               compressedLen = uncompressedLen
+               uncompressedLen = 0
+       }
+
+       // Combine compressed and uncompressed lengths and set the 
self-contained flag if needed
+       combined := uint64(compressedLen) | uint64(uncompressedLen)<<17
+       if isSelfContained {
+               combined |= 1 << 34
+       }
+
+       var headerBuf [8]byte
+
+       // Write the combined value into the header buffer
+       binary.LittleEndian.PutUint64(headerBuf[:], combined)
+
+       // Create a buffer with enough capacity to hold the header, compressed 
payload, and checksums
+       buf := bytes.NewBuffer(make([]byte, 0, 8+compressedLen+4))
+
+       // Write the first 5 bytes of the header (compressed and uncompressed 
sizes)
+       buf.Write(headerBuf[:5])
+
+       // Compute and write the CRC24 checksum of the first 5 bytes
+       headerChecksum := KoopmanChecksum(headerBuf[:5])

Review Comment:
   NIT: `headerChecksum := KoopmanChecksum(headerBuf[:headerSize])`



##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
                f.writeBytes(v)
        }
 }
+
+func (f *framer) prepareModernLayout() error {
+       // Ensure protocol version is V5 or higher
+       if f.proto < protoVersion5 {
+               panic("Modern layout is not supported with version V4 or less")
+       }
+
+       selfContained := true
+
+       var (
+               adjustedBuf []byte
+               tempBuf     []byte
+               err         error
+       )
+
+       // Process the buffer in chunks if it exceeds the max payload size
+       for len(f.buf) > maxPayloadSize {
+               if f.compres != nil {
+                       tempBuf, err = 
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+               } else {
+                       tempBuf, err = 
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+               }
+               if err != nil {
+                       return err
+               }
+
+               adjustedBuf = append(adjustedBuf, tempBuf...)
+               f.buf = f.buf[maxPayloadSize:]
+               selfContained = false
+       }
+
+       // Process the remaining buffer
+       if f.compres != nil {
+               tempBuf, err = newCompressedFrame(f.buf, selfContained, 
f.compres)
+       } else {
+               tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+       }
+       if err != nil {
+               return err
+       }
+
+       adjustedBuf = append(adjustedBuf, tempBuf...)
+       f.buf = adjustedBuf
+
+       return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+       const headerSize = 6
+       header := [headerSize + 1]byte{}
+
+       // Read the frame header
+       if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read 
uncompressed frame, err: %w", err)
+       }
+
+       // Compute and verify the header CRC24
+       computedHeaderCRC24 := KoopmanChecksum(header[:3])
+       readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
+       if computedHeaderCRC24 != readHeaderCRC24 {
+               return nil, false, fmt.Errorf("gocql: crc24 mismatch in frame 
header, computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24)
+       }
+
+       // Extract the payload length and self-contained flag
+       headerInt := binary.LittleEndian.Uint32(header[:4])
+       payloadLen := int(headerInt & 0x1FFFF)
+       isSelfContained := (headerInt & (1 << 17)) != 0
+
+       // Read the payload
+       payload := make([]byte, payloadLen)
+       if _, err := io.ReadFull(r, payload); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read 
uncompressed frame payload, err: %w", err)
+       }
+
+       // Read and verify the payload CRC32
+       if _, err := io.ReadFull(r, header[:4]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read payload 
crc32, err: %w", err)
+       }
+
+       computedPayloadCRC32 := ChecksumIEEE(payload)
+       readPayloadCRC32 := binary.LittleEndian.Uint32(header[:4])
+       if computedPayloadCRC32 != readPayloadCRC32 {
+               return nil, false, fmt.Errorf("gocql: payload crc32 mismatch, 
computed: %d, got: %d", computedPayloadCRC32, readPayloadCRC32)
+       }
+
+       return payload, isSelfContained, nil
+}
+
+func newUncompressedFrame(payload []byte, isSelfContained bool) ([]byte, 
error) {
+       const (
+               headerSize       = 6
+               selfContainedBit = 1 << 17
+       )
+
+       payloadLen := len(payload)
+       if payloadLen > maxPayloadSize {
+               return nil, fmt.Errorf("payload length (%d) exceeds maximum size of 128 KiB", payloadLen)
+       }
+
+       // Create the frame
+       frameSize := headerSize + payloadLen + 4 // 4 bytes for CRC32
+       frame := make([]byte, frameSize)
+
+       // First 3 bytes: payload length and self-contained flag
+       headerInt := uint32(payloadLen) & 0x1FFFF
+       if isSelfContained {
+               headerInt |= selfContainedBit // Set the self-contained flag
+       }
+
+       // Encode the first 3 bytes as a single little-endian integer
+       frame[0] = byte(headerInt)
+       frame[1] = byte(headerInt >> 8)
+       frame[2] = byte(headerInt >> 16)
+
+       // Calculate CRC24 for the first 3 bytes of the header
+       crc := KoopmanChecksum(frame[:3])
+
+       // Encode CRC24 into the next 3 bytes of the header
+       frame[3] = byte(crc)
+       frame[4] = byte(crc >> 8)
+       frame[5] = byte(crc >> 16)
+
+       copy(frame[headerSize:], payload) // Copy the payload to the frame
+
+       // Calculate CRC32 for the payload
+       payloadCRC32 := ChecksumIEEE(payload)
+       binary.LittleEndian.PutUint32(frame[headerSize+payloadLen:], payloadCRC32)
+
+       return frame, nil
+}
+
+func newCompressedFrame(uncompressedPayload []byte, isSelfContained bool, compressor Compressor) ([]byte, error) {
+       uncompressedLen := len(uncompressedPayload)
+       if uncompressedLen > maxPayloadSize {
+               return nil, fmt.Errorf("uncompressed compressed payload length exceeds max size of frame payload %d/%d", uncompressedLen, maxPayloadSize)
+       }
+
+       compressedPayload, err := compressor.Encode(uncompressedPayload)
+       if err != nil {
+               return nil, err
+       }
+
+       // Skip the first 4 bytes because the size of the uncompressed payload is written in the frame header, not in the
+       // body of the compressed envelope
+       compressedPayload = compressedPayload[4:]
+
+       compressedLen := len(compressedPayload)
+
+       // Compression is not worth it
+       if uncompressedLen < compressedLen {
+               // native_protocol_v5.spec
+               // 2.2
+               //  An uncompressed length of 0 signals that the compressed payload
+               //  should be used as-is and not decompressed.
+               compressedPayload = uncompressedPayload
+               compressedLen = uncompressedLen
+               uncompressedLen = 0
+       }
+
+       // Combine compressed and uncompressed lengths and set the self-contained flag if needed
+       combined := uint64(compressedLen) | uint64(uncompressedLen)<<17
+       if isSelfContained {
+               combined |= 1 << 34
+       }
+
+       var headerBuf [8]byte

Review Comment:
   NIT: `var headerBuf [headerSize + crc24Size]byte`



##########
conn.go:
##########
@@ -1756,6 +1888,41 @@ func (c *Conn) awaitSchemaAgreement(ctx context.Context) (err error) {
        return fmt.Errorf("gocql: cluster schema versions not consistent: %+v", schemas)
 }
 
+// recvLastsFrames reads proto v5 frames from Conn.r and writes decoded payload to dst.
+// It reads data until the bytesToRead is reached.
+// If Conn.compressor is not nil, it processes Compressed Format frames.
+func (c *Conn) recvLastsFrames(dst *bytes.Buffer, bytesToRead int) error {

Review Comment:
   NIT: can we also move this function so that it sits right after 
`recvProtoV5Frame`? That would help with code navigation.



##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
                f.writeBytes(v)
        }
 }
+
+func (f *framer) prepareModernLayout() error {
+       // Ensure protocol version is V5 or higher
+       if f.proto < protoVersion5 {
+               panic("Modern layout is not supported with version V4 or less")
+       }
+
+       selfContained := true
+
+       var (
+               adjustedBuf []byte
+               tempBuf     []byte
+               err         error
+       )
+
+       // Process the buffer in chunks if it exceeds the max payload size
+       for len(f.buf) > maxPayloadSize {
+               if f.compres != nil {
+                       tempBuf, err = newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+               } else {
+                       tempBuf, err = newUncompressedFrame(f.buf[:maxPayloadSize], false)
+               }
+               if err != nil {
+                       return err
+               }
+
+               adjustedBuf = append(adjustedBuf, tempBuf...)
+               f.buf = f.buf[maxPayloadSize:]
+               selfContained = false
+       }
+
+       // Process the remaining buffer
+       if f.compres != nil {
+               tempBuf, err = newCompressedFrame(f.buf, selfContained, f.compres)
+       } else {
+               tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+       }
+       if err != nil {
+               return err
+       }
+
+       adjustedBuf = append(adjustedBuf, tempBuf...)
+       f.buf = adjustedBuf
+
+       return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+       const headerSize = 6
+       header := [headerSize + 1]byte{}
+
+       // Read the frame header
+       if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read uncompressed frame, err: %w", err)
+       }
+
+       // Compute and verify the header CRC24
+       computedHeaderCRC24 := KoopmanChecksum(header[:3])
+       readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
+       if computedHeaderCRC24 != readHeaderCRC24 {
+               return nil, false, fmt.Errorf("gocql: crc24 mismatch in frame header, computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24)
+       }
+
+       // Extract the payload length and self-contained flag
+       headerInt := binary.LittleEndian.Uint32(header[:4])
+       payloadLen := int(headerInt & 0x1FFFF)
+       isSelfContained := (headerInt & (1 << 17)) != 0
+
+       // Read the payload
+       payload := make([]byte, payloadLen)
+       if _, err := io.ReadFull(r, payload); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read uncompressed frame payload, err: %w", err)
+       }
+
+       // Read and verify the payload CRC32
+       if _, err := io.ReadFull(r, header[:4]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read payload crc32, err: %w", err)
+       }
+
+       computedPayloadCRC32 := ChecksumIEEE(payload)
+       readPayloadCRC32 := binary.LittleEndian.Uint32(header[:4])
+       if computedPayloadCRC32 != readPayloadCRC32 {
+               return nil, false, fmt.Errorf("gocql: payload crc32 mismatch, computed: %d, got: %d", computedPayloadCRC32, readPayloadCRC32)
+       }
+
+       return payload, isSelfContained, nil
+}
+
+func newUncompressedFrame(payload []byte, isSelfContained bool) ([]byte, error) {
+       const (
+               headerSize       = 6
+               selfContainedBit = 1 << 17
+       )
+
+       payloadLen := len(payload)
+       if payloadLen > maxPayloadSize {
+               return nil, fmt.Errorf("payload length (%d) exceeds maximum size of 128 KiB", payloadLen)
+       }
+
+       // Create the frame
+       frameSize := headerSize + payloadLen + 4 // 4 bytes for CRC32
+       frame := make([]byte, frameSize)
+
+       // First 3 bytes: payload length and self-contained flag
+       headerInt := uint32(payloadLen) & 0x1FFFF
+       if isSelfContained {
+               headerInt |= selfContainedBit // Set the self-contained flag
+       }
+
+       // Encode the first 3 bytes as a single little-endian integer
+       frame[0] = byte(headerInt)
+       frame[1] = byte(headerInt >> 8)
+       frame[2] = byte(headerInt >> 16)
+
+       // Calculate CRC24 for the first 3 bytes of the header
+       crc := KoopmanChecksum(frame[:3])
+
+       // Encode CRC24 into the next 3 bytes of the header
+       frame[3] = byte(crc)
+       frame[4] = byte(crc >> 8)
+       frame[5] = byte(crc >> 16)
+
+       copy(frame[headerSize:], payload) // Copy the payload to the frame
+
+       // Calculate CRC32 for the payload
+       payloadCRC32 := ChecksumIEEE(payload)
+       binary.LittleEndian.PutUint32(frame[headerSize+payloadLen:], payloadCRC32)
+
+       return frame, nil
+}
+
+func newCompressedFrame(uncompressedPayload []byte, isSelfContained bool, compressor Compressor) ([]byte, error) {
+       uncompressedLen := len(uncompressedPayload)
+       if uncompressedLen > maxPayloadSize {
+               return nil, fmt.Errorf("uncompressed compressed payload length exceeds max size of frame payload %d/%d", uncompressedLen, maxPayloadSize)
+       }
+
+       compressedPayload, err := compressor.Encode(uncompressedPayload)
+       if err != nil {
+               return nil, err
+       }
+
+       // Skip the first 4 bytes because the size of the uncompressed payload is written in the frame header, not in the
+       // body of the compressed envelope
+       compressedPayload = compressedPayload[4:]
+
+       compressedLen := len(compressedPayload)
+
+       // Compression is not worth it
+       if uncompressedLen < compressedLen {
+               // native_protocol_v5.spec
+               // 2.2
+               //  An uncompressed length of 0 signals that the compressed payload
+               //  should be used as-is and not decompressed.
+               compressedPayload = uncompressedPayload
+               compressedLen = uncompressedLen
+               uncompressedLen = 0
+       }
+
+       // Combine compressed and uncompressed lengths and set the self-contained flag if needed
+       combined := uint64(compressedLen) | uint64(uncompressedLen)<<17
+       if isSelfContained {
+               combined |= 1 << 34
+       }
+
+       var headerBuf [8]byte
+
+       // Write the combined value into the header buffer
+       binary.LittleEndian.PutUint64(headerBuf[:], combined)
+
+       // Create a buffer with enough capacity to hold the header, compressed payload, and checksums
+       buf := bytes.NewBuffer(make([]byte, 0, 8+compressedLen+4))

Review Comment:
   NIT: `buf := bytes.NewBuffer(make([]byte, 0, headerSize + crc24Size + compressedLen + crc32Size))`



##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
                f.writeBytes(v)
        }
 }
+
+func (f *framer) prepareModernLayout() error {
+       // Ensure protocol version is V5 or higher
+       if f.proto < protoVersion5 {
+               panic("Modern layout is not supported with version V4 or less")
+       }
+
+       selfContained := true
+
+       var (
+               adjustedBuf []byte
+               tempBuf     []byte
+               err         error
+       )
+
+       // Process the buffer in chunks if it exceeds the max payload size
+       for len(f.buf) > maxPayloadSize {
+               if f.compres != nil {
+                       tempBuf, err = newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+               } else {
+                       tempBuf, err = newUncompressedFrame(f.buf[:maxPayloadSize], false)
+               }
+               if err != nil {
+                       return err
+               }
+
+               adjustedBuf = append(adjustedBuf, tempBuf...)

Review Comment:
   I'm concerned that performance may be worse than with v4, but I don't 
think the gap will be large enough to justify blocking this PR. It would still 
be nice to get some benchmark data comparing the v4 and v5 implementations on 
the same workloads.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
