Author: kclark
Date: Fri Jul 18 14:49:50 2008
New Revision: 678053

URL: http://svn.apache.org/viewvc?rev=678053&view=rev
Log:
rb: Add optional timeout argument to Thrift::Socket [THRIFT-74]

Socket.new and UNIXSocket.new both now have a new optional argument: timeout.
There's also a timeout field accessor. This timeout is used when reading or
writing.

Author: Kevin Ballard <[EMAIL PROTECTED]>

Modified:
    incubator/thrift/trunk/lib/rb/lib/thrift/transport/socket.rb
    incubator/thrift/trunk/lib/rb/lib/thrift/transport/unixsocket.rb
    incubator/thrift/trunk/lib/rb/spec/socket_spec.rb
    incubator/thrift/trunk/lib/rb/spec/socket_spec_shared.rb
    incubator/thrift/trunk/lib/rb/spec/unixsocket_spec.rb

Modified: incubator/thrift/trunk/lib/rb/lib/thrift/transport/socket.rb
URL: 
http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/rb/lib/thrift/transport/socket.rb?rev=678053&r1=678052&r2=678053&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/rb/lib/thrift/transport/socket.rb (original)
+++ incubator/thrift/trunk/lib/rb/lib/thrift/transport/socket.rb Fri Jul 18 14:49:50 2008
@@ -11,14 +11,15 @@
 
 module Thrift
   class Socket < Transport
-    def initialize(host='localhost', port=9090)
+    def initialize(host='localhost', port=9090, timeout=nil)
       @host = host
       @port = port
+      @timeout = timeout
       @desc = "#{host}:#{port}"
       @handle = nil
     end
 
-    attr_accessor :handle
+    attr_accessor :handle, :timeout
 
     def open
       begin
@@ -35,11 +36,31 @@
     def write(str)
       raise IOError, "closed stream" unless open?
       begin
-        @handle.write(str)
-      rescue StandardError
+        if @timeout.nil? or @timeout == 0
+          @handle.write(str)
+        else
+          len = 0
+          start = Time.now
+          while Time.now - start < @timeout
+            rd, wr, = IO.select(nil, [@handle], nil, @timeout)
+            if wr and not wr.empty?
+              len += @handle.write_nonblock(str[len..-1])
+              break if len >= str.length
+            end
+          end
+          if len < str.length
+            raise TransportException.new(TransportException::TIMED_OUT, "Socket: Timed out writing #{str.length} bytes to #{@desc}")
+          else
+            len
+          end
+        end
+      rescue TransportException => e
+        # pass this on
+        raise e
+      rescue StandardError => e
         @handle.close
         @handle = nil
-        raise TransportException.new(TransportException::NOT_OPEN)
+        raise TransportException.new(TransportException::NOT_OPEN, e.message)
       end
     end
 
@@ -47,7 +68,26 @@
       raise IOError, "closed stream" unless open?
 
       begin
-        data = @handle.readpartial(sz)
+        if @timeout.nil? or @timeout == 0
+          data = @handle.readpartial(sz)
+        else
+          # it's possible to interrupt select for something other than the timeout
+          # so we need to ensure we've waited long enough
+          start = Time.now
+          rd = nil # scoping
+          loop do
+            rd, = IO.select([@handle], nil, nil, @timeout)
+            break if (rd and not rd.empty?) or Time.now - start >= @timeout
+          end
+          if rd.nil? or rd.empty?
+            raise TransportException.new(TransportException::TIMED_OUT, "Socket: Timed out reading #{sz} bytes from #{@desc}")
+          else
+            data = @handle.readpartial(sz)
+          end
+        end
+      rescue TransportException => e
+        # don't let this get caught by the StandardError handler
+        raise e
       rescue StandardError => e
         @handle.close unless @handle.closed?
         @handle = nil

Modified: incubator/thrift/trunk/lib/rb/lib/thrift/transport/unixsocket.rb
URL: 
http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/rb/lib/thrift/transport/unixsocket.rb?rev=678053&r1=678052&r2=678053&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/rb/lib/thrift/transport/unixsocket.rb (original)
+++ incubator/thrift/trunk/lib/rb/lib/thrift/transport/unixsocket.rb Fri Jul 18 14:49:50 2008
@@ -3,8 +3,9 @@
 
 module Thrift
   class UNIXSocket < Socket
-    def initialize(path)
+    def initialize(path, timeout=nil)
       @path = path
+      @timeout = timeout
       @desc = @path # for read()'s error
       @handle = nil
     end

Modified: incubator/thrift/trunk/lib/rb/spec/socket_spec.rb
URL: 
http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/rb/spec/socket_spec.rb?rev=678053&r1=678052&r2=678053&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/rb/spec/socket_spec.rb (original)
+++ incubator/thrift/trunk/lib/rb/spec/socket_spec.rb Fri Jul 18 14:49:50 2008
@@ -28,6 +28,11 @@
       TCPSocket.should_receive(:new).with('my.domain', 1234)
       Socket.new('my.domain', 1234).open
     end
+
+    it "should accept an optional timeout" do
+      TCPSocket.stub!(:new)
+      Socket.new('localhost', 8080, 5).timeout.should == 5
+    end
   end
 
   describe ServerSocket do

Modified: incubator/thrift/trunk/lib/rb/spec/socket_spec_shared.rb
URL: 
http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/rb/spec/socket_spec_shared.rb?rev=678053&r1=678052&r2=678053&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/rb/spec/socket_spec_shared.rb (original)
+++ incubator/thrift/trunk/lib/rb/spec/socket_spec_shared.rb Fri Jul 18 14:49:50 2008
@@ -51,4 +51,35 @@
     lambda { @socket.write("fail") }.should raise_error(IOError, "closed stream")
     lambda { @socket.read(10) }.should raise_error(IOError, "closed stream")
   end
+
+  it "should support the timeout accessor for read" do
+    @socket.timeout = 3
+    @socket.open
+    IO.should_receive(:select).with([@handle], nil, nil, 3).and_return([[@handle], [], []])
+    @handle.should_receive(:readpartial).with(17).and_return("test data")
+    @socket.read(17).should == "test data"
+  end
+
+  it "should support the timeout accessor for write" do
+    @socket.timeout = 3
+    @socket.open
+    IO.should_receive(:select).with(nil, [@handle], nil, 3).twice.and_return([[], [@handle], []])
+    @handle.should_receive(:write_nonblock).with("test data").and_return(4)
+    @handle.should_receive(:write_nonblock).with(" data").and_return(5)
+    @socket.write("test data").should == 9
+  end
+
+  it "should raise an error when read times out" do
+    @socket.timeout = 0.5
+    @socket.open
+    IO.should_receive(:select).with([@handle], nil, nil, 0.5).at_least(1).times.and_return(nil)
+    lambda { @socket.read(17) }.should raise_error(Thrift::TransportException) { |e| e.type.should == Thrift::TransportException::TIMED_OUT }
+  end
+
+  it "should raise an error when write times out" do
+    @socket.timeout = 0.5
+    @socket.open
+    IO.should_receive(:select).with(nil, [@handle], nil, 0.5).any_number_of_times.and_return(nil)
+    lambda { @socket.write("test data") }.should raise_error(Thrift::TransportException) { |e| e.type.should == Thrift::TransportException::TIMED_OUT }
+  end
 end

Modified: incubator/thrift/trunk/lib/rb/spec/unixsocket_spec.rb
URL: 
http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/rb/spec/unixsocket_spec.rb?rev=678053&r1=678052&r2=678053&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/rb/spec/unixsocket_spec.rb (original)
+++ incubator/thrift/trunk/lib/rb/spec/unixsocket_spec.rb Fri Jul 18 14:49:50 2008
@@ -20,6 +20,11 @@
       ::UNIXSocket.should_receive(:new).and_raise(StandardError)
       lambda { @socket.open }.should raise_error(Thrift::TransportException) { |e| e.type.should == Thrift::TransportException::NOT_OPEN }
     end
+
+    it "should accept an optional timeout" do
+      ::UNIXSocket.stub!(:new)
+      UNIXSocket.new(@path, 5).timeout.should == 5
+    end
   end
 
   describe UNIXServerSocket do


Reply via email to