Author: dreiss
Date: Tue Jun 10 18:00:20 2008
New Revision: 666435

URL: http://svn.apache.org/viewvc?rev=666435&view=rev
Log:
allow configurable recv_timeouts

Modified:
    incubator/thrift/trunk/lib/alterl/src/thrift_socket_server.erl
    incubator/thrift/trunk/lib/alterl/src/thrift_socket_transport.erl

Modified: incubator/thrift/trunk/lib/alterl/src/thrift_socket_server.erl
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/alterl/src/thrift_socket_server.erl?rev=666435&r1=666434&r2=666435&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/alterl/src/thrift_socket_server.erl (original)
+++ incubator/thrift/trunk/lib/alterl/src/thrift_socket_server.erl Tue Jun 10 18:00:20 2008
@@ -27,7 +27,9 @@
         max=2048,
         ip=any,
         listen=null,
-        acceptor=null}).
+        acceptor=null,
+         socket_opts=[{recv_timeout, 500}]
+        }).
 
 start(State=#thrift_socket_server{}) ->
     io:format("~p~n", [State]),
@@ -78,6 +80,8 @@
                       IpTuple
               end,
     parse_options(Rest, State#thrift_socket_server{ip=ParsedIp});
+parse_options([{socket_opts, L} | Rest], State) when is_list(L), length(L) > 0 ->
+    parse_options(Rest, State#thrift_socket_server{socket_opts=L});
 parse_options([{handler, Handler} | Rest], State) ->
     parse_options(Rest, State#thrift_socket_server{handler=Handler});
 parse_options([{service, Service} | Rest], State) ->
@@ -152,19 +156,22 @@
 new_acceptor(State=#thrift_socket_server{max=0}) ->
     error_logger:error_msg("Not accepting new connections"),
     State#thrift_socket_server{acceptor=null};
-new_acceptor(State=#thrift_socket_server{acceptor=OldPid, listen=Listen,service=Service, handler=Handler}) ->
+new_acceptor(State=#thrift_socket_server{acceptor=OldPid, listen=Listen,
+                                         service=Service, handler=Handler,
+                                         socket_opts=Opts
+                                        }) ->
     Pid = proc_lib:spawn_link(?MODULE, acceptor_loop,
-                              [{self(), Listen, Service, Handler}]),
+                              [{self(), Listen, Service, Handler, Opts}]),
 %%     error_logger:info_msg("Spawning new acceptor: ~p => ~p", [OldPid, Pid]),
     State#thrift_socket_server{acceptor=Pid}.
 
-acceptor_loop({Server, Listen, Service, Handler})
-  when is_pid(Server) ->
-    case catch gen_tcp:accept(Listen) of
+acceptor_loop({Server, Listen, Service, Handler, SocketOpts})
+  when is_pid(Server), is_list(SocketOpts) ->
+    case catch gen_tcp:accept(Listen) of % infinite timeout
        {ok, Socket} ->
            gen_server:cast(Server, {accepted, self()}),
             ProtoGen = fun() ->
-                               {ok, SocketTransport}   = thrift_socket_transport:new(Socket),
+                               {ok, SocketTransport}   = thrift_socket_transport:new(Socket, SocketOpts),
                                {ok, BufferedTransport} = thrift_buffered_transport:new(SocketTransport),
                                {ok, Protocol}          = thrift_binary_protocol:new(BufferedTransport),
                                {ok, IProt=Protocol, OProt=Protocol}

Modified: incubator/thrift/trunk/lib/alterl/src/thrift_socket_transport.erl
URL: http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/alterl/src/thrift_socket_transport.erl?rev=666435&r1=666434&r2=666435&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/alterl/src/thrift_socket_transport.erl (original)
+++ incubator/thrift/trunk/lib/alterl/src/thrift_socket_transport.erl Tue Jun 10 18:00:20 2008
@@ -3,18 +3,33 @@
 -behaviour(thrift_transport).
 
 -export([new/1,
+         new/2,
          write/2, read/2, flush/1, close/1]).
 
--record(data, {socket}).
+-record(data, {socket,
+               recv_timeout=infinity}).
 
 new(Socket) ->
-    thrift_transport:new(?MODULE, #data{socket = Socket}).
+    new(Socket, []).
 
-write(#data{socket = Socket}, Data) when is_binary(Data) ->
+new(Socket, Opts) when is_list(Opts) ->
+    State =
+        case lists:keysearch(recv_timeout, 1, Opts) of
+            {value, {recv_timeout, Timeout}}
+            when is_integer(Timeout), Timeout > 0 ->
+                #data{socket=Socket, recv_timeout=Timeout};
+            _ ->
+                #data{socket=Socket}
+        end,
+    thrift_transport:new(?MODULE, State).
+
+write(#data{socket = Socket}, Data)
+  when is_binary(Data) ->
     gen_tcp:send(Socket, Data).
 
-read(#data{socket = Socket}, Len) when is_integer(Len), Len >= 0 ->
-    gen_tcp:recv(Socket, Len).
+read(D = #data{socket=Socket, recv_timeout=Timeout}, Len)
+  when is_integer(Len), Len >= 0 ->
+    gen_tcp:recv(Socket, Len, Timeout).
 
 %% We can't really flush - everything is flushed when we write
 flush(_) ->


Reply via email to