fishy commented on a change in pull request #2181:
URL: https://github.com/apache/thrift/pull/2181#discussion_r439778871



##########
File path: lib/go/thrift/binary_protocol.go
##########
@@ -455,16 +455,27 @@ func (p *TBinaryProtocol) Flush(ctx context.Context) (err error) {
        return NewTProtocolException(p.trans.Flush(ctx))
 }
 
-func (p *TBinaryProtocol) Skip(fieldType TType) (err error) {
-       return SkipDefaultDepth(p, fieldType)
+func (p *TBinaryProtocol) Skip(ctx context.Context, fieldType TType) (err error) {
+       return SkipDefaultDepth(ctx, p, fieldType)
 }
 
 func (p *TBinaryProtocol) Transport() TTransport {
        return p.origTransport
 }
 
-func (p *TBinaryProtocol) readAll(buf []byte) error {
-       _, err := io.ReadFull(p.trans, buf)
+func (p *TBinaryProtocol) readAll(ctx context.Context, buf []byte) (err error) {
+       var read int
+       _, deadlineSet := ctx.Deadline()
+       for {
+               read, err = io.ReadFull(p.trans, buf)
+               if deadlineSet && read == 0 && isTimeoutError(err) && ctx.Err() == nil {
+                       // This is I/O timeout without anything read,
+                       // and we still have time left, keep retrying.
+                       continue
+               }
+               // For anything else, don't retry
+               break
+       }

Review comment:
       This `for` loop is how we implement the context deadline check for
TBinaryProtocol: the first read in `ReadMessageBegin` is `ReadI32`, which
calls `readAll`.

##########
File path: lib/go/thrift/compact_protocol.go
##########
@@ -329,9 +329,20 @@ func (p *TCompactProtocol) WriteBinary(bin []byte) error {
 //
 
 // Read a message header.
-func (p *TCompactProtocol) ReadMessageBegin() (name string, typeId TMessageType, seqId int32, err error) {
+func (p *TCompactProtocol) ReadMessageBegin(ctx context.Context) (name string, typeId TMessageType, seqId int32, err error) {
+       var protocolId byte
 
-       protocolId, err := p.readByteDirect()
+       _, deadlineSet := ctx.Deadline()
+       for {
+               protocolId, err = p.readByteDirect()
+               if deadlineSet && isTimeoutError(err) && ctx.Err() == nil {
+                       // keep retrying I/O timeout errors since we still have
+                       // time left
+                       continue
+               }
+               // For anything else, don't retry
+               break
+       }

Review comment:
       This `for` loop is how we implement the context deadline check in
TCompactProtocol.

##########
File path: lib/go/thrift/transport_exception.go
##########
@@ -64,6 +64,10 @@ func (p *tTransportException) Unwrap() error {
        return p.err
 }
 
+func (p *tTransportException) Timeout() bool {
+       return p.typeId == TIMED_OUT
+}

Review comment:
       This method is also newly added, to make the `isTimeoutError`
implementation simpler (it does not need to check for TTransportException and
unwrap it).

##########
File path: lib/go/thrift/header_transport.go
##########
@@ -297,18 +297,34 @@ func (t *THeaderTransport) IsOpen() bool {
 
 // ReadFrame tries to read the frame header, guess the client type, and handle
 // unframed clients.
-func (t *THeaderTransport) ReadFrame() error {
+func (t *THeaderTransport) ReadFrame(ctx context.Context) error {
        if !t.needReadFrame() {
                // No need to read frame, skipping.
                return nil
        }
+
        // Peek and handle the first 32 bits.
        // They could either be the length field of a framed message,
        // or the first bytes of an unframed message.
-       buf, err := t.reader.Peek(size32)
+       var buf []byte
+       var err error
+       // This is also usually the first read from a connection,
+       // so handle retries around socket timeouts.
+       _, deadlineSet := ctx.Deadline()
+       for {
+               buf, err = t.reader.Peek(size32)
+               if deadlineSet && isTimeoutError(err) && ctx.Err() == nil {
+                       // This is I/O timeout and we still have time,
+                       // continue trying
+                       continue
+               }
+               // For anything else, do not retry
+               break
+       }

Review comment:
       This `for` loop is how we implement the context deadline check in
THeaderProtocol, as this is the first read in `ReadMessageBegin`. (If
`t.needReadFrame` returned false above, the actual `ReadMessageBegin` call goes
to the underlying `TBinaryProtocol.ReadMessageBegin` or
`TCompactProtocol.ReadMessageBegin`, which already handle it.)





