joao-r-reis commented on code in PR #1822:
URL:
https://github.com/apache/cassandra-gocql-driver/pull/1822#discussion_r1806337865
##########
conn.go:
##########
@@ -1756,6 +1888,41 @@ func (c *Conn) awaitSchemaAgreement(ctx context.Context)
(err error) {
return fmt.Errorf("gocql: cluster schema versions not consistent: %+v",
schemas)
}
+// recvLastsFrames reads proto v5 frames from Conn.r and writes decoded
payload to dst.
+// It reads data until the bytesToRead is reached.
+// If Conn.compressor is not nil, it processes Compressed Format frames.
+func (c *Conn) recvLastsFrames(dst *bytes.Buffer, bytesToRead int) error {
+ var read int
+ var segment []byte
+ var err error
+ for read != bytesToRead {
+ // Read frame based on compression
+ if c.compressor != nil {
+ segment, _, err = readCompressedFrame(c.r, c.compressor)
+ } else {
+ segment, _, err = readUncompressedFrame(c.r)
+ }
+ if err != nil {
+ return fmt.Errorf("gocql: failed to read non
self-contained frame: %w", err)
+ }
+
+ // Write the segment to the destination writer
+ n, _ := dst.Write(segment)
+ read += n
Review Comment:
We're assuming here that this segment contains a single partial envelope and
nothing else, since we copy the entire payload into the buffer without any
checks. I think this assumption is correct, but it still wouldn't hurt to check
that we're not adding more bytes to the buffer than we expect: if `read` ever
overshoots `bytesToRead`, the `read != bytesToRead` loop condition never
becomes false. If this check fails we can return a custom protocol error so
`go.serve` logs it and closes the connection.
##########
conn.go:
##########
@@ -1756,6 +1888,41 @@ func (c *Conn) awaitSchemaAgreement(ctx context.Context)
(err error) {
return fmt.Errorf("gocql: cluster schema versions not consistent: %+v",
schemas)
}
+// recvLastsFrames reads proto v5 frames from Conn.r and writes decoded
payload to dst.
+// It reads data until the bytesToRead is reached.
+// If Conn.compressor is not nil, it processes Compressed Format frames.
+func (c *Conn) recvLastsFrames(dst *bytes.Buffer, bytesToRead int) error {
+ var read int
+ var segment []byte
+ var err error
+ for read != bytesToRead {
+ // Read frame based on compression
+ if c.compressor != nil {
+ segment, _, err = readCompressedFrame(c.r, c.compressor)
Review Comment:
NIT: rename `segment` to `payload`, `segmentPayload`, or even `frame` to
clarify that this byte slice contains only the payload (frame in old notation /
envelope in new notation) and not necessarily the entire segment.
##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
f.writeBytes(v)
}
}
+
+func (f *framer) prepareModernLayout() error {
+ // Ensure protocol version is V5 or higher
+ if f.proto < protoVersion5 {
+ panic("Modern layout is not supported with version V4 or less")
+ }
+
+ selfContained := true
+
+ var (
+ adjustedBuf []byte
+ tempBuf []byte
+ err error
+ )
+
+ // Process the buffer in chunks if it exceeds the max payload size
+ for len(f.buf) > maxPayloadSize {
+ if f.compres != nil {
+ tempBuf, err =
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+ } else {
+ tempBuf, err =
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = f.buf[maxPayloadSize:]
+ selfContained = false
+ }
+
+ // Process the remaining buffer
+ if f.compres != nil {
+ tempBuf, err = newCompressedFrame(f.buf, selfContained,
f.compres)
+ } else {
+ tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = adjustedBuf
+
+ return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+ const headerSize = 6
+ header := [headerSize + 1]byte{}
+
+ // Read the frame header
+ if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read
uncompressed frame, err: %w", err)
+ }
+
+ // Compute and verify the header CRC24
+ computedHeaderCRC24 := KoopmanChecksum(header[:3])
+ readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
+ if computedHeaderCRC24 != readHeaderCRC24 {
+ return nil, false, fmt.Errorf("gocql: crc24 mismatch in frame
header, computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24)
+ }
+
+ // Extract the payload length and self-contained flag
+ headerInt := binary.LittleEndian.Uint32(header[:4])
+ payloadLen := int(headerInt & 0x1FFFF)
+ isSelfContained := (headerInt & (1 << 17)) != 0
+
+ // Read the payload
+ payload := make([]byte, payloadLen)
+ if _, err := io.ReadFull(r, payload); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read
uncompressed frame payload, err: %w", err)
+ }
+
+ // Read and verify the payload CRC32
+ if _, err := io.ReadFull(r, header[:4]); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read payload
crc32, err: %w", err)
+ }
+
+ computedPayloadCRC32 := ChecksumIEEE(payload)
+ readPayloadCRC32 := binary.LittleEndian.Uint32(header[:4])
+ if computedPayloadCRC32 != readPayloadCRC32 {
+ return nil, false, fmt.Errorf("gocql: payload crc32 mismatch,
computed: %d, got: %d", computedPayloadCRC32, readPayloadCRC32)
+ }
+
+ return payload, isSelfContained, nil
+}
+
+func newUncompressedFrame(payload []byte, isSelfContained bool) ([]byte,
error) {
Review Comment:
rename to `newUncompressedSegment`
##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
f.writeBytes(v)
}
}
+
+func (f *framer) prepareModernLayout() error {
+ // Ensure protocol version is V5 or higher
+ if f.proto < protoVersion5 {
+ panic("Modern layout is not supported with version V4 or less")
+ }
+
+ selfContained := true
+
+ var (
+ adjustedBuf []byte
+ tempBuf []byte
+ err error
+ )
+
+ // Process the buffer in chunks if it exceeds the max payload size
+ for len(f.buf) > maxPayloadSize {
+ if f.compres != nil {
+ tempBuf, err =
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+ } else {
+ tempBuf, err =
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = f.buf[maxPayloadSize:]
+ selfContained = false
+ }
+
+ // Process the remaining buffer
+ if f.compres != nil {
+ tempBuf, err = newCompressedFrame(f.buf, selfContained,
f.compres)
+ } else {
+ tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = adjustedBuf
+
+ return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+ const headerSize = 6
+ header := [headerSize + 1]byte{}
+
+ // Read the frame header
+ if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read
uncompressed frame, err: %w", err)
+ }
+
+ // Compute and verify the header CRC24
+ computedHeaderCRC24 := KoopmanChecksum(header[:3])
+ readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
+ if computedHeaderCRC24 != readHeaderCRC24 {
+ return nil, false, fmt.Errorf("gocql: crc24 mismatch in frame
header, computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24)
+ }
+
+ // Extract the payload length and self-contained flag
+ headerInt := binary.LittleEndian.Uint32(header[:4])
+ payloadLen := int(headerInt & 0x1FFFF)
+ isSelfContained := (headerInt & (1 << 17)) != 0
+
+ // Read the payload
+ payload := make([]byte, payloadLen)
+ if _, err := io.ReadFull(r, payload); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read
uncompressed frame payload, err: %w", err)
+ }
+
+ // Read and verify the payload CRC32
+ if _, err := io.ReadFull(r, header[:4]); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read payload
crc32, err: %w", err)
+ }
+
+ computedPayloadCRC32 := ChecksumIEEE(payload)
+ readPayloadCRC32 := binary.LittleEndian.Uint32(header[:4])
+ if computedPayloadCRC32 != readPayloadCRC32 {
+ return nil, false, fmt.Errorf("gocql: payload crc32 mismatch,
computed: %d, got: %d", computedPayloadCRC32, readPayloadCRC32)
+ }
+
+ return payload, isSelfContained, nil
+}
+
+func newUncompressedFrame(payload []byte, isSelfContained bool) ([]byte,
error) {
+ const (
+ headerSize = 6
+ selfContainedBit = 1 << 17
+ )
+
+ payloadLen := len(payload)
+ if payloadLen > maxPayloadSize {
+ return nil, fmt.Errorf("payload length (%d) exceeds maximum
size of 128 KiB", payloadLen)
+ }
+
+ // Create the frame
+ frameSize := headerSize + payloadLen + 4 // 4 bytes for CRC32
+ frame := make([]byte, frameSize)
+
+ // First 3 bytes: payload length and self-contained flag
+ headerInt := uint32(payloadLen) & 0x1FFFF
+ if isSelfContained {
+ headerInt |= selfContainedBit // Set the self-contained flag
+ }
+
+ // Encode the first 3 bytes as a single little-endian integer
+ frame[0] = byte(headerInt)
+ frame[1] = byte(headerInt >> 8)
+ frame[2] = byte(headerInt >> 16)
+
+ // Calculate CRC24 for the first 3 bytes of the header
+ crc := KoopmanChecksum(frame[:3])
+
+ // Encode CRC24 into the next 3 bytes of the header
+ frame[3] = byte(crc)
+ frame[4] = byte(crc >> 8)
+ frame[5] = byte(crc >> 16)
+
+ copy(frame[headerSize:], payload) // Copy the payload to the frame
+
+ // Calculate CRC32 for the payload
+ payloadCRC32 := ChecksumIEEE(payload)
+ binary.LittleEndian.PutUint32(frame[headerSize+payloadLen:],
payloadCRC32)
+
+ return frame, nil
+}
+
+func newCompressedFrame(uncompressedPayload []byte, isSelfContained bool,
compressor Compressor) ([]byte, error) {
Review Comment:
rename to `newCompressedSegment`
##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
f.writeBytes(v)
}
}
+
+func (f *framer) prepareModernLayout() error {
+ // Ensure protocol version is V5 or higher
+ if f.proto < protoVersion5 {
+ panic("Modern layout is not supported with version V4 or less")
+ }
+
+ selfContained := true
+
+ var (
+ adjustedBuf []byte
+ tempBuf []byte
+ err error
+ )
+
+ // Process the buffer in chunks if it exceeds the max payload size
+ for len(f.buf) > maxPayloadSize {
+ if f.compres != nil {
+ tempBuf, err =
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+ } else {
+ tempBuf, err =
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = f.buf[maxPayloadSize:]
+ selfContained = false
+ }
+
+ // Process the remaining buffer
+ if f.compres != nil {
+ tempBuf, err = newCompressedFrame(f.buf, selfContained,
f.compres)
+ } else {
+ tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = adjustedBuf
+
+ return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+ const headerSize = 6
+ header := [headerSize + 1]byte{}
+
+ // Read the frame header
+ if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read
uncompressed frame, err: %w", err)
+ }
+
+ // Compute and verify the header CRC24
+ computedHeaderCRC24 := KoopmanChecksum(header[:3])
+ readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
+ if computedHeaderCRC24 != readHeaderCRC24 {
+ return nil, false, fmt.Errorf("gocql: crc24 mismatch in frame
header, computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24)
+ }
+
+ // Extract the payload length and self-contained flag
+ headerInt := binary.LittleEndian.Uint32(header[:4])
Review Comment:
The header is only 3 bytes, but we're passing a 4-byte slice, which is
confusing. Consider decoding the 3 bytes explicitly:
`headerInt := uint32(header[0]) | uint32(header[1])<<8 | uint32(header[2])<<16`
##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
f.writeBytes(v)
}
}
+
+func (f *framer) prepareModernLayout() error {
+ // Ensure protocol version is V5 or higher
+ if f.proto < protoVersion5 {
+ panic("Modern layout is not supported with version V4 or less")
+ }
+
+ selfContained := true
+
+ var (
+ adjustedBuf []byte
+ tempBuf []byte
+ err error
+ )
+
+ // Process the buffer in chunks if it exceeds the max payload size
+ for len(f.buf) > maxPayloadSize {
+ if f.compres != nil {
+ tempBuf, err =
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+ } else {
+ tempBuf, err =
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = f.buf[maxPayloadSize:]
+ selfContained = false
+ }
+
+ // Process the remaining buffer
+ if f.compres != nil {
+ tempBuf, err = newCompressedFrame(f.buf, selfContained,
f.compres)
+ } else {
+ tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = adjustedBuf
+
+ return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+ const headerSize = 6
+ header := [headerSize + 1]byte{}
+
+ // Read the frame header
+ if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read
uncompressed frame, err: %w", err)
+ }
+
+ // Compute and verify the header CRC24
+ computedHeaderCRC24 := KoopmanChecksum(header[:3])
Review Comment:
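NIT: `KoopmanChecksum(header[:headerSize])` (this assumes `headerSize` is
changed to `3`, i.e. the header bytes without the trailing CRC24; with the
current value of `6` the checksum would also cover the CRC bytes)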
##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
f.writeBytes(v)
}
}
+
+func (f *framer) prepareModernLayout() error {
+ // Ensure protocol version is V5 or higher
+ if f.proto < protoVersion5 {
+ panic("Modern layout is not supported with version V4 or less")
+ }
+
+ selfContained := true
+
+ var (
+ adjustedBuf []byte
+ tempBuf []byte
+ err error
+ )
+
+ // Process the buffer in chunks if it exceeds the max payload size
+ for len(f.buf) > maxPayloadSize {
+ if f.compres != nil {
+ tempBuf, err =
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+ } else {
+ tempBuf, err =
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = f.buf[maxPayloadSize:]
+ selfContained = false
+ }
+
+ // Process the remaining buffer
+ if f.compres != nil {
+ tempBuf, err = newCompressedFrame(f.buf, selfContained,
f.compres)
+ } else {
+ tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = adjustedBuf
+
+ return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+ const headerSize = 6
+ header := [headerSize + 1]byte{}
+
+ // Read the frame header
+ if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read
uncompressed frame, err: %w", err)
+ }
+
+ // Compute and verify the header CRC24
+ computedHeaderCRC24 := KoopmanChecksum(header[:3])
+ readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
+ if computedHeaderCRC24 != readHeaderCRC24 {
+ return nil, false, fmt.Errorf("gocql: crc24 mismatch in frame
header, computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24)
+ }
+
+ // Extract the payload length and self-contained flag
+ headerInt := binary.LittleEndian.Uint32(header[:4])
+ payloadLen := int(headerInt & 0x1FFFF)
+ isSelfContained := (headerInt & (1 << 17)) != 0
+
+ // Read the payload
+ payload := make([]byte, payloadLen)
+ if _, err := io.ReadFull(r, payload); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read
uncompressed frame payload, err: %w", err)
+ }
+
+ // Read and verify the payload CRC32
+ if _, err := io.ReadFull(r, header[:4]); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read payload
crc32, err: %w", err)
+ }
+
+ computedPayloadCRC32 := ChecksumIEEE(payload)
+ readPayloadCRC32 := binary.LittleEndian.Uint32(header[:4])
+ if computedPayloadCRC32 != readPayloadCRC32 {
+ return nil, false, fmt.Errorf("gocql: payload crc32 mismatch,
computed: %d, got: %d", computedPayloadCRC32, readPayloadCRC32)
+ }
+
+ return payload, isSelfContained, nil
+}
+
+func newUncompressedFrame(payload []byte, isSelfContained bool) ([]byte,
error) {
+ const (
+ headerSize = 6
+ selfContainedBit = 1 << 17
+ )
+
+ payloadLen := len(payload)
+ if payloadLen > maxPayloadSize {
+ return nil, fmt.Errorf("payload length (%d) exceeds maximum
size of 128 KiB", payloadLen)
+ }
+
+ // Create the frame
+ frameSize := headerSize + payloadLen + 4 // 4 bytes for CRC32
+ frame := make([]byte, frameSize)
+
+ // First 3 bytes: payload length and self-contained flag
+ headerInt := uint32(payloadLen) & 0x1FFFF
+ if isSelfContained {
+ headerInt |= selfContainedBit // Set the self-contained flag
+ }
+
+ // Encode the first 3 bytes as a single little-endian integer
+ frame[0] = byte(headerInt)
+ frame[1] = byte(headerInt >> 8)
+ frame[2] = byte(headerInt >> 16)
+
+ // Calculate CRC24 for the first 3 bytes of the header
+ crc := KoopmanChecksum(frame[:3])
+
+ // Encode CRC24 into the next 3 bytes of the header
+ frame[3] = byte(crc)
+ frame[4] = byte(crc >> 8)
+ frame[5] = byte(crc >> 16)
+
+ copy(frame[headerSize:], payload) // Copy the payload to the frame
+
+ // Calculate CRC32 for the payload
+ payloadCRC32 := ChecksumIEEE(payload)
+ binary.LittleEndian.PutUint32(frame[headerSize+payloadLen:],
payloadCRC32)
+
+ return frame, nil
+}
+
+func newCompressedFrame(uncompressedPayload []byte, isSelfContained bool,
compressor Compressor) ([]byte, error) {
+ uncompressedLen := len(uncompressedPayload)
+ if uncompressedLen > maxPayloadSize {
+ return nil, fmt.Errorf("uncompressed compressed payload length
exceedes max size of frame payload %d/%d", uncompressedLen, maxPayloadSize)
+ }
+
+ compressedPayload, err := compressor.Encode(uncompressedPayload)
+ if err != nil {
+ return nil, err
+ }
+
+ // Skip the first 4 bytes because the size of the uncompressed payload
is written in the frame header, not in the
+ // body of the compressed envelope
+ compressedPayload = compressedPayload[4:]
+
+ compressedLen := len(compressedPayload)
+
+ // Compression is not worth it
+ if uncompressedLen < compressedLen {
+ // native_protocol_v5.spec
+ // 2.2
+ // An uncompressed length of 0 signals that the compressed
payload
+ // should be used as-is and not decompressed.
+ compressedPayload = uncompressedPayload
+ compressedLen = uncompressedLen
+ uncompressedLen = 0
+ }
+
+ // Combine compressed and uncompressed lengths and set the
self-contained flag if needed
+ combined := uint64(compressedLen) | uint64(uncompressedLen)<<17
+ if isSelfContained {
+ combined |= 1 << 34
+ }
+
+ var headerBuf [8]byte
+
+ // Write the combined value into the header buffer
+ binary.LittleEndian.PutUint64(headerBuf[:], combined)
+
+ // Create a buffer with enough capacity to hold the header, compressed
payload, and checksums
+ buf := bytes.NewBuffer(make([]byte, 0, 8+compressedLen+4))
+
+ // Write the first 5 bytes of the header (compressed and uncompressed
sizes)
+ buf.Write(headerBuf[:5])
+
+ // Compute and write the CRC24 checksum of the first 5 bytes
+ headerChecksum := KoopmanChecksum(headerBuf[:5])
+ binary.LittleEndian.PutUint32(headerBuf[:], headerChecksum)
+ buf.Write(headerBuf[:3])
+ buf.Write(compressedPayload)
+
+ // Compute and write the CRC32 checksum of the payload
+ payloadChecksum := ChecksumIEEE(compressedPayload)
+ binary.LittleEndian.PutUint32(headerBuf[:], payloadChecksum)
+ buf.Write(headerBuf[:4])
+
+ return buf.Bytes(), nil
+}
+
+func readCompressedFrame(r io.Reader, compressor Compressor) ([]byte, bool,
error) {
+ var (
+ headerBuf [8]byte
+ err error
+ )
+
+ if _, err = io.ReadFull(r, headerBuf[:]); err != nil {
+ return nil, false, err
+ }
+
+ // Reading checksum from frame header
+ readHeaderChecksum := uint32(headerBuf[5]) | uint32(headerBuf[6])<<8 |
uint32(headerBuf[7])<<16
+ if computedHeaderChecksum := KoopmanChecksum(headerBuf[:5]);
computedHeaderChecksum != readHeaderChecksum {
+ return nil, false, fmt.Errorf("gocql: crc24 mismatch in frame
header, read: %d, computed: %d", readHeaderChecksum, computedHeaderChecksum)
+ }
+
+ // First 17 bits - payload size after compression
+ compressedLen := uint32(headerBuf[0]) | uint32(headerBuf[1])<<8 |
uint32(headerBuf[2]&0x1)<<16
+
+ // The next 17 bits - payload size before compression
+ uncompressedLen := (uint32(headerBuf[2]) >> 1) |
uint32(headerBuf[3])<<7 | uint32(headerBuf[4]&0b11)<<15
+
+ // Self-contained flag
+ selfContained := (headerBuf[4] & 0b100) != 0
+
+ compressedPayload := make([]byte, compressedLen)
+ if _, err = io.ReadFull(r, compressedPayload); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read compressed
frame payload, err: %w", err)
+ }
+
+ if _, err = io.ReadFull(r, headerBuf[:4]); err != nil {
Review Comment:
NIT: `io.ReadFull(r, headerBuf[:crc32Size])`, i.e. introduce a `crc32Size`
constant instead of the magic number `4`
##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
f.writeBytes(v)
}
}
+
+func (f *framer) prepareModernLayout() error {
+ // Ensure protocol version is V5 or higher
+ if f.proto < protoVersion5 {
+ panic("Modern layout is not supported with version V4 or less")
+ }
+
+ selfContained := true
+
+ var (
+ adjustedBuf []byte
+ tempBuf []byte
+ err error
+ )
+
+ // Process the buffer in chunks if it exceeds the max payload size
+ for len(f.buf) > maxPayloadSize {
+ if f.compres != nil {
+ tempBuf, err =
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+ } else {
+ tempBuf, err =
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = f.buf[maxPayloadSize:]
+ selfContained = false
+ }
+
+ // Process the remaining buffer
+ if f.compres != nil {
+ tempBuf, err = newCompressedFrame(f.buf, selfContained,
f.compres)
+ } else {
+ tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = adjustedBuf
+
+ return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+ const headerSize = 6
Review Comment:
NIT: I'd set `headerSize` to `3` (the header bytes themselves) and add a
separate constant, `crc24Size`, for the checksum that follows them.
##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
f.writeBytes(v)
}
}
+
+func (f *framer) prepareModernLayout() error {
+ // Ensure protocol version is V5 or higher
+ if f.proto < protoVersion5 {
+ panic("Modern layout is not supported with version V4 or less")
+ }
+
+ selfContained := true
+
+ var (
+ adjustedBuf []byte
+ tempBuf []byte
+ err error
+ )
+
+ // Process the buffer in chunks if it exceeds the max payload size
+ for len(f.buf) > maxPayloadSize {
+ if f.compres != nil {
+ tempBuf, err =
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+ } else {
+ tempBuf, err =
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = f.buf[maxPayloadSize:]
+ selfContained = false
+ }
+
+ // Process the remaining buffer
+ if f.compres != nil {
+ tempBuf, err = newCompressedFrame(f.buf, selfContained,
f.compres)
+ } else {
+ tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = adjustedBuf
+
+ return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+ const headerSize = 6
+ header := [headerSize + 1]byte{}
+
+ // Read the frame header
+ if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read
uncompressed frame, err: %w", err)
+ }
+
+ // Compute and verify the header CRC24
+ computedHeaderCRC24 := KoopmanChecksum(header[:3])
+ readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
+ if computedHeaderCRC24 != readHeaderCRC24 {
+ return nil, false, fmt.Errorf("gocql: crc24 mismatch in frame
header, computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24)
+ }
+
+ // Extract the payload length and self-contained flag
+ headerInt := binary.LittleEndian.Uint32(header[:4])
+ payloadLen := int(headerInt & 0x1FFFF)
+ isSelfContained := (headerInt & (1 << 17)) != 0
+
+ // Read the payload
+ payload := make([]byte, payloadLen)
+ if _, err := io.ReadFull(r, payload); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read
uncompressed frame payload, err: %w", err)
+ }
+
+ // Read and verify the payload CRC32
+ if _, err := io.ReadFull(r, header[:4]); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read payload
crc32, err: %w", err)
+ }
+
+ computedPayloadCRC32 := ChecksumIEEE(payload)
+ readPayloadCRC32 := binary.LittleEndian.Uint32(header[:4])
+ if computedPayloadCRC32 != readPayloadCRC32 {
+ return nil, false, fmt.Errorf("gocql: payload crc32 mismatch,
computed: %d, got: %d", computedPayloadCRC32, readPayloadCRC32)
+ }
+
+ return payload, isSelfContained, nil
+}
+
+func newUncompressedFrame(payload []byte, isSelfContained bool) ([]byte,
error) {
+ const (
+ headerSize = 6
+ selfContainedBit = 1 << 17
+ )
+
+ payloadLen := len(payload)
+ if payloadLen > maxPayloadSize {
+ return nil, fmt.Errorf("payload length (%d) exceeds maximum
size of 128 KiB", payloadLen)
+ }
+
+ // Create the frame
+ frameSize := headerSize + payloadLen + 4 // 4 bytes for CRC32
+ frame := make([]byte, frameSize)
+
+ // First 3 bytes: payload length and self-contained flag
+ headerInt := uint32(payloadLen) & 0x1FFFF
+ if isSelfContained {
+ headerInt |= selfContainedBit // Set the self-contained flag
+ }
+
+ // Encode the first 3 bytes as a single little-endian integer
+ frame[0] = byte(headerInt)
+ frame[1] = byte(headerInt >> 8)
+ frame[2] = byte(headerInt >> 16)
+
+ // Calculate CRC24 for the first 3 bytes of the header
+ crc := KoopmanChecksum(frame[:3])
+
+ // Encode CRC24 into the next 3 bytes of the header
+ frame[3] = byte(crc)
+ frame[4] = byte(crc >> 8)
+ frame[5] = byte(crc >> 16)
+
+ copy(frame[headerSize:], payload) // Copy the payload to the frame
+
+ // Calculate CRC32 for the payload
+ payloadCRC32 := ChecksumIEEE(payload)
+ binary.LittleEndian.PutUint32(frame[headerSize+payloadLen:],
payloadCRC32)
+
+ return frame, nil
+}
+
+func newCompressedFrame(uncompressedPayload []byte, isSelfContained bool,
compressor Compressor) ([]byte, error) {
+ uncompressedLen := len(uncompressedPayload)
+ if uncompressedLen > maxPayloadSize {
+ return nil, fmt.Errorf("uncompressed compressed payload length
exceedes max size of frame payload %d/%d", uncompressedLen, maxPayloadSize)
+ }
+
+ compressedPayload, err := compressor.Encode(uncompressedPayload)
+ if err != nil {
+ return nil, err
+ }
+
+ // Skip the first 4 bytes because the size of the uncompressed payload
is written in the frame header, not in the
+ // body of the compressed envelope
+ compressedPayload = compressedPayload[4:]
+
+ compressedLen := len(compressedPayload)
+
+ // Compression is not worth it
+ if uncompressedLen < compressedLen {
+ // native_protocol_v5.spec
+ // 2.2
+ // An uncompressed length of 0 signals that the compressed
payload
+ // should be used as-is and not decompressed.
+ compressedPayload = uncompressedPayload
+ compressedLen = uncompressedLen
+ uncompressedLen = 0
+ }
+
+ // Combine compressed and uncompressed lengths and set the
self-contained flag if needed
+ combined := uint64(compressedLen) | uint64(uncompressedLen)<<17
+ if isSelfContained {
+ combined |= 1 << 34
+ }
+
+ var headerBuf [8]byte
+
+ // Write the combined value into the header buffer
+ binary.LittleEndian.PutUint64(headerBuf[:], combined)
+
+ // Create a buffer with enough capacity to hold the header, compressed
payload, and checksums
+ buf := bytes.NewBuffer(make([]byte, 0, 8+compressedLen+4))
+
+ // Write the first 5 bytes of the header (compressed and uncompressed
sizes)
+ buf.Write(headerBuf[:5])
+
+ // Compute and write the CRC24 checksum of the first 5 bytes
+ headerChecksum := KoopmanChecksum(headerBuf[:5])
+ binary.LittleEndian.PutUint32(headerBuf[:], headerChecksum)
+ buf.Write(headerBuf[:3])
Review Comment:
Hmm this works but it is a bit confusing (`PutUint32` but then we only write
24 bits). How about:
```
// LittleEndian 3 bytes
headerBuf[0] = byte(headerChecksum)
headerBuf[1] = byte(headerChecksum >> 8)
headerBuf[2] = byte(headerChecksum >> 16)
buf.Write(headerBuf[:3])
```
##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
f.writeBytes(v)
}
}
+
+func (f *framer) prepareModernLayout() error {
+ // Ensure protocol version is V5 or higher
+ if f.proto < protoVersion5 {
+ panic("Modern layout is not supported with version V4 or less")
+ }
+
+ selfContained := true
+
+ var (
+ adjustedBuf []byte
+ tempBuf []byte
+ err error
+ )
+
+ // Process the buffer in chunks if it exceeds the max payload size
+ for len(f.buf) > maxPayloadSize {
+ if f.compres != nil {
+ tempBuf, err =
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+ } else {
+ tempBuf, err =
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = f.buf[maxPayloadSize:]
+ selfContained = false
+ }
+
+ // Process the remaining buffer
+ if f.compres != nil {
+ tempBuf, err = newCompressedFrame(f.buf, selfContained,
f.compres)
+ } else {
+ tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = adjustedBuf
+
+ return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+ const headerSize = 6
+ header := [headerSize + 1]byte{}
+
+ // Read the frame header
+ if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read
uncompressed frame, err: %w", err)
+ }
+
+ // Compute and verify the header CRC24
+ computedHeaderCRC24 := KoopmanChecksum(header[:3])
+ readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
+ if computedHeaderCRC24 != readHeaderCRC24 {
+ return nil, false, fmt.Errorf("gocql: crc24 mismatch in frame
header, computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24)
+ }
+
+ // Extract the payload length and self-contained flag
+ headerInt := binary.LittleEndian.Uint32(header[:4])
+ payloadLen := int(headerInt & 0x1FFFF)
+ isSelfContained := (headerInt & (1 << 17)) != 0
+
+ // Read the payload
+ payload := make([]byte, payloadLen)
+ if _, err := io.ReadFull(r, payload); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read
uncompressed frame payload, err: %w", err)
+ }
+
+ // Read and verify the payload CRC32
+ if _, err := io.ReadFull(r, header[:4]); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read payload
crc32, err: %w", err)
+ }
+
+ computedPayloadCRC32 := ChecksumIEEE(payload)
+ readPayloadCRC32 := binary.LittleEndian.Uint32(header[:4])
+ if computedPayloadCRC32 != readPayloadCRC32 {
+ return nil, false, fmt.Errorf("gocql: payload crc32 mismatch,
computed: %d, got: %d", computedPayloadCRC32, readPayloadCRC32)
+ }
+
+ return payload, isSelfContained, nil
+}
+
+func newUncompressedFrame(payload []byte, isSelfContained bool) ([]byte,
error) {
+ const (
+ headerSize = 6
+ selfContainedBit = 1 << 17
+ )
+
+ payloadLen := len(payload)
+ if payloadLen > maxPayloadSize {
+ return nil, fmt.Errorf("payload length (%d) exceeds maximum
size of 128 KiB", payloadLen)
+ }
+
+ // Create the frame
+ frameSize := headerSize + payloadLen + 4 // 4 bytes for CRC32
+ frame := make([]byte, frameSize)
+
+ // First 3 bytes: payload length and self-contained flag
+ headerInt := uint32(payloadLen) & 0x1FFFF
Review Comment:
I don't think the `& 0x1FFFF` part is necessary since we already checked
that this value is within the acceptable payload length.
##########
conn.go:
##########
@@ -777,6 +801,47 @@ func (c *Conn) handleTimeout() {
}
}
+func (c *Conn) recvProtoV5Frame(ctx context.Context) error {
+ var (
+ payload []byte
+ isSelfContained bool
+ err error
+ )
+
+ // Read frame based on compression
+ if c.compressor != nil {
+ payload, isSelfContained, err = readCompressedFrame(c.r,
c.compressor)
Review Comment:
rename `payload` to `frame`
##########
crc.go:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package gocql
+
+import (
+ "hash/crc32"
+)
+
+var (
+ // Initial CRC32 bytes: 0xFA, 0x2D, 0x55, 0xCA
+ initialCRC32Bytes = []byte{0xfa, 0x2d, 0x55, 0xca}
+)
+
+// ChecksumIEEE calculates the CRC32 checksum of the given byte slice.
+func ChecksumIEEE(b []byte) uint32 {
Review Comment:
NIT: I think it would be better for readability to reference `Crc32` and
`Crc24` in the name of these two functions instead of `IEEE` and `Koopman`
##########
conn.go:
##########
@@ -1756,6 +1888,41 @@ func (c *Conn) awaitSchemaAgreement(ctx context.Context)
(err error) {
return fmt.Errorf("gocql: cluster schema versions not consistent: %+v",
schemas)
}
+// recvLastsFrames reads proto v5 frames from Conn.r and writes decoded
payload to dst.
+// It reads data until the bytesToRead is reached.
+// If Conn.compressor is not nil, it processes Compressed Format frames.
+func (c *Conn) recvLastsFrames(dst *bytes.Buffer, bytesToRead int) error {
+ var read int
+ var segment []byte
+ var err error
+ for read != bytesToRead {
+ // Read frame based on compression
+ if c.compressor != nil {
+ segment, _, err = readCompressedFrame(c.r, c.compressor)
+ } else {
+ segment, _, err = readUncompressedFrame(c.r)
+ }
+ if err != nil {
+ return fmt.Errorf("gocql: failed to read non
self-contained frame: %w", err)
+ }
+
+ // Write the segment to the destination writer
+ n, _ := dst.Write(segment)
+ read += n
+ }
+
+ return nil
+}
+
+func (c *Conn) processAllEnvelopesInFrame(ctx context.Context, r
*bytes.Reader) error {
Review Comment:
rename to `processAllFramesInSegment`
##########
conn.go:
##########
@@ -1756,6 +1888,41 @@ func (c *Conn) awaitSchemaAgreement(ctx context.Context)
(err error) {
return fmt.Errorf("gocql: cluster schema versions not consistent: %+v",
schemas)
}
+// recvLastsFrames reads proto v5 frames from Conn.r and writes decoded
payload to dst.
+// It reads data until the bytesToRead is reached.
+// If Conn.compressor is not nil, it processes Compressed Format frames.
+func (c *Conn) recvLastsFrames(dst *bytes.Buffer, bytesToRead int) error {
Review Comment:
NIT: rename to `recvPartialFrames`
Or if you prefer `recvLastFrames` then just fix the typo
##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
f.writeBytes(v)
}
}
+
+func (f *framer) prepareModernLayout() error {
+ // Ensure protocol version is V5 or higher
+ if f.proto < protoVersion5 {
+ panic("Modern layout is not supported with version V4 or less")
+ }
+
+ selfContained := true
+
+ var (
+ adjustedBuf []byte
+ tempBuf []byte
+ err error
+ )
+
+ // Process the buffer in chunks if it exceeds the max payload size
+ for len(f.buf) > maxPayloadSize {
+ if f.compres != nil {
+ tempBuf, err =
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+ } else {
+ tempBuf, err =
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = f.buf[maxPayloadSize:]
+ selfContained = false
+ }
+
+ // Process the remaining buffer
+ if f.compres != nil {
+ tempBuf, err = newCompressedFrame(f.buf, selfContained,
f.compres)
+ } else {
+ tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = adjustedBuf
+
+ return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+ const headerSize = 6
+ header := [headerSize + 1]byte{}
+
+ // Read the frame header
+ if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read
uncompressed frame, err: %w", err)
+ }
+
+ // Compute and verify the header CRC24
+ computedHeaderCRC24 := KoopmanChecksum(header[:3])
+ readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
Review Comment:
Replace with `readHeaderCRC24 := uint32(header[3]) | uint32(header[4])<<8 |
uint32(header[5])<<16`
so that we can declare the header as a `6` byte array instead of `7`
##########
conn.go:
##########
@@ -1756,6 +1888,41 @@ func (c *Conn) awaitSchemaAgreement(ctx context.Context)
(err error) {
return fmt.Errorf("gocql: cluster schema versions not consistent: %+v",
schemas)
}
+// recvLastsFrames reads proto v5 frames from Conn.r and writes decoded
payload to dst.
Review Comment:
NIT: `reads proto v5 segments from Conn.r and writes decoded partial frames
to dst.`
##########
conn.go:
##########
@@ -694,7 +718,7 @@ func (c *Conn) recv(ctx context.Context) error {
} else if head.stream == -1 {
// TODO: handle cassandra event frames, we shouldnt get any
currently
framer := newFramer(c.compressor, c.version)
- if err := framer.readFrame(c, &head); err != nil {
+ if err := framer.readFrame(r, &head); err != nil {
return err
}
go c.session.handleEvent(framer)
Review Comment:
Line 730:
```
if err := framer.readFrame(c, &head); err != nil {
return err
}
```
This should be changed to `r` as well, right?
I don't like that `Conn` implements `io.Reader` and also has a
`*bufio.Reader` field; it makes it very easy to use one when we actually wanted
the other...
I think this is even the case in `readUncompressedFrame` and
`readCompressedFrame`... It's using `c.r` directly but it should be using `c`
so the additional logic from `Conn.Read()` is used, right?
I think we should create a type (maybe `connReader`?) that wraps the
`*bufio.Reader` object and then move the implementation of `Conn.Read()` to
that new type so we ensure that we only have one object that implements
`io.Reader`. This way `c.r` becomes an object of `*connReader` instead of
`*bufio.Reader`, `Conn` no longer implements `io.Reader`, and every time we want
to actually read from the socket we have to use the `c.r` field instead of `c`
directly.
##########
conn.go:
##########
@@ -1756,6 +1888,41 @@ func (c *Conn) awaitSchemaAgreement(ctx context.Context)
(err error) {
return fmt.Errorf("gocql: cluster schema versions not consistent: %+v",
schemas)
}
+// recvLastsFrames reads proto v5 frames from Conn.r and writes decoded
payload to dst.
+// It reads data until the bytesToRead is reached.
+// If Conn.compressor is not nil, it processes Compressed Format frames.
+func (c *Conn) recvLastsFrames(dst *bytes.Buffer, bytesToRead int) error {
+ var read int
+ var segment []byte
+ var err error
+ for read != bytesToRead {
+ // Read frame based on compression
+ if c.compressor != nil {
+ segment, _, err = readCompressedFrame(c.r, c.compressor)
+ } else {
+ segment, _, err = readUncompressedFrame(c.r)
Review Comment:
We could check the self-contained flag to make sure it is false and, if it's
not, return a custom "protocol error" so `go.serve` logs it and closes the
connection.
##########
conn.go:
##########
@@ -777,6 +801,47 @@ func (c *Conn) handleTimeout() {
}
}
+func (c *Conn) recvProtoV5Frame(ctx context.Context) error {
Review Comment:
rename `recvProtoV5Frame` to `recvSegment` (I'd say not to include the
protocol version in the name since there will eventually be a protocol v6 that
will very likely follow this format)
##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
f.writeBytes(v)
}
}
+
+func (f *framer) prepareModernLayout() error {
+ // Ensure protocol version is V5 or higher
+ if f.proto < protoVersion5 {
+ panic("Modern layout is not supported with version V4 or less")
+ }
+
+ selfContained := true
+
+ var (
+ adjustedBuf []byte
+ tempBuf []byte
+ err error
+ )
+
+ // Process the buffer in chunks if it exceeds the max payload size
+ for len(f.buf) > maxPayloadSize {
+ if f.compres != nil {
+ tempBuf, err =
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+ } else {
+ tempBuf, err =
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = f.buf[maxPayloadSize:]
+ selfContained = false
+ }
+
+ // Process the remaining buffer
+ if f.compres != nil {
+ tempBuf, err = newCompressedFrame(f.buf, selfContained,
f.compres)
+ } else {
+ tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = adjustedBuf
+
+ return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+ const headerSize = 6
+ header := [headerSize + 1]byte{}
+
+ // Read the frame header
+ if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read
uncompressed frame, err: %w", err)
+ }
+
+ // Compute and verify the header CRC24
+ computedHeaderCRC24 := KoopmanChecksum(header[:3])
+ readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
+ if computedHeaderCRC24 != readHeaderCRC24 {
+ return nil, false, fmt.Errorf("gocql: crc24 mismatch in frame
header, computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24)
+ }
+
+ // Extract the payload length and self-contained flag
+ headerInt := binary.LittleEndian.Uint32(header[:4])
+ payloadLen := int(headerInt & 0x1FFFF)
Review Comment:
NIT: make `0x1FFFF` a constant (max segment payload size)
##########
conn.go:
##########
@@ -777,6 +801,47 @@ func (c *Conn) handleTimeout() {
}
}
+func (c *Conn) recvProtoV5Frame(ctx context.Context) error {
+ var (
+ payload []byte
+ isSelfContained bool
+ err error
+ )
+
+ // Read frame based on compression
+ if c.compressor != nil {
+ payload, isSelfContained, err = readCompressedFrame(c.r,
c.compressor)
+ } else {
+ payload, isSelfContained, err = readUncompressedFrame(c.r)
+ }
+ if err != nil {
+ return err
+ }
+
+ if isSelfContained {
+ return c.processAllEnvelopesInFrame(ctx,
bytes.NewReader(payload))
+ }
+
+ head, err := readHeader(bytes.NewReader(payload), c.headerBuf[:])
+ if err != nil {
+ return err
+ }
+
+ const envelopeHeaderLength = 9
Review Comment:
rename to `frameHeaderLength`
##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
f.writeBytes(v)
}
}
+
+func (f *framer) prepareModernLayout() error {
+ // Ensure protocol version is V5 or higher
+ if f.proto < protoVersion5 {
+ panic("Modern layout is not supported with version V4 or less")
+ }
+
+ selfContained := true
+
+ var (
+ adjustedBuf []byte
+ tempBuf []byte
+ err error
+ )
+
+ // Process the buffer in chunks if it exceeds the max payload size
+ for len(f.buf) > maxPayloadSize {
+ if f.compres != nil {
+ tempBuf, err =
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+ } else {
+ tempBuf, err =
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = f.buf[maxPayloadSize:]
+ selfContained = false
+ }
+
+ // Process the remaining buffer
+ if f.compres != nil {
+ tempBuf, err = newCompressedFrame(f.buf, selfContained,
f.compres)
+ } else {
+ tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = adjustedBuf
+
+ return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+ const headerSize = 6
+ header := [headerSize + 1]byte{}
+
+ // Read the frame header
+ if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read
uncompressed frame, err: %w", err)
+ }
+
+ // Compute and verify the header CRC24
+ computedHeaderCRC24 := KoopmanChecksum(header[:3])
+ readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
+ if computedHeaderCRC24 != readHeaderCRC24 {
+ return nil, false, fmt.Errorf("gocql: crc24 mismatch in frame
header, computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24)
+ }
+
+ // Extract the payload length and self-contained flag
+ headerInt := binary.LittleEndian.Uint32(header[:4])
+ payloadLen := int(headerInt & 0x1FFFF)
+ isSelfContained := (headerInt & (1 << 17)) != 0
+
+ // Read the payload
+ payload := make([]byte, payloadLen)
+ if _, err := io.ReadFull(r, payload); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read
uncompressed frame payload, err: %w", err)
+ }
+
+ // Read and verify the payload CRC32
+ if _, err := io.ReadFull(r, header[:4]); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read payload
crc32, err: %w", err)
+ }
+
+ computedPayloadCRC32 := ChecksumIEEE(payload)
+ readPayloadCRC32 := binary.LittleEndian.Uint32(header[:4])
+ if computedPayloadCRC32 != readPayloadCRC32 {
+ return nil, false, fmt.Errorf("gocql: payload crc32 mismatch,
computed: %d, got: %d", computedPayloadCRC32, readPayloadCRC32)
+ }
+
+ return payload, isSelfContained, nil
+}
+
+func newUncompressedFrame(payload []byte, isSelfContained bool) ([]byte,
error) {
+ const (
+ headerSize = 6
+ selfContainedBit = 1 << 17
+ )
+
+ payloadLen := len(payload)
+ if payloadLen > maxPayloadSize {
+ return nil, fmt.Errorf("payload length (%d) exceeds maximum
size of 128 KiB", payloadLen)
+ }
+
+ // Create the frame
+ frameSize := headerSize + payloadLen + 4 // 4 bytes for CRC32
+ frame := make([]byte, frameSize)
+
+ // First 3 bytes: payload length and self-contained flag
+ headerInt := uint32(payloadLen) & 0x1FFFF
+ if isSelfContained {
+ headerInt |= selfContainedBit // Set the self-contained flag
+ }
+
+ // Encode the first 3 bytes as a single little-endian integer
+ frame[0] = byte(headerInt)
+ frame[1] = byte(headerInt >> 8)
+ frame[2] = byte(headerInt >> 16)
+
+ // Calculate CRC24 for the first 3 bytes of the header
+ crc := KoopmanChecksum(frame[:3])
+
+ // Encode CRC24 into the next 3 bytes of the header
+ frame[3] = byte(crc)
+ frame[4] = byte(crc >> 8)
+ frame[5] = byte(crc >> 16)
+
+ copy(frame[headerSize:], payload) // Copy the payload to the frame
+
+ // Calculate CRC32 for the payload
+ payloadCRC32 := ChecksumIEEE(payload)
+ binary.LittleEndian.PutUint32(frame[headerSize+payloadLen:],
payloadCRC32)
+
+ return frame, nil
+}
+
+func newCompressedFrame(uncompressedPayload []byte, isSelfContained bool,
compressor Compressor) ([]byte, error) {
+ uncompressedLen := len(uncompressedPayload)
+ if uncompressedLen > maxPayloadSize {
+ return nil, fmt.Errorf("uncompressed compressed payload length
exceedes max size of frame payload %d/%d", uncompressedLen, maxPayloadSize)
+ }
+
+ compressedPayload, err := compressor.Encode(uncompressedPayload)
+ if err != nil {
+ return nil, err
+ }
+
+ // Skip the first 4 bytes because the size of the uncompressed payload
is written in the frame header, not in the
+ // body of the compressed envelope
+ compressedPayload = compressedPayload[4:]
+
+ compressedLen := len(compressedPayload)
+
+ // Compression is not worth it
+ if uncompressedLen < compressedLen {
+ // native_protocol_v5.spec
+ // 2.2
+ // An uncompressed length of 0 signals that the compressed
payload
+ // should be used as-is and not decompressed.
+ compressedPayload = uncompressedPayload
+ compressedLen = uncompressedLen
+ uncompressedLen = 0
+ }
+
+ // Combine compressed and uncompressed lengths and set the
self-contained flag if needed
+ combined := uint64(compressedLen) | uint64(uncompressedLen)<<17
+ if isSelfContained {
+ combined |= 1 << 34
+ }
+
+ var headerBuf [8]byte
+
+ // Write the combined value into the header buffer
+ binary.LittleEndian.PutUint64(headerBuf[:], combined)
+
+ // Create a buffer with enough capacity to hold the header, compressed
payload, and checksums
+ buf := bytes.NewBuffer(make([]byte, 0, 8+compressedLen+4))
+
+ // Write the first 5 bytes of the header (compressed and uncompressed
sizes)
+ buf.Write(headerBuf[:5])
+
+ // Compute and write the CRC24 checksum of the first 5 bytes
+ headerChecksum := KoopmanChecksum(headerBuf[:5])
+ binary.LittleEndian.PutUint32(headerBuf[:], headerChecksum)
+ buf.Write(headerBuf[:3])
+ buf.Write(compressedPayload)
+
+ // Compute and write the CRC32 checksum of the payload
+ payloadChecksum := ChecksumIEEE(compressedPayload)
+ binary.LittleEndian.PutUint32(headerBuf[:], payloadChecksum)
+ buf.Write(headerBuf[:4])
+
+ return buf.Bytes(), nil
+}
+
+func readCompressedFrame(r io.Reader, compressor Compressor) ([]byte, bool,
error) {
Review Comment:
rename to `readCompressedSegment`
##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
f.writeBytes(v)
}
}
+
+func (f *framer) prepareModernLayout() error {
+ // Ensure protocol version is V5 or higher
+ if f.proto < protoVersion5 {
+ panic("Modern layout is not supported with version V4 or less")
+ }
+
+ selfContained := true
+
+ var (
+ adjustedBuf []byte
+ tempBuf []byte
+ err error
+ )
+
+ // Process the buffer in chunks if it exceeds the max payload size
+ for len(f.buf) > maxPayloadSize {
+ if f.compres != nil {
+ tempBuf, err =
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+ } else {
+ tempBuf, err =
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = f.buf[maxPayloadSize:]
+ selfContained = false
+ }
+
+ // Process the remaining buffer
+ if f.compres != nil {
+ tempBuf, err = newCompressedFrame(f.buf, selfContained,
f.compres)
+ } else {
+ tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = adjustedBuf
+
+ return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+ const headerSize = 6
+ header := [headerSize + 1]byte{}
+
+ // Read the frame header
+ if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read
uncompressed frame, err: %w", err)
+ }
+
+ // Compute and verify the header CRC24
+ computedHeaderCRC24 := KoopmanChecksum(header[:3])
+ readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
+ if computedHeaderCRC24 != readHeaderCRC24 {
+ return nil, false, fmt.Errorf("gocql: crc24 mismatch in frame
header, computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24)
+ }
+
+ // Extract the payload length and self-contained flag
+ headerInt := binary.LittleEndian.Uint32(header[:4])
+ payloadLen := int(headerInt & 0x1FFFF)
+ isSelfContained := (headerInt & (1 << 17)) != 0
+
+ // Read the payload
+ payload := make([]byte, payloadLen)
+ if _, err := io.ReadFull(r, payload); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read
uncompressed frame payload, err: %w", err)
+ }
+
+ // Read and verify the payload CRC32
+ if _, err := io.ReadFull(r, header[:4]); err != nil {
Review Comment:
NIT: make `4` a constant (crc32 length), `io.ReadFull(r, header[:crc32Size])`
##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
f.writeBytes(v)
}
}
+
+func (f *framer) prepareModernLayout() error {
+ // Ensure protocol version is V5 or higher
+ if f.proto < protoVersion5 {
+ panic("Modern layout is not supported with version V4 or less")
+ }
+
+ selfContained := true
+
+ var (
+ adjustedBuf []byte
+ tempBuf []byte
+ err error
+ )
+
+ // Process the buffer in chunks if it exceeds the max payload size
+ for len(f.buf) > maxPayloadSize {
+ if f.compres != nil {
+ tempBuf, err =
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+ } else {
+ tempBuf, err =
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = f.buf[maxPayloadSize:]
+ selfContained = false
+ }
+
+ // Process the remaining buffer
+ if f.compres != nil {
+ tempBuf, err = newCompressedFrame(f.buf, selfContained,
f.compres)
+ } else {
+ tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = adjustedBuf
+
+ return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+ const headerSize = 6
+ header := [headerSize + 1]byte{}
+
+ // Read the frame header
+ if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
Review Comment:
`io.ReadFull(r, header[:])` after changing the array size to `6` instead of
`7` (see comment below)
##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
f.writeBytes(v)
}
}
+
+func (f *framer) prepareModernLayout() error {
+ // Ensure protocol version is V5 or higher
+ if f.proto < protoVersion5 {
+ panic("Modern layout is not supported with version V4 or less")
+ }
+
+ selfContained := true
+
+ var (
+ adjustedBuf []byte
+ tempBuf []byte
+ err error
+ )
+
+ // Process the buffer in chunks if it exceeds the max payload size
+ for len(f.buf) > maxPayloadSize {
+ if f.compres != nil {
+ tempBuf, err =
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+ } else {
+ tempBuf, err =
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = f.buf[maxPayloadSize:]
+ selfContained = false
+ }
+
+ // Process the remaining buffer
+ if f.compres != nil {
+ tempBuf, err = newCompressedFrame(f.buf, selfContained,
f.compres)
+ } else {
+ tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = adjustedBuf
+
+ return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+ const headerSize = 6
+ header := [headerSize + 1]byte{}
+
+ // Read the frame header
+ if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read
uncompressed frame, err: %w", err)
+ }
+
+ // Compute and verify the header CRC24
+ computedHeaderCRC24 := KoopmanChecksum(header[:3])
+ readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
+ if computedHeaderCRC24 != readHeaderCRC24 {
+ return nil, false, fmt.Errorf("gocql: crc24 mismatch in frame
header, computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24)
+ }
+
+ // Extract the payload length and self-contained flag
+ headerInt := binary.LittleEndian.Uint32(header[:4])
+ payloadLen := int(headerInt & 0x1FFFF)
+ isSelfContained := (headerInt & (1 << 17)) != 0
+
+ // Read the payload
+ payload := make([]byte, payloadLen)
+ if _, err := io.ReadFull(r, payload); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read
uncompressed frame payload, err: %w", err)
+ }
+
+ // Read and verify the payload CRC32
+ if _, err := io.ReadFull(r, header[:4]); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read payload
crc32, err: %w", err)
+ }
+
+ computedPayloadCRC32 := ChecksumIEEE(payload)
+ readPayloadCRC32 := binary.LittleEndian.Uint32(header[:4])
+ if computedPayloadCRC32 != readPayloadCRC32 {
+ return nil, false, fmt.Errorf("gocql: payload crc32 mismatch,
computed: %d, got: %d", computedPayloadCRC32, readPayloadCRC32)
+ }
+
+ return payload, isSelfContained, nil
+}
+
+func newUncompressedFrame(payload []byte, isSelfContained bool) ([]byte,
error) {
+ const (
+ headerSize = 6
+ selfContainedBit = 1 << 17
+ )
+
+ payloadLen := len(payload)
+ if payloadLen > maxPayloadSize {
+ return nil, fmt.Errorf("payload length (%d) exceeds maximum
size of 128 KiB", payloadLen)
Review Comment:
NIT: use the constant for the maximum payload size in the string
##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
f.writeBytes(v)
}
}
+
+func (f *framer) prepareModernLayout() error {
+ // Ensure protocol version is V5 or higher
+ if f.proto < protoVersion5 {
+ panic("Modern layout is not supported with version V4 or less")
+ }
+
+ selfContained := true
+
+ var (
+ adjustedBuf []byte
+ tempBuf []byte
+ err error
+ )
+
+ // Process the buffer in chunks if it exceeds the max payload size
+ for len(f.buf) > maxPayloadSize {
+ if f.compres != nil {
+ tempBuf, err =
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+ } else {
+ tempBuf, err =
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = f.buf[maxPayloadSize:]
+ selfContained = false
+ }
+
+ // Process the remaining buffer
+ if f.compres != nil {
+ tempBuf, err = newCompressedFrame(f.buf, selfContained,
f.compres)
+ } else {
+ tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = adjustedBuf
+
+ return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+ const headerSize = 6
+ header := [headerSize + 1]byte{}
Review Comment:
NIT: `headerBuf := [headerSize + crc24Size]byte{}`
##########
go.mod:
##########
@@ -20,11 +20,14 @@ module github.com/gocql/gocql
require (
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 //
indirect
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 //
indirect
+ github.com/gocql/gocql/lz4 v0.0.0-20240925165811-953e0df999ca
github.com/golang/snappy v0.0.3
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed
github.com/kr/pretty v0.1.0 // indirect
- github.com/stretchr/testify v1.3.0 // indirect
+ github.com/stretchr/testify v1.9.0
gopkg.in/inf.v0 v0.9.1
)
go 1.13
+
+replace github.com/gocql/gocql/lz4 => ./lz4
Review Comment:
This doesn't seem intended?
##########
conn.go:
##########
@@ -1086,7 +1155,29 @@ func (c *Conn) exec(ctx context.Context, req
frameBuilder, tracer Tracer) (*fram
return nil, err
}
- n, err := c.w.writeContext(ctx, framer.buf)
+ var n int
+
+ if c.version > protoVersion4 && startupCompleted {
+ err = framer.prepareModernLayout()
+ if err != nil {
Review Comment:
Do we need this `if` here? Can't we just check below before calling
`c.w.writeContext` and let that error handling code do this for us?
```
if err == nil {
n, err = c.w.writeContext(ctx, framer.buf)
}
if err != nil {
// existing error handling
}
```
##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
f.writeBytes(v)
}
}
+
+func (f *framer) prepareModernLayout() error {
+ // Ensure protocol version is V5 or higher
+ if f.proto < protoVersion5 {
+ panic("Modern layout is not supported with version V4 or less")
+ }
+
+ selfContained := true
+
+ var (
+ adjustedBuf []byte
+ tempBuf []byte
+ err error
+ )
+
+ // Process the buffer in chunks if it exceeds the max payload size
+ for len(f.buf) > maxPayloadSize {
+ if f.compres != nil {
+ tempBuf, err =
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+ } else {
+ tempBuf, err =
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = f.buf[maxPayloadSize:]
+ selfContained = false
+ }
+
+ // Process the remaining buffer
+ if f.compres != nil {
+ tempBuf, err = newCompressedFrame(f.buf, selfContained,
f.compres)
+ } else {
+ tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = adjustedBuf
+
+ return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+ const headerSize = 6
+ header := [headerSize + 1]byte{}
+
+ // Read the frame header
+ if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read
uncompressed frame, err: %w", err)
+ }
+
+ // Compute and verify the header CRC24
+ computedHeaderCRC24 := KoopmanChecksum(header[:3])
+ readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
+ if computedHeaderCRC24 != readHeaderCRC24 {
+ return nil, false, fmt.Errorf("gocql: crc24 mismatch in frame
header, computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24)
+ }
+
+ // Extract the payload length and self-contained flag
+ headerInt := binary.LittleEndian.Uint32(header[:4])
+ payloadLen := int(headerInt & 0x1FFFF)
+ isSelfContained := (headerInt & (1 << 17)) != 0
+
+ // Read the payload
+ payload := make([]byte, payloadLen)
+ if _, err := io.ReadFull(r, payload); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read
uncompressed frame payload, err: %w", err)
+ }
+
+ // Read and verify the payload CRC32
+ if _, err := io.ReadFull(r, header[:4]); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read payload
crc32, err: %w", err)
+ }
+
+ computedPayloadCRC32 := ChecksumIEEE(payload)
+ readPayloadCRC32 := binary.LittleEndian.Uint32(header[:4])
+ if computedPayloadCRC32 != readPayloadCRC32 {
+ return nil, false, fmt.Errorf("gocql: payload crc32 mismatch,
computed: %d, got: %d", computedPayloadCRC32, readPayloadCRC32)
+ }
+
+ return payload, isSelfContained, nil
+}
+
+func newUncompressedFrame(payload []byte, isSelfContained bool) ([]byte,
error) {
+ const (
+ headerSize = 6
+ selfContainedBit = 1 << 17
+ )
+
+ payloadLen := len(payload)
+ if payloadLen > maxPayloadSize {
+ return nil, fmt.Errorf("payload length (%d) exceeds maximum
size of 128 KiB", payloadLen)
+ }
+
+ // Create the frame
+ frameSize := headerSize + payloadLen + 4 // 4 bytes for CRC32
+ frame := make([]byte, frameSize)
+
+ // First 3 bytes: payload length and self-contained flag
+ headerInt := uint32(payloadLen) & 0x1FFFF
+ if isSelfContained {
+ headerInt |= selfContainedBit // Set the self-contained flag
+ }
+
+ // Encode the first 3 bytes as a single little-endian integer
+ frame[0] = byte(headerInt)
+ frame[1] = byte(headerInt >> 8)
+ frame[2] = byte(headerInt >> 16)
+
+ // Calculate CRC24 for the first 3 bytes of the header
+ crc := KoopmanChecksum(frame[:3])
+
+ // Encode CRC24 into the next 3 bytes of the header
+ frame[3] = byte(crc)
+ frame[4] = byte(crc >> 8)
+ frame[5] = byte(crc >> 16)
+
+ copy(frame[headerSize:], payload) // Copy the payload to the frame
+
+ // Calculate CRC32 for the payload
+ payloadCRC32 := ChecksumIEEE(payload)
+ binary.LittleEndian.PutUint32(frame[headerSize+payloadLen:],
payloadCRC32)
+
+ return frame, nil
+}
+
+func newCompressedFrame(uncompressedPayload []byte, isSelfContained bool,
compressor Compressor) ([]byte, error) {
+ uncompressedLen := len(uncompressedPayload)
+ if uncompressedLen > maxPayloadSize {
+ return nil, fmt.Errorf("uncompressed compressed payload length
exceedes max size of frame payload %d/%d", uncompressedLen, maxPayloadSize)
+ }
+
+ compressedPayload, err := compressor.Encode(uncompressedPayload)
+ if err != nil {
+ return nil, err
+ }
+
+ // Skip the first 4 bytes because the size of the uncompressed payload
is written in the frame header, not in the
+ // body of the compressed envelope
+ compressedPayload = compressedPayload[4:]
+
+ compressedLen := len(compressedPayload)
+
+ // Compression is not worth it
+ if uncompressedLen < compressedLen {
+ // native_protocol_v5.spec
+ // 2.2
+ // An uncompressed length of 0 signals that the compressed
payload
+ // should be used as-is and not decompressed.
+ compressedPayload = uncompressedPayload
+ compressedLen = uncompressedLen
+ uncompressedLen = 0
+ }
+
+ // Combine compressed and uncompressed lengths and set the
self-contained flag if needed
+ combined := uint64(compressedLen) | uint64(uncompressedLen)<<17
+ if isSelfContained {
+ combined |= 1 << 34
+ }
+
+ var headerBuf [8]byte
+
+ // Write the combined value into the header buffer
+ binary.LittleEndian.PutUint64(headerBuf[:], combined)
+
+ // Create a buffer with enough capacity to hold the header, compressed
payload, and checksums
+ buf := bytes.NewBuffer(make([]byte, 0, 8+compressedLen+4))
+
+ // Write the first 5 bytes of the header (compressed and uncompressed
sizes)
+ buf.Write(headerBuf[:5])
+
+ // Compute and write the CRC24 checksum of the first 5 bytes
+ headerChecksum := KoopmanChecksum(headerBuf[:5])
+ binary.LittleEndian.PutUint32(headerBuf[:], headerChecksum)
+ buf.Write(headerBuf[:3])
+ buf.Write(compressedPayload)
+
+ // Compute and write the CRC32 checksum of the payload
+ payloadChecksum := ChecksumIEEE(compressedPayload)
+ binary.LittleEndian.PutUint32(headerBuf[:], payloadChecksum)
+ buf.Write(headerBuf[:4])
+
+ return buf.Bytes(), nil
+}
+
+func readCompressedFrame(r io.Reader, compressor Compressor) ([]byte, bool,
error) {
+ var (
+ headerBuf [8]byte
+ err error
+ )
+
+ if _, err = io.ReadFull(r, headerBuf[:]); err != nil {
+ return nil, false, err
+ }
+
+ // Reading checksum from frame header
+ readHeaderChecksum := uint32(headerBuf[5]) | uint32(headerBuf[6])<<8 |
uint32(headerBuf[7])<<16
+ if computedHeaderChecksum := KoopmanChecksum(headerBuf[:5]);
computedHeaderChecksum != readHeaderChecksum {
+ return nil, false, fmt.Errorf("gocql: crc24 mismatch in frame
header, read: %d, computed: %d", readHeaderChecksum, computedHeaderChecksum)
+ }
+
+ // First 17 bits - payload size after compression
+ compressedLen := uint32(headerBuf[0]) | uint32(headerBuf[1])<<8 |
uint32(headerBuf[2]&0x1)<<16
+
+ // The next 17 bits - payload size before compression
+ uncompressedLen := (uint32(headerBuf[2]) >> 1) |
uint32(headerBuf[3])<<7 | uint32(headerBuf[4]&0b11)<<15
+
+ // Self-contained flag
+ selfContained := (headerBuf[4] & 0b100) != 0
+
+ compressedPayload := make([]byte, compressedLen)
+ if _, err = io.ReadFull(r, compressedPayload); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read compressed
frame payload, err: %w", err)
+ }
+
+ if _, err = io.ReadFull(r, headerBuf[:4]); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read payload
crc32, err: %w", err)
+ }
+
+ // Ensuring if payload checksum matches
+ readPayloadChecksum := binary.LittleEndian.Uint32(headerBuf[:4])
Review Comment:
NIT: `binary.LittleEndian.Uint32(headerBuf[:crc32Size])`
##########
go.mod:
##########
@@ -20,11 +20,14 @@ module github.com/gocql/gocql
require (
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 //
indirect
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 //
indirect
+ github.com/gocql/gocql/lz4 v0.0.0-20240925165811-953e0df999ca
Review Comment:
This new `github.com/gocql/gocql/lz4` requirement doesn't seem intended?
##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
f.writeBytes(v)
}
}
+
+func (f *framer) prepareModernLayout() error {
+ // Ensure protocol version is V5 or higher
+ if f.proto < protoVersion5 {
+ panic("Modern layout is not supported with version V4 or less")
+ }
+
+ selfContained := true
+
+ var (
+ adjustedBuf []byte
+ tempBuf []byte
+ err error
+ )
+
+ // Process the buffer in chunks if it exceeds the max payload size
+ for len(f.buf) > maxPayloadSize {
+ if f.compres != nil {
+ tempBuf, err =
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+ } else {
+ tempBuf, err =
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = f.buf[maxPayloadSize:]
+ selfContained = false
+ }
+
+ // Process the remaining buffer
+ if f.compres != nil {
+ tempBuf, err = newCompressedFrame(f.buf, selfContained,
f.compres)
+ } else {
+ tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = adjustedBuf
+
+ return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+ const headerSize = 6
+ header := [headerSize + 1]byte{}
+
+ // Read the frame header
+ if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read
uncompressed frame, err: %w", err)
+ }
+
+ // Compute and verify the header CRC24
+ computedHeaderCRC24 := KoopmanChecksum(header[:3])
+ readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
+ if computedHeaderCRC24 != readHeaderCRC24 {
+ return nil, false, fmt.Errorf("gocql: crc24 mismatch in frame
header, computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24)
+ }
+
+ // Extract the payload length and self-contained flag
+ headerInt := binary.LittleEndian.Uint32(header[:4])
+ payloadLen := int(headerInt & 0x1FFFF)
+ isSelfContained := (headerInt & (1 << 17)) != 0
+
+ // Read the payload
+ payload := make([]byte, payloadLen)
+ if _, err := io.ReadFull(r, payload); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read
uncompressed frame payload, err: %w", err)
+ }
+
+ // Read and verify the payload CRC32
+ if _, err := io.ReadFull(r, header[:4]); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read payload
crc32, err: %w", err)
+ }
+
+ computedPayloadCRC32 := ChecksumIEEE(payload)
+ readPayloadCRC32 := binary.LittleEndian.Uint32(header[:4])
+ if computedPayloadCRC32 != readPayloadCRC32 {
+ return nil, false, fmt.Errorf("gocql: payload crc32 mismatch,
computed: %d, got: %d", computedPayloadCRC32, readPayloadCRC32)
+ }
+
+ return payload, isSelfContained, nil
+}
+
+func newUncompressedFrame(payload []byte, isSelfContained bool) ([]byte,
error) {
+ const (
+ headerSize = 6
+ selfContainedBit = 1 << 17
+ )
+
+ payloadLen := len(payload)
+ if payloadLen > maxPayloadSize {
+ return nil, fmt.Errorf("payload length (%d) exceeds maximum
size of 128 KiB", payloadLen)
+ }
+
+ // Create the frame
+ frameSize := headerSize + payloadLen + 4 // 4 bytes for CRC32
+ frame := make([]byte, frameSize)
+
+ // First 3 bytes: payload length and self-contained flag
+ headerInt := uint32(payloadLen) & 0x1FFFF
+ if isSelfContained {
+ headerInt |= selfContainedBit // Set the self-contained flag
+ }
+
+ // Encode the first 3 bytes as a single little-endian integer
+ frame[0] = byte(headerInt)
+ frame[1] = byte(headerInt >> 8)
+ frame[2] = byte(headerInt >> 16)
+
+ // Calculate CRC24 for the first 3 bytes of the header
+ crc := KoopmanChecksum(frame[:3])
+
+ // Encode CRC24 into the next 3 bytes of the header
+ frame[3] = byte(crc)
+ frame[4] = byte(crc >> 8)
+ frame[5] = byte(crc >> 16)
+
+ copy(frame[headerSize:], payload) // Copy the payload to the frame
+
+ // Calculate CRC32 for the payload
+ payloadCRC32 := ChecksumIEEE(payload)
+ binary.LittleEndian.PutUint32(frame[headerSize+payloadLen:],
payloadCRC32)
+
+ return frame, nil
+}
+
+func newCompressedFrame(uncompressedPayload []byte, isSelfContained bool,
compressor Compressor) ([]byte, error) {
+ uncompressedLen := len(uncompressedPayload)
+ if uncompressedLen > maxPayloadSize {
+ return nil, fmt.Errorf("uncompressed compressed payload length
exceedes max size of frame payload %d/%d", uncompressedLen, maxPayloadSize)
+ }
+
+ compressedPayload, err := compressor.Encode(uncompressedPayload)
+ if err != nil {
+ return nil, err
+ }
+
+ // Skip the first 4 bytes because the size of the uncompressed payload
is written in the frame header, not in the
+ // body of the compressed envelope
+ compressedPayload = compressedPayload[4:]
+
+ compressedLen := len(compressedPayload)
+
+ // Compression is not worth it
+ if uncompressedLen < compressedLen {
+ // native_protocol_v5.spec
+ // 2.2
+ // An uncompressed length of 0 signals that the compressed
payload
+ // should be used as-is and not decompressed.
+ compressedPayload = uncompressedPayload
+ compressedLen = uncompressedLen
+ uncompressedLen = 0
+ }
+
+ // Combine compressed and uncompressed lengths and set the
self-contained flag if needed
+ combined := uint64(compressedLen) | uint64(uncompressedLen)<<17
+ if isSelfContained {
+ combined |= 1 << 34
+ }
+
+ var headerBuf [8]byte
+
+ // Write the combined value into the header buffer
+ binary.LittleEndian.PutUint64(headerBuf[:], combined)
+
+ // Create a buffer with enough capacity to hold the header, compressed
payload, and checksums
+ buf := bytes.NewBuffer(make([]byte, 0, 8+compressedLen+4))
+
+ // Write the first 5 bytes of the header (compressed and uncompressed
sizes)
+ buf.Write(headerBuf[:5])
+
+ // Compute and write the CRC24 checksum of the first 5 bytes
+ headerChecksum := KoopmanChecksum(headerBuf[:5])
+ binary.LittleEndian.PutUint32(headerBuf[:], headerChecksum)
+ buf.Write(headerBuf[:3])
+ buf.Write(compressedPayload)
+
+ // Compute and write the CRC32 checksum of the payload
+ payloadChecksum := ChecksumIEEE(compressedPayload)
+ binary.LittleEndian.PutUint32(headerBuf[:], payloadChecksum)
+ buf.Write(headerBuf[:4])
+
+ return buf.Bytes(), nil
+}
+
+func readCompressedFrame(r io.Reader, compressor Compressor) ([]byte, bool,
error) {
+ var (
Review Comment:
NIT: add constants: `headerSize = 5`, `crc24Size = 3`
##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
f.writeBytes(v)
}
}
+
+func (f *framer) prepareModernLayout() error {
+ // Ensure protocol version is V5 or higher
+ if f.proto < protoVersion5 {
+ panic("Modern layout is not supported with version V4 or less")
+ }
+
+ selfContained := true
+
+ var (
+ adjustedBuf []byte
+ tempBuf []byte
+ err error
+ )
+
+ // Process the buffer in chunks if it exceeds the max payload size
+ for len(f.buf) > maxPayloadSize {
+ if f.compres != nil {
+ tempBuf, err =
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+ } else {
+ tempBuf, err =
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = f.buf[maxPayloadSize:]
+ selfContained = false
+ }
+
+ // Process the remaining buffer
+ if f.compres != nil {
+ tempBuf, err = newCompressedFrame(f.buf, selfContained,
f.compres)
+ } else {
+ tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = adjustedBuf
+
+ return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+ const headerSize = 6
+ header := [headerSize + 1]byte{}
+
+ // Read the frame header
+ if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read
uncompressed frame, err: %w", err)
+ }
+
+ // Compute and verify the header CRC24
+ computedHeaderCRC24 := KoopmanChecksum(header[:3])
+ readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
+ if computedHeaderCRC24 != readHeaderCRC24 {
+ return nil, false, fmt.Errorf("gocql: crc24 mismatch in frame
header, computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24)
+ }
+
+ // Extract the payload length and self-contained flag
+ headerInt := binary.LittleEndian.Uint32(header[:4])
+ payloadLen := int(headerInt & 0x1FFFF)
+ isSelfContained := (headerInt & (1 << 17)) != 0
+
+ // Read the payload
+ payload := make([]byte, payloadLen)
+ if _, err := io.ReadFull(r, payload); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read
uncompressed frame payload, err: %w", err)
+ }
+
+ // Read and verify the payload CRC32
+ if _, err := io.ReadFull(r, header[:4]); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read payload
crc32, err: %w", err)
+ }
+
+ computedPayloadCRC32 := ChecksumIEEE(payload)
+ readPayloadCRC32 := binary.LittleEndian.Uint32(header[:4])
+ if computedPayloadCRC32 != readPayloadCRC32 {
+ return nil, false, fmt.Errorf("gocql: payload crc32 mismatch,
computed: %d, got: %d", computedPayloadCRC32, readPayloadCRC32)
+ }
+
+ return payload, isSelfContained, nil
+}
+
+func newUncompressedFrame(payload []byte, isSelfContained bool) ([]byte,
error) {
+ const (
+ headerSize = 6
+ selfContainedBit = 1 << 17
+ )
+
+ payloadLen := len(payload)
+ if payloadLen > maxPayloadSize {
+ return nil, fmt.Errorf("payload length (%d) exceeds maximum
size of 128 KiB", payloadLen)
+ }
+
+ // Create the frame
+ frameSize := headerSize + payloadLen + 4 // 4 bytes for CRC32
+ frame := make([]byte, frameSize)
+
+ // First 3 bytes: payload length and self-contained flag
+ headerInt := uint32(payloadLen) & 0x1FFFF
+ if isSelfContained {
+ headerInt |= selfContainedBit // Set the self-contained flag
+ }
+
+ // Encode the first 3 bytes as a single little-endian integer
+ frame[0] = byte(headerInt)
+ frame[1] = byte(headerInt >> 8)
+ frame[2] = byte(headerInt >> 16)
+
+ // Calculate CRC24 for the first 3 bytes of the header
+ crc := KoopmanChecksum(frame[:3])
+
+ // Encode CRC24 into the next 3 bytes of the header
+ frame[3] = byte(crc)
+ frame[4] = byte(crc >> 8)
+ frame[5] = byte(crc >> 16)
+
+ copy(frame[headerSize:], payload) // Copy the payload to the frame
+
+ // Calculate CRC32 for the payload
+ payloadCRC32 := ChecksumIEEE(payload)
+ binary.LittleEndian.PutUint32(frame[headerSize+payloadLen:],
payloadCRC32)
+
+ return frame, nil
+}
+
+func newCompressedFrame(uncompressedPayload []byte, isSelfContained bool,
compressor Compressor) ([]byte, error) {
+ uncompressedLen := len(uncompressedPayload)
+ if uncompressedLen > maxPayloadSize {
+ return nil, fmt.Errorf("uncompressed compressed payload length
exceedes max size of frame payload %d/%d", uncompressedLen, maxPayloadSize)
+ }
+
+ compressedPayload, err := compressor.Encode(uncompressedPayload)
+ if err != nil {
+ return nil, err
+ }
+
+ // Skip the first 4 bytes because the size of the uncompressed payload
is written in the frame header, not in the
+ // body of the compressed envelope
+ compressedPayload = compressedPayload[4:]
+
+ compressedLen := len(compressedPayload)
+
+ // Compression is not worth it
+ if uncompressedLen < compressedLen {
+ // native_protocol_v5.spec
+ // 2.2
+ // An uncompressed length of 0 signals that the compressed
payload
+ // should be used as-is and not decompressed.
+ compressedPayload = uncompressedPayload
+ compressedLen = uncompressedLen
+ uncompressedLen = 0
+ }
+
+ // Combine compressed and uncompressed lengths and set the
self-contained flag if needed
+ combined := uint64(compressedLen) | uint64(uncompressedLen)<<17
+ if isSelfContained {
+ combined |= 1 << 34
+ }
+
+ var headerBuf [8]byte
+
+ // Write the combined value into the header buffer
+ binary.LittleEndian.PutUint64(headerBuf[:], combined)
+
+ // Create a buffer with enough capacity to hold the header, compressed
payload, and checksums
+ buf := bytes.NewBuffer(make([]byte, 0, 8+compressedLen+4))
+
+ // Write the first 5 bytes of the header (compressed and uncompressed
sizes)
+ buf.Write(headerBuf[:5])
+
+ // Compute and write the CRC24 checksum of the first 5 bytes
+ headerChecksum := KoopmanChecksum(headerBuf[:5])
+ binary.LittleEndian.PutUint32(headerBuf[:], headerChecksum)
+ buf.Write(headerBuf[:3])
+ buf.Write(compressedPayload)
+
+ // Compute and write the CRC32 checksum of the payload
+ payloadChecksum := ChecksumIEEE(compressedPayload)
+ binary.LittleEndian.PutUint32(headerBuf[:], payloadChecksum)
+ buf.Write(headerBuf[:4])
+
+ return buf.Bytes(), nil
+}
+
+func readCompressedFrame(r io.Reader, compressor Compressor) ([]byte, bool,
error) {
+ var (
+ headerBuf [8]byte
+ err error
+ )
+
+ if _, err = io.ReadFull(r, headerBuf[:]); err != nil {
+ return nil, false, err
+ }
+
+ // Reading checksum from frame header
+ readHeaderChecksum := uint32(headerBuf[5]) | uint32(headerBuf[6])<<8 |
uint32(headerBuf[7])<<16
+ if computedHeaderChecksum := KoopmanChecksum(headerBuf[:5]);
computedHeaderChecksum != readHeaderChecksum {
Review Comment:
NIT: `KoopmanChecksum(headerBuf[:headerSize]);`
##########
conn_test.go:
##########
@@ -1300,3 +1301,96 @@ func (srv *TestServer) readFrame(conn net.Conn)
(*framer, error) {
return framer, nil
}
+
+func TestConnProcessAllEnvelopesInSingleFrame(t *testing.T) {
Review Comment:
Rename to `TestConnProcessAllFramesInSingleSegment`
##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
f.writeBytes(v)
}
}
+
+func (f *framer) prepareModernLayout() error {
+ // Ensure protocol version is V5 or higher
+ if f.proto < protoVersion5 {
+ panic("Modern layout is not supported with version V4 or less")
+ }
+
+ selfContained := true
+
+ var (
+ adjustedBuf []byte
+ tempBuf []byte
+ err error
+ )
+
+ // Process the buffer in chunks if it exceeds the max payload size
+ for len(f.buf) > maxPayloadSize {
+ if f.compres != nil {
+ tempBuf, err =
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+ } else {
+ tempBuf, err =
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = f.buf[maxPayloadSize:]
+ selfContained = false
+ }
+
+ // Process the remaining buffer
+ if f.compres != nil {
+ tempBuf, err = newCompressedFrame(f.buf, selfContained,
f.compres)
+ } else {
+ tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = adjustedBuf
+
+ return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+ const headerSize = 6
+ header := [headerSize + 1]byte{}
+
+ // Read the frame header
+ if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read
uncompressed frame, err: %w", err)
+ }
+
+ // Compute and verify the header CRC24
+ computedHeaderCRC24 := KoopmanChecksum(header[:3])
+ readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
+ if computedHeaderCRC24 != readHeaderCRC24 {
+ return nil, false, fmt.Errorf("gocql: crc24 mismatch in frame
header, computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24)
+ }
+
+ // Extract the payload length and self-contained flag
+ headerInt := binary.LittleEndian.Uint32(header[:4])
+ payloadLen := int(headerInt & 0x1FFFF)
+ isSelfContained := (headerInt & (1 << 17)) != 0
+
+ // Read the payload
+ payload := make([]byte, payloadLen)
+ if _, err := io.ReadFull(r, payload); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read
uncompressed frame payload, err: %w", err)
+ }
+
+ // Read and verify the payload CRC32
+ if _, err := io.ReadFull(r, header[:4]); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read payload
crc32, err: %w", err)
+ }
+
+ computedPayloadCRC32 := ChecksumIEEE(payload)
+ readPayloadCRC32 := binary.LittleEndian.Uint32(header[:4])
Review Comment:
NIT: `binary.LittleEndian.Uint32(header[:crc32Size])`
##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
f.writeBytes(v)
}
}
+
+func (f *framer) prepareModernLayout() error {
+ // Ensure protocol version is V5 or higher
+ if f.proto < protoVersion5 {
+ panic("Modern layout is not supported with version V4 or less")
+ }
+
+ selfContained := true
+
+ var (
+ adjustedBuf []byte
+ tempBuf []byte
+ err error
+ )
+
+ // Process the buffer in chunks if it exceeds the max payload size
+ for len(f.buf) > maxPayloadSize {
+ if f.compres != nil {
+ tempBuf, err =
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+ } else {
+ tempBuf, err =
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = f.buf[maxPayloadSize:]
+ selfContained = false
+ }
+
+ // Process the remaining buffer
+ if f.compres != nil {
+ tempBuf, err = newCompressedFrame(f.buf, selfContained,
f.compres)
+ } else {
+ tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = adjustedBuf
+
+ return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+ const headerSize = 6
+ header := [headerSize + 1]byte{}
+
+ // Read the frame header
+ if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read
uncompressed frame, err: %w", err)
+ }
+
+ // Compute and verify the header CRC24
+ computedHeaderCRC24 := KoopmanChecksum(header[:3])
+ readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
+ if computedHeaderCRC24 != readHeaderCRC24 {
+ return nil, false, fmt.Errorf("gocql: crc24 mismatch in frame
header, computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24)
+ }
+
+ // Extract the payload length and self-contained flag
+ headerInt := binary.LittleEndian.Uint32(header[:4])
+ payloadLen := int(headerInt & 0x1FFFF)
+ isSelfContained := (headerInt & (1 << 17)) != 0
+
+ // Read the payload
+ payload := make([]byte, payloadLen)
+ if _, err := io.ReadFull(r, payload); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read
uncompressed frame payload, err: %w", err)
+ }
+
+ // Read and verify the payload CRC32
+ if _, err := io.ReadFull(r, header[:4]); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read payload
crc32, err: %w", err)
+ }
+
+ computedPayloadCRC32 := ChecksumIEEE(payload)
+ readPayloadCRC32 := binary.LittleEndian.Uint32(header[:4])
+ if computedPayloadCRC32 != readPayloadCRC32 {
+ return nil, false, fmt.Errorf("gocql: payload crc32 mismatch,
computed: %d, got: %d", computedPayloadCRC32, readPayloadCRC32)
+ }
+
+ return payload, isSelfContained, nil
+}
+
+func newUncompressedFrame(payload []byte, isSelfContained bool) ([]byte,
error) {
+ const (
+ headerSize = 6
+ selfContainedBit = 1 << 17
+ )
+
+ payloadLen := len(payload)
+ if payloadLen > maxPayloadSize {
+ return nil, fmt.Errorf("payload length (%d) exceeds maximum
size of 128 KiB", payloadLen)
+ }
+
+ // Create the frame
+ frameSize := headerSize + payloadLen + 4 // 4 bytes for CRC32
+ frame := make([]byte, frameSize)
+
+ // First 3 bytes: payload length and self-contained flag
+ headerInt := uint32(payloadLen) & 0x1FFFF
+ if isSelfContained {
+ headerInt |= selfContainedBit // Set the self-contained flag
+ }
+
+ // Encode the first 3 bytes as a single little-endian integer
+ frame[0] = byte(headerInt)
+ frame[1] = byte(headerInt >> 8)
+ frame[2] = byte(headerInt >> 16)
+
+ // Calculate CRC24 for the first 3 bytes of the header
+ crc := KoopmanChecksum(frame[:3])
+
+ // Encode CRC24 into the next 3 bytes of the header
+ frame[3] = byte(crc)
+ frame[4] = byte(crc >> 8)
+ frame[5] = byte(crc >> 16)
+
+ copy(frame[headerSize:], payload) // Copy the payload to the frame
+
+ // Calculate CRC32 for the payload
+ payloadCRC32 := ChecksumIEEE(payload)
+ binary.LittleEndian.PutUint32(frame[headerSize+payloadLen:],
payloadCRC32)
+
+ return frame, nil
+}
+
+func newCompressedFrame(uncompressedPayload []byte, isSelfContained bool,
compressor Compressor) ([]byte, error) {
+ uncompressedLen := len(uncompressedPayload)
+ if uncompressedLen > maxPayloadSize {
+ return nil, fmt.Errorf("uncompressed compressed payload length
exceedes max size of frame payload %d/%d", uncompressedLen, maxPayloadSize)
+ }
+
+ compressedPayload, err := compressor.Encode(uncompressedPayload)
+ if err != nil {
+ return nil, err
+ }
+
+ // Skip the first 4 bytes because the size of the uncompressed payload
is written in the frame header, not in the
+ // body of the compressed envelope
+ compressedPayload = compressedPayload[4:]
+
+ compressedLen := len(compressedPayload)
+
+ // Compression is not worth it
+ if uncompressedLen < compressedLen {
+ // native_protocol_v5.spec
+ // 2.2
+ // An uncompressed length of 0 signals that the compressed
payload
+ // should be used as-is and not decompressed.
+ compressedPayload = uncompressedPayload
+ compressedLen = uncompressedLen
+ uncompressedLen = 0
+ }
+
+ // Combine compressed and uncompressed lengths and set the
self-contained flag if needed
+ combined := uint64(compressedLen) | uint64(uncompressedLen)<<17
+ if isSelfContained {
+ combined |= 1 << 34
+ }
+
+ var headerBuf [8]byte
+
+ // Write the combined value into the header buffer
+ binary.LittleEndian.PutUint64(headerBuf[:], combined)
+
+ // Create a buffer with enough capacity to hold the header, compressed
payload, and checksums
+ buf := bytes.NewBuffer(make([]byte, 0, 8+compressedLen+4))
+
+ // Write the first 5 bytes of the header (compressed and uncompressed
sizes)
+ buf.Write(headerBuf[:5])
Review Comment:
NIT: `buf.Write(headerBuf[:headerSize])`
##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
f.writeBytes(v)
}
}
+
+func (f *framer) prepareModernLayout() error {
+ // Ensure protocol version is V5 or higher
+ if f.proto < protoVersion5 {
+ panic("Modern layout is not supported with version V4 or less")
+ }
+
+ selfContained := true
+
+ var (
+ adjustedBuf []byte
+ tempBuf []byte
+ err error
+ )
+
+ // Process the buffer in chunks if it exceeds the max payload size
+ for len(f.buf) > maxPayloadSize {
+ if f.compres != nil {
+ tempBuf, err =
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+ } else {
+ tempBuf, err =
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = f.buf[maxPayloadSize:]
+ selfContained = false
+ }
+
+ // Process the remaining buffer
+ if f.compres != nil {
+ tempBuf, err = newCompressedFrame(f.buf, selfContained,
f.compres)
+ } else {
+ tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = adjustedBuf
+
+ return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+ const headerSize = 6
+ header := [headerSize + 1]byte{}
+
+ // Read the frame header
+ if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read
uncompressed frame, err: %w", err)
+ }
+
+ // Compute and verify the header CRC24
+ computedHeaderCRC24 := KoopmanChecksum(header[:3])
+ readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
+ if computedHeaderCRC24 != readHeaderCRC24 {
+ return nil, false, fmt.Errorf("gocql: crc24 mismatch in frame
header, computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24)
+ }
+
+ // Extract the payload length and self-contained flag
+ headerInt := binary.LittleEndian.Uint32(header[:4])
+ payloadLen := int(headerInt & 0x1FFFF)
+ isSelfContained := (headerInt & (1 << 17)) != 0
+
+ // Read the payload
+ payload := make([]byte, payloadLen)
+ if _, err := io.ReadFull(r, payload); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read
uncompressed frame payload, err: %w", err)
+ }
+
+ // Read and verify the payload CRC32
+ if _, err := io.ReadFull(r, header[:4]); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read payload
crc32, err: %w", err)
+ }
+
+ computedPayloadCRC32 := ChecksumIEEE(payload)
+ readPayloadCRC32 := binary.LittleEndian.Uint32(header[:4])
+ if computedPayloadCRC32 != readPayloadCRC32 {
+ return nil, false, fmt.Errorf("gocql: payload crc32 mismatch,
computed: %d, got: %d", computedPayloadCRC32, readPayloadCRC32)
+ }
+
+ return payload, isSelfContained, nil
+}
+
+func newUncompressedFrame(payload []byte, isSelfContained bool) ([]byte,
error) {
+ const (
+ headerSize = 6
+ selfContainedBit = 1 << 17
+ )
+
+ payloadLen := len(payload)
+ if payloadLen > maxPayloadSize {
+ return nil, fmt.Errorf("payload length (%d) exceeds maximum
size of 128 KiB", payloadLen)
+ }
+
+ // Create the frame
+ frameSize := headerSize + payloadLen + 4 // 4 bytes for CRC32
+ frame := make([]byte, frameSize)
Review Comment:
```
// Create the segment
segmentSize := headerSize + payloadLen + crc32Size
segment := make([]byte, segmentSize)
```
##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
f.writeBytes(v)
}
}
+
+func (f *framer) prepareModernLayout() error {
+ // Ensure protocol version is V5 or higher
+ if f.proto < protoVersion5 {
+ panic("Modern layout is not supported with version V4 or less")
+ }
+
+ selfContained := true
+
+ var (
+ adjustedBuf []byte
+ tempBuf []byte
+ err error
+ )
+
+ // Process the buffer in chunks if it exceeds the max payload size
+ for len(f.buf) > maxPayloadSize {
+ if f.compres != nil {
+ tempBuf, err =
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+ } else {
+ tempBuf, err =
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = f.buf[maxPayloadSize:]
+ selfContained = false
+ }
+
+ // Process the remaining buffer
+ if f.compres != nil {
+ tempBuf, err = newCompressedFrame(f.buf, selfContained,
f.compres)
+ } else {
+ tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = adjustedBuf
+
+ return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+ const headerSize = 6
+ header := [headerSize + 1]byte{}
+
+ // Read the frame header
+ if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read
uncompressed frame, err: %w", err)
+ }
+
+ // Compute and verify the header CRC24
+ computedHeaderCRC24 := KoopmanChecksum(header[:3])
+ readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
+ if computedHeaderCRC24 != readHeaderCRC24 {
+ return nil, false, fmt.Errorf("gocql: crc24 mismatch in frame
header, computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24)
+ }
+
+ // Extract the payload length and self-contained flag
+ headerInt := binary.LittleEndian.Uint32(header[:4])
+ payloadLen := int(headerInt & 0x1FFFF)
+ isSelfContained := (headerInt & (1 << 17)) != 0
+
+ // Read the payload
+ payload := make([]byte, payloadLen)
+ if _, err := io.ReadFull(r, payload); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read
uncompressed frame payload, err: %w", err)
+ }
+
+ // Read and verify the payload CRC32
+ if _, err := io.ReadFull(r, header[:4]); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read payload
crc32, err: %w", err)
+ }
+
+ computedPayloadCRC32 := ChecksumIEEE(payload)
+ readPayloadCRC32 := binary.LittleEndian.Uint32(header[:4])
+ if computedPayloadCRC32 != readPayloadCRC32 {
+ return nil, false, fmt.Errorf("gocql: payload crc32 mismatch,
computed: %d, got: %d", computedPayloadCRC32, readPayloadCRC32)
+ }
+
+ return payload, isSelfContained, nil
+}
+
+func newUncompressedFrame(payload []byte, isSelfContained bool) ([]byte,
error) {
+ const (
+ headerSize = 6
+ selfContainedBit = 1 << 17
+ )
+
+ payloadLen := len(payload)
+ if payloadLen > maxPayloadSize {
+ return nil, fmt.Errorf("payload length (%d) exceeds maximum
size of 128 KiB", payloadLen)
+ }
+
+ // Create the frame
+ frameSize := headerSize + payloadLen + 4 // 4 bytes for CRC32
+ frame := make([]byte, frameSize)
+
+ // First 3 bytes: payload length and self-contained flag
+ headerInt := uint32(payloadLen) & 0x1FFFF
+ if isSelfContained {
+ headerInt |= selfContainedBit // Set the self-contained flag
+ }
+
+ // Encode the first 3 bytes as a single little-endian integer
+ frame[0] = byte(headerInt)
+ frame[1] = byte(headerInt >> 8)
+ frame[2] = byte(headerInt >> 16)
+
+ // Calculate CRC24 for the first 3 bytes of the header
+ crc := KoopmanChecksum(frame[:3])
+
+ // Encode CRC24 into the next 3 bytes of the header
+ frame[3] = byte(crc)
+ frame[4] = byte(crc >> 8)
+ frame[5] = byte(crc >> 16)
+
+ copy(frame[headerSize:], payload) // Copy the payload to the frame
+
+ // Calculate CRC32 for the payload
+ payloadCRC32 := ChecksumIEEE(payload)
+ binary.LittleEndian.PutUint32(frame[headerSize+payloadLen:],
payloadCRC32)
+
+ return frame, nil
+}
+
+func newCompressedFrame(uncompressedPayload []byte, isSelfContained bool,
compressor Compressor) ([]byte, error) {
+ uncompressedLen := len(uncompressedPayload)
+ if uncompressedLen > maxPayloadSize {
+ return nil, fmt.Errorf("uncompressed compressed payload length
exceedes max size of frame payload %d/%d", uncompressedLen, maxPayloadSize)
+ }
+
+ compressedPayload, err := compressor.Encode(uncompressedPayload)
+ if err != nil {
+ return nil, err
+ }
+
+ // Skip the first 4 bytes because the size of the uncompressed payload
is written in the frame header, not in the
+ // body of the compressed envelope
Review Comment:
```
// Skip the first 4 bytes because the size of the uncompressed payload is
// written in the segment header, not in the body of the compressed frame
```
##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
f.writeBytes(v)
}
}
+
+func (f *framer) prepareModernLayout() error {
+ // Ensure protocol version is V5 or higher
+ if f.proto < protoVersion5 {
+ panic("Modern layout is not supported with version V4 or less")
+ }
+
+ selfContained := true
+
+ var (
+ adjustedBuf []byte
+ tempBuf []byte
+ err error
+ )
+
+ // Process the buffer in chunks if it exceeds the max payload size
+ for len(f.buf) > maxPayloadSize {
+ if f.compres != nil {
+ tempBuf, err =
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+ } else {
+ tempBuf, err =
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = f.buf[maxPayloadSize:]
+ selfContained = false
+ }
+
+ // Process the remaining buffer
+ if f.compres != nil {
+ tempBuf, err = newCompressedFrame(f.buf, selfContained,
f.compres)
+ } else {
+ tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = adjustedBuf
+
+ return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+ const headerSize = 6
+ header := [headerSize + 1]byte{}
+
+ // Read the frame header
+ if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read
uncompressed frame, err: %w", err)
+ }
+
+ // Compute and verify the header CRC24
+ computedHeaderCRC24 := KoopmanChecksum(header[:3])
+ readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
+ if computedHeaderCRC24 != readHeaderCRC24 {
+ return nil, false, fmt.Errorf("gocql: crc24 mismatch in frame
header, computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24)
+ }
+
+ // Extract the payload length and self-contained flag
+ headerInt := binary.LittleEndian.Uint32(header[:4])
+ payloadLen := int(headerInt & 0x1FFFF)
+ isSelfContained := (headerInt & (1 << 17)) != 0
+
+ // Read the payload
+ payload := make([]byte, payloadLen)
+ if _, err := io.ReadFull(r, payload); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read
uncompressed frame payload, err: %w", err)
+ }
+
+ // Read and verify the payload CRC32
+ if _, err := io.ReadFull(r, header[:4]); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read payload
crc32, err: %w", err)
+ }
+
+ computedPayloadCRC32 := ChecksumIEEE(payload)
+ readPayloadCRC32 := binary.LittleEndian.Uint32(header[:4])
+ if computedPayloadCRC32 != readPayloadCRC32 {
+ return nil, false, fmt.Errorf("gocql: payload crc32 mismatch,
computed: %d, got: %d", computedPayloadCRC32, readPayloadCRC32)
+ }
+
+ return payload, isSelfContained, nil
+}
+
+func newUncompressedFrame(payload []byte, isSelfContained bool) ([]byte,
error) {
+ const (
+ headerSize = 6
+ selfContainedBit = 1 << 17
+ )
+
+ payloadLen := len(payload)
+ if payloadLen > maxPayloadSize {
+ return nil, fmt.Errorf("payload length (%d) exceeds maximum
size of 128 KiB", payloadLen)
+ }
+
+ // Create the frame
+ frameSize := headerSize + payloadLen + 4 // 4 bytes for CRC32
+ frame := make([]byte, frameSize)
+
+ // First 3 bytes: payload length and self-contained flag
+ headerInt := uint32(payloadLen) & 0x1FFFF
+ if isSelfContained {
+ headerInt |= selfContainedBit // Set the self-contained flag
+ }
+
+ // Encode the first 3 bytes as a single little-endian integer
+ frame[0] = byte(headerInt)
+ frame[1] = byte(headerInt >> 8)
+ frame[2] = byte(headerInt >> 16)
+
+ // Calculate CRC24 for the first 3 bytes of the header
+ crc := KoopmanChecksum(frame[:3])
+
+ // Encode CRC24 into the next 3 bytes of the header
+ frame[3] = byte(crc)
+ frame[4] = byte(crc >> 8)
+ frame[5] = byte(crc >> 16)
+
+ copy(frame[headerSize:], payload) // Copy the payload to the frame
+
+ // Calculate CRC32 for the payload
+ payloadCRC32 := ChecksumIEEE(payload)
+ binary.LittleEndian.PutUint32(frame[headerSize+payloadLen:],
payloadCRC32)
+
+ return frame, nil
+}
+
+func newCompressedFrame(uncompressedPayload []byte, isSelfContained bool,
compressor Compressor) ([]byte, error) {
+ uncompressedLen := len(uncompressedPayload)
+ if uncompressedLen > maxPayloadSize {
+ return nil, fmt.Errorf("uncompressed compressed payload length
exceedes max size of frame payload %d/%d", uncompressedLen, maxPayloadSize)
+ }
+
+ compressedPayload, err := compressor.Encode(uncompressedPayload)
+ if err != nil {
+ return nil, err
+ }
+
+ // Skip the first 4 bytes because the size of the uncompressed payload
is written in the frame header, not in the
+ // body of the compressed envelope
+ compressedPayload = compressedPayload[4:]
+
+ compressedLen := len(compressedPayload)
+
+ // Compression is not worth it
+ if uncompressedLen < compressedLen {
+ // native_protocol_v5.spec
+ // 2.2
+ // An uncompressed length of 0 signals that the compressed
payload
+ // should be used as-is and not decompressed.
+ compressedPayload = uncompressedPayload
+ compressedLen = uncompressedLen
+ uncompressedLen = 0
+ }
+
+ // Combine compressed and uncompressed lengths and set the
self-contained flag if needed
+ combined := uint64(compressedLen) | uint64(uncompressedLen)<<17
+ if isSelfContained {
+ combined |= 1 << 34
+ }
+
+ var headerBuf [8]byte
+
+ // Write the combined value into the header buffer
+ binary.LittleEndian.PutUint64(headerBuf[:], combined)
+
+ // Create a buffer with enough capacity to hold the header, compressed
payload, and checksums
+ buf := bytes.NewBuffer(make([]byte, 0, 8+compressedLen+4))
+
+ // Write the first 5 bytes of the header (compressed and uncompressed
sizes)
+ buf.Write(headerBuf[:5])
+
+ // Compute and write the CRC24 checksum of the first 5 bytes
+ headerChecksum := KoopmanChecksum(headerBuf[:5])
Review Comment:
NIT: `headerChecksum := KoopmanChecksum(headerBuf[:headerSize])`
##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
f.writeBytes(v)
}
}
+
+func (f *framer) prepareModernLayout() error {
+ // Ensure protocol version is V5 or higher
+ if f.proto < protoVersion5 {
+ panic("Modern layout is not supported with version V4 or less")
+ }
+
+ selfContained := true
+
+ var (
+ adjustedBuf []byte
+ tempBuf []byte
+ err error
+ )
+
+ // Process the buffer in chunks if it exceeds the max payload size
+ for len(f.buf) > maxPayloadSize {
+ if f.compres != nil {
+ tempBuf, err =
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+ } else {
+ tempBuf, err =
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = f.buf[maxPayloadSize:]
+ selfContained = false
+ }
+
+ // Process the remaining buffer
+ if f.compres != nil {
+ tempBuf, err = newCompressedFrame(f.buf, selfContained,
f.compres)
+ } else {
+ tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = adjustedBuf
+
+ return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+ const headerSize = 6
+ header := [headerSize + 1]byte{}
+
+ // Read the frame header
+ if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read
uncompressed frame, err: %w", err)
+ }
+
+ // Compute and verify the header CRC24
+ computedHeaderCRC24 := KoopmanChecksum(header[:3])
+ readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
+ if computedHeaderCRC24 != readHeaderCRC24 {
+ return nil, false, fmt.Errorf("gocql: crc24 mismatch in frame
header, computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24)
+ }
+
+ // Extract the payload length and self-contained flag
+ headerInt := binary.LittleEndian.Uint32(header[:4])
+ payloadLen := int(headerInt & 0x1FFFF)
+ isSelfContained := (headerInt & (1 << 17)) != 0
+
+ // Read the payload
+ payload := make([]byte, payloadLen)
+ if _, err := io.ReadFull(r, payload); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read
uncompressed frame payload, err: %w", err)
+ }
+
+ // Read and verify the payload CRC32
+ if _, err := io.ReadFull(r, header[:4]); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read payload
crc32, err: %w", err)
+ }
+
+ computedPayloadCRC32 := ChecksumIEEE(payload)
+ readPayloadCRC32 := binary.LittleEndian.Uint32(header[:4])
+ if computedPayloadCRC32 != readPayloadCRC32 {
+ return nil, false, fmt.Errorf("gocql: payload crc32 mismatch,
computed: %d, got: %d", computedPayloadCRC32, readPayloadCRC32)
+ }
+
+ return payload, isSelfContained, nil
+}
+
+func newUncompressedFrame(payload []byte, isSelfContained bool) ([]byte,
error) {
+ const (
+ headerSize = 6
+ selfContainedBit = 1 << 17
+ )
+
+ payloadLen := len(payload)
+ if payloadLen > maxPayloadSize {
+ return nil, fmt.Errorf("payload length (%d) exceeds maximum
size of 128 KiB", payloadLen)
+ }
+
+ // Create the frame
+ frameSize := headerSize + payloadLen + 4 // 4 bytes for CRC32
+ frame := make([]byte, frameSize)
+
+ // First 3 bytes: payload length and self-contained flag
+ headerInt := uint32(payloadLen) & 0x1FFFF
+ if isSelfContained {
+ headerInt |= selfContainedBit // Set the self-contained flag
+ }
+
+ // Encode the first 3 bytes as a single little-endian integer
+ frame[0] = byte(headerInt)
+ frame[1] = byte(headerInt >> 8)
+ frame[2] = byte(headerInt >> 16)
+
+ // Calculate CRC24 for the first 3 bytes of the header
+ crc := KoopmanChecksum(frame[:3])
+
+ // Encode CRC24 into the next 3 bytes of the header
+ frame[3] = byte(crc)
+ frame[4] = byte(crc >> 8)
+ frame[5] = byte(crc >> 16)
+
+ copy(frame[headerSize:], payload) // Copy the payload to the frame
+
+ // Calculate CRC32 for the payload
+ payloadCRC32 := ChecksumIEEE(payload)
+ binary.LittleEndian.PutUint32(frame[headerSize+payloadLen:],
payloadCRC32)
+
+ return frame, nil
+}
+
+func newCompressedFrame(uncompressedPayload []byte, isSelfContained bool,
compressor Compressor) ([]byte, error) {
+ uncompressedLen := len(uncompressedPayload)
+ if uncompressedLen > maxPayloadSize {
+ return nil, fmt.Errorf("uncompressed compressed payload length
exceedes max size of frame payload %d/%d", uncompressedLen, maxPayloadSize)
+ }
+
+ compressedPayload, err := compressor.Encode(uncompressedPayload)
+ if err != nil {
+ return nil, err
+ }
+
+ // Skip the first 4 bytes because the size of the uncompressed payload
is written in the frame header, not in the
+ // body of the compressed envelope
+ compressedPayload = compressedPayload[4:]
+
+ compressedLen := len(compressedPayload)
+
+ // Compression is not worth it
+ if uncompressedLen < compressedLen {
+ // native_protocol_v5.spec
+ // 2.2
+ // An uncompressed length of 0 signals that the compressed
payload
+ // should be used as-is and not decompressed.
+ compressedPayload = uncompressedPayload
+ compressedLen = uncompressedLen
+ uncompressedLen = 0
+ }
+
+ // Combine compressed and uncompressed lengths and set the
self-contained flag if needed
+ combined := uint64(compressedLen) | uint64(uncompressedLen)<<17
+ if isSelfContained {
+ combined |= 1 << 34
+ }
+
+ var headerBuf [8]byte
Review Comment:
NIT: `var headerBuf [headerSize + crc24Size]byte`
##########
conn.go:
##########
@@ -1756,6 +1888,41 @@ func (c *Conn) awaitSchemaAgreement(ctx context.Context)
(err error) {
return fmt.Errorf("gocql: cluster schema versions not consistent: %+v",
schemas)
}
+// recvLastsFrames reads proto v5 frames from Conn.r and writes decoded
payload to dst.
+// It reads data until the bytesToRead is reached.
+// If Conn.compressor is not nil, it processes Compressed Format frames.
+func (c *Conn) recvLastsFrames(dst *bytes.Buffer, bytesToRead int) error {
Review Comment:
NIT: can we also move this function so that it is located right after
`recvProtoV5Frame`? It would help with code navigation.
##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
f.writeBytes(v)
}
}
+
+func (f *framer) prepareModernLayout() error {
+ // Ensure protocol version is V5 or higher
+ if f.proto < protoVersion5 {
+ panic("Modern layout is not supported with version V4 or less")
+ }
+
+ selfContained := true
+
+ var (
+ adjustedBuf []byte
+ tempBuf []byte
+ err error
+ )
+
+ // Process the buffer in chunks if it exceeds the max payload size
+ for len(f.buf) > maxPayloadSize {
+ if f.compres != nil {
+ tempBuf, err =
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+ } else {
+ tempBuf, err =
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = f.buf[maxPayloadSize:]
+ selfContained = false
+ }
+
+ // Process the remaining buffer
+ if f.compres != nil {
+ tempBuf, err = newCompressedFrame(f.buf, selfContained,
f.compres)
+ } else {
+ tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
+ f.buf = adjustedBuf
+
+ return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+ const headerSize = 6
+ header := [headerSize + 1]byte{}
+
+ // Read the frame header
+ if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read
uncompressed frame, err: %w", err)
+ }
+
+ // Compute and verify the header CRC24
+ computedHeaderCRC24 := KoopmanChecksum(header[:3])
+ readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
+ if computedHeaderCRC24 != readHeaderCRC24 {
+ return nil, false, fmt.Errorf("gocql: crc24 mismatch in frame
header, computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24)
+ }
+
+ // Extract the payload length and self-contained flag
+ headerInt := binary.LittleEndian.Uint32(header[:4])
+ payloadLen := int(headerInt & 0x1FFFF)
+ isSelfContained := (headerInt & (1 << 17)) != 0
+
+ // Read the payload
+ payload := make([]byte, payloadLen)
+ if _, err := io.ReadFull(r, payload); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read
uncompressed frame payload, err: %w", err)
+ }
+
+ // Read and verify the payload CRC32
+ if _, err := io.ReadFull(r, header[:4]); err != nil {
+ return nil, false, fmt.Errorf("gocql: failed to read payload
crc32, err: %w", err)
+ }
+
+ computedPayloadCRC32 := ChecksumIEEE(payload)
+ readPayloadCRC32 := binary.LittleEndian.Uint32(header[:4])
+ if computedPayloadCRC32 != readPayloadCRC32 {
+ return nil, false, fmt.Errorf("gocql: payload crc32 mismatch,
computed: %d, got: %d", computedPayloadCRC32, readPayloadCRC32)
+ }
+
+ return payload, isSelfContained, nil
+}
+
+func newUncompressedFrame(payload []byte, isSelfContained bool) ([]byte,
error) {
+ const (
+ headerSize = 6
+ selfContainedBit = 1 << 17
+ )
+
+ payloadLen := len(payload)
+ if payloadLen > maxPayloadSize {
+ return nil, fmt.Errorf("payload length (%d) exceeds maximum
size of 128 KiB", payloadLen)
+ }
+
+ // Create the frame
+ frameSize := headerSize + payloadLen + 4 // 4 bytes for CRC32
+ frame := make([]byte, frameSize)
+
+ // First 3 bytes: payload length and self-contained flag
+ headerInt := uint32(payloadLen) & 0x1FFFF
+ if isSelfContained {
+ headerInt |= selfContainedBit // Set the self-contained flag
+ }
+
+ // Encode the first 3 bytes as a single little-endian integer
+ frame[0] = byte(headerInt)
+ frame[1] = byte(headerInt >> 8)
+ frame[2] = byte(headerInt >> 16)
+
+ // Calculate CRC24 for the first 3 bytes of the header
+ crc := KoopmanChecksum(frame[:3])
+
+ // Encode CRC24 into the next 3 bytes of the header
+ frame[3] = byte(crc)
+ frame[4] = byte(crc >> 8)
+ frame[5] = byte(crc >> 16)
+
+ copy(frame[headerSize:], payload) // Copy the payload to the frame
+
+ // Calculate CRC32 for the payload
+ payloadCRC32 := ChecksumIEEE(payload)
+ binary.LittleEndian.PutUint32(frame[headerSize+payloadLen:],
payloadCRC32)
+
+ return frame, nil
+}
+
+func newCompressedFrame(uncompressedPayload []byte, isSelfContained bool,
compressor Compressor) ([]byte, error) {
+ uncompressedLen := len(uncompressedPayload)
+ if uncompressedLen > maxPayloadSize {
+ return nil, fmt.Errorf("uncompressed compressed payload length
exceedes max size of frame payload %d/%d", uncompressedLen, maxPayloadSize)
+ }
+
+ compressedPayload, err := compressor.Encode(uncompressedPayload)
+ if err != nil {
+ return nil, err
+ }
+
+ // Skip the first 4 bytes because the size of the uncompressed payload
is written in the frame header, not in the
+ // body of the compressed envelope
+ compressedPayload = compressedPayload[4:]
+
+ compressedLen := len(compressedPayload)
+
+ // Compression is not worth it
+ if uncompressedLen < compressedLen {
+ // native_protocol_v5.spec
+ // 2.2
+ // An uncompressed length of 0 signals that the compressed
payload
+ // should be used as-is and not decompressed.
+ compressedPayload = uncompressedPayload
+ compressedLen = uncompressedLen
+ uncompressedLen = 0
+ }
+
+ // Combine compressed and uncompressed lengths and set the
self-contained flag if needed
+ combined := uint64(compressedLen) | uint64(uncompressedLen)<<17
+ if isSelfContained {
+ combined |= 1 << 34
+ }
+
+ var headerBuf [8]byte
+
+ // Write the combined value into the header buffer
+ binary.LittleEndian.PutUint64(headerBuf[:], combined)
+
+ // Create a buffer with enough capacity to hold the header, compressed
payload, and checksums
+ buf := bytes.NewBuffer(make([]byte, 0, 8+compressedLen+4))
Review Comment:
NIT: `buf := bytes.NewBuffer(make([]byte, 0, headerSize + crc24Size +
compressedLen + crc32Size))`
##########
frame.go:
##########
@@ -2070,3 +2133,247 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
f.writeBytes(v)
}
}
+
+func (f *framer) prepareModernLayout() error {
+ // Ensure protocol version is V5 or higher
+ if f.proto < protoVersion5 {
+ panic("Modern layout is not supported with version V4 or less")
+ }
+
+ selfContained := true
+
+ var (
+ adjustedBuf []byte
+ tempBuf []byte
+ err error
+ )
+
+ // Process the buffer in chunks if it exceeds the max payload size
+ for len(f.buf) > maxPayloadSize {
+ if f.compres != nil {
+ tempBuf, err =
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+ } else {
+ tempBuf, err =
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+ }
+ if err != nil {
+ return err
+ }
+
+ adjustedBuf = append(adjustedBuf, tempBuf...)
Review Comment:
I have some concerns that performance will not be as good as with v4, but I
don't think it will be bad enough to justify making it a blocker for this PR.
It would still be nice to get some benchmark data comparing the v4 and v5
implementations for the same workloads.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]