worryg0d commented on code in PR #1946:
URL: 
https://github.com/apache/cassandra-gocql-driver/pull/1946#discussion_r3273826818


##########
conn.go:
##########
@@ -2047,6 +1937,343 @@ func (c *Conn) awaitSchemaAgreementWithTimeout(ctx 
context.Context, timeout time
        return fmt.Errorf("gocql: cluster schema versions not consistent: %+v", 
schemas)
 }
 
+// segmentWriter allows batching multiple frames into a signle segment before 
flushing them to the connection.
+type segmentWriter struct {
+       w    contextWriter
+       quit <-chan struct{}
+
+       // Holds write requests for the current segment.
+       writeRequests     []writeRequest
+       totalFramesLength int
+       writeCh           chan writeRequest
+
+       segmentCodec segmentCodec
+}
+
+func newSegmentWriter(w contextWriter, writeInterval time.Duration, quit 
<-chan struct{}, compressor Compressor) *segmentWriter {
+       sw := &segmentWriter{
+               w:            w,
+               quit:         quit,
+               writeCh:      make(chan writeRequest),
+               segmentCodec: newSegmentCodec(compressor),
+       }
+
+       go sw.runFlusher(writeInterval)
+
+       return sw
+}
+
+func (sw *segmentWriter) writeContext(ctx context.Context, frame []byte) (int, 
error) {
+       resultChan := make(chan writeResult, 1)
+       req := writeRequest{
+               resultChan: resultChan,
+               data:       frame,
+       }
+
+       select {
+       case <-ctx.Done():
+               return 0, ctx.Err()
+       case <-sw.quit:
+               return 0, ErrConnectionClosed
+       case sw.writeCh <- req:
+               // Enqueued for writing
+       }
+
+       result := <-resultChan
+       return result.n, result.err
+}
+
+func (sw *segmentWriter) runFlusher(interval time.Duration) {
+       timer := time.NewTimer(interval)
+       defer timer.Stop()
+
+       if !timer.Stop() {
+               <-timer.C
+       }
+
+       // Indicates whether the flush timer is running
+       running := false
+
+       for {
+               select {
+               case <-sw.quit:
+                       return
+               case req := <-sw.writeCh:
+                       frame := req.data
+                       if len(frame) > maxSegmentPayloadSize {
+                               sw.flushBigFrameImmediately(req)
+                       } else if sw.fitsSegment(frame) {
+                               sw.appendWriteRequest(req)
+                               if !running {
+                                       running = true
+                                       timer.Reset(interval)
+                               }
+                       } else {
+                               // Frame doesn't fit into current segment,
+                               // so we need to flush the current one and 
start a new one
+                               sw.flushCurrentSegment()
+                               sw.reset()
+                               sw.appendWriteRequest(req)
+                               timer.Reset(interval)
+                       }
+               case <-timer.C:
+                       running = false
+                       sw.flushCurrentSegment()
+                       sw.reset()
+               }
+       }
+}
+
+func (sw *segmentWriter) appendWriteRequest(req writeRequest) {
+       sw.writeRequests = append(sw.writeRequests, req)
+       sw.totalFramesLength += len(req.data)
+}
+
+func (sw *segmentWriter) fitsSegment(frame []byte) bool {
+       return sw.totalFramesLength+len(frame) <= maxSegmentPayloadSize
+}
+
+// Flushes the current segment and writes the results to the result listeners.
+// Should be called before resetting the segment writer.
+func (sw *segmentWriter) flushCurrentSegment() {
+       framesBuf := make([]byte, 0, sw.totalFramesLength)
+       for _, req := range sw.writeRequests {
+               // TODO: interesting if compiler optimizes this
+               framesBuf = append(framesBuf, req.data...)
+       }
+
+       err := sw.encodeAndWrite(framesBuf, true)
+       if err != nil {
+               for _, req := range sw.writeRequests {
+                       req.resultChan <- writeResult{
+                               n:   0,
+                               err: err,
+                       }
+               }
+               return
+       }
+
+       for _, req := range sw.writeRequests {
+               req.resultChan <- writeResult{
+                       n:   len(req.data),
+                       err: nil,
+               }
+       }
+}
+
+func (sw *segmentWriter) reset() {
+       sw.writeRequests = nil
+       sw.totalFramesLength = 0
+}
+
+// Encodes a big frame which size is larger than maxSegmentPayloadSize
+// into multiple non self-contained segments and flushes them immediately
+func (sw *segmentWriter) flushBigFrameImmediately(req writeRequest) {
+       // Calculate the number of segment the frame will be split into
+       segmentsCount := 0
+       frame := req.data
+       frameLength := len(frame)
+       exactFit := frameLength%maxSegmentPayloadSize == 0
+       if exactFit {
+               segmentsCount = frameLength / maxSegmentPayloadSize
+       } else {
+               // An extra segment for the remainder of the frame
+               segmentsCount = frameLength/maxSegmentPayloadSize + 1
+       }
+
+       var flushErr error
+
+       for i := 0; i < segmentsCount; i++ {
+               // Calculate the length of the current frame part which will be 
encoded into a segment
+               partialFrameLength := 0
+               if i < segmentsCount-1 || exactFit {
+                       partialFrameLength = maxSegmentPayloadSize
+               } else {
+                       partialFrameLength = frameLength % maxSegmentPayloadSize
+               }
+               err := sw.encodeAndWrite(frame[:partialFrameLength], false)
+               if err != nil {
+                       flushErr = err
+                       break
+               }
+               frame = frame[partialFrameLength:]
+       }
+
+       written := len(req.data)
+       if flushErr != nil {
+               written = 0
+       }
+
+       req.resultChan <- writeResult{
+               n:   written,
+               err: flushErr,
+       }
+}
+
+// Encodes a frame into a segment and writes it to the underlying connection
+func (sw *segmentWriter) encodeAndWrite(frame []byte, isSelfContained bool) 
error {
+       segmentBuf, err := sw.segmentCodec.encode(frame, isSelfContained)
+       if err != nil {
+               return err
+       }
+       _, err = sw.w.writeContext(context.Background(), segmentBuf)
+       if err != nil {
+               return err
+       }
+       return nil
+}
+
+// segmentReader allows reading segments from the underlying connection.
+// Implements ConnReader interface.
+type segmentReader struct {
+       r ConnReader
+
+       segmentCodec segmentCodec
+
+       // Reusable buffer for decoded frames
+       // This buffer might have multiple frames inside if self-contained 
segment is decoded
+       readBufferDecoded bytes.Reader
+       // Reusable buffer for reading frame header
+       frameHeaderBuf [frameHeadSize]byte
+}
+
+func newSegmentReader(r ConnReader, segmentCodec segmentCodec) *segmentReader {
+       return &segmentReader{
+               r:            r,
+               segmentCodec: segmentCodec,
+       }
+}
+
+// why do we have a write method for reader lol
+func (sr *segmentReader) Write(b []byte) (n int, err error) {
+       return sr.r.Write(b)
+}
+
+func (sr *segmentReader) Close() error {
+       return sr.r.Close()
+}
+
+func (sr *segmentReader) LocalAddr() net.Addr {
+       return sr.r.LocalAddr()
+}
+
+func (sr *segmentReader) RemoteAddr() net.Addr {
+       return sr.r.RemoteAddr()
+}
+
+func (sr *segmentReader) SetDeadline(t time.Time) error {
+       return sr.r.SetDeadline(t)
+}
+
+func (sr *segmentReader) SetReadDeadline(t time.Time) error {
+       return sr.r.SetReadDeadline(t)
+}
+
+func (sr *segmentReader) SetWriteDeadline(t time.Time) error {
+       return sr.r.SetWriteDeadline(t)
+}
+
+func (sr *segmentReader) SetTimeout(timeout time.Duration) {
+       sr.r.SetTimeout(timeout)
+}
+
+func (sr *segmentReader) GetTimeout() time.Duration {
+       return sr.r.GetTimeout()
+}
+
+func (sr *segmentReader) Read(p []byte) (n int, err error) {
+       // If we don't have a read buffer, or it's empty, read the first 
segment.
+       // If we have read all the frames from the current segment, read the 
next segment.
+       // If segment is non self-container, it will read all segments and read 
buffer will hold the full frame.
+       if sr.readBufferDecoded.Len() == 0 {
+               err = sr.readSegment()
+               if err != nil {
+                       return 0, err
+               }
+       }
+
+       return sr.readBufferDecoded.Read(p)
+}
+
+func (sr *segmentReader) readSegment() error {
+       segment, isSelfContained, err := sr.segmentCodec.decode(sr.r)
+       if err != nil {
+               // TODO: does only network related errors should result in 
connection closure?
+               // var verr net.Error
+               // if errors.As(err, &verr) {
+               //      return nil, false, verr
+               // }

Review Comment:
   
https://github.com/apache/cassandra-gocql-driver/blob/c65c762b83eccdb6a6a083c123a5935c4302745c/conn.go#L904-L925
   
   In the case of a net.Error, it does not break, if I'm not missing anything.
   
   https://pkg.go.dev/net#Error tells us whether it is a timeout or not. 
Actually, it is probably worth updating the code to rely on 
net.Error.Timeout() instead of Temporary(), as suggested.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to