Author: dreiss
Date: Tue Jun 10 18:01:48 2008
New Revision: 666447

URL: http://svn.apache.org/viewvc?rev=666447&view=rev
Log:
Erlang: add framed_transport and non-strict binary_protocol

- thrift_client now takes as its fourth parameter Options: framed, 
strict_{read,write}, connect_timeout (P.S. fourth param used to be Timeout)
- binary protocol now takes options: strict_{read,write}
- buffers in framed and buffered transport are now iolists and not reversed 
lists of binaries
- rename buffer in buffered transport "write_buffer" to match framed transport

Added:
    incubator/thrift/trunk/lib/alterl/src/thrift_framed_transport.erl
      - copied, changed from r666446, 
incubator/thrift/trunk/lib/alterl/src/thrift_buffered_transport.erl
Modified:
    incubator/thrift/trunk/lib/alterl/src/thrift_binary_protocol.erl
    incubator/thrift/trunk/lib/alterl/src/thrift_buffered_transport.erl
    incubator/thrift/trunk/lib/alterl/src/thrift_client.erl
    incubator/thrift/trunk/lib/alterl/src/thrift_socket_transport.erl
    incubator/thrift/trunk/lib/alterl/src/thrift_transport.erl

Modified: incubator/thrift/trunk/lib/alterl/src/thrift_binary_protocol.erl
URL: 
http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/alterl/src/thrift_binary_protocol.erl?rev=666447&r1=666446&r2=666447&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/alterl/src/thrift_binary_protocol.erl (original)
+++ incubator/thrift/trunk/lib/alterl/src/thrift_binary_protocol.erl Tue Jun 10 
18:01:48 2008
@@ -11,22 +11,37 @@
 -include("thrift_constants.hrl").
 -include("thrift_protocol.hrl").
 
--export([new/1,
+-export([new/1, new/2,
          read/2,
          write/2,
          flush_transport/1,
          close_transport/1
-]).
-
--record(binary_protocol, {transport}).
+        ]).
 
+-record(binary_protocol, {transport,
+                          strict_read=true,
+                          strict_write=true
+                         }).
 
 -define(VERSION_MASK, 16#FFFF0000).
 -define(VERSION_1, 16#80010000).
-
+-define(TYPE_MASK, 16#000000ff).
 
 new(Transport) ->
-    thrift_protocol:new(?MODULE, #binary_protocol{transport = Transport}).
+    new(Transport, _Options = []).
+
+new(Transport, Options) ->
+    State  = #binary_protocol{transport = Transport},
+    State1 = parse_options(Options, State),
+    thrift_protocol:new(?MODULE, State1).
+
+parse_options([], State) ->
+    State;
+parse_options([{strict_read, Bool} | Rest], State) when is_boolean(Bool) ->
+    parse_options(Rest, State#binary_protocol{strict_read=Bool});
+parse_options([{strict_write, Bool} | Rest], State) when is_boolean(Bool) ->
+    parse_options(Rest, State#binary_protocol{strict_write=Bool}).
+
 
 flush_transport(#binary_protocol{transport = Transport}) ->
     thrift_transport:flush(Transport).
@@ -42,9 +57,16 @@
         name = Name,
         type = Type,
         seqid = Seqid}) ->
-    write(This, {i32, ?VERSION_1 bor Type}),
-    write(This, {string, Name}),
-    write(This, {i32, Seqid}),
+    case This#binary_protocol.strict_write of
+        true ->
+            write(This, {i32, ?VERSION_1 bor Type}),
+            write(This, {string, Name}),
+            write(This, {i32, Seqid});
+        false ->
+            write(This, {string, Name}),
+            write(This, {byte, Type}),
+            write(This, {i32, Seqid})
+    end,
     ok;
 
 write(This, message_end) -> ok;
@@ -121,20 +143,40 @@
     write(This, {i32, size(Bin)}),
     write(This, Bin);
 
-write(This, Binary) when is_binary(Binary) ->
-    thrift_transport:write(This#binary_protocol.transport, Binary).
+%% Data :: iolist()
+write(This, Data) ->
+    thrift_transport:write(This#binary_protocol.transport, Data).
 
 %%
 
 read(This, message_begin) ->
     case read(This, i32) of
-        {ok, Version} when Version band ?VERSION_MASK == ?VERSION_1 ->
-            Type = Version band 16#000000ff,
+        {ok, Sz} when Sz band ?VERSION_MASK =:= ?VERSION_1 ->
+            %% we're at version 1
             {ok, Name}  = read(This, string),
+            Type        = Sz band ?TYPE_MASK,
+            {ok, SeqId} = read(This, i32),
+            #protocol_message_begin{name  = binary_to_list(Name),
+                                    type  = Type,
+                                    seqid = SeqId};
+
+        {ok, Sz} when Sz < 0 ->
+            %% there's a version number but it's unexpected
+            {error, {bad_binary_protocol_version, Sz}};
+
+        {ok, Sz} when This#binary_protocol.strict_read =:= true ->
+            %% strict_read is true and there's no version header; that's an 
error
+            {error, no_binary_protocol_version};
+
+        {ok, Sz} when This#binary_protocol.strict_read =:= false ->
+            %% strict_read is false, so just read the old way
+            {ok, Name}  = read(This, Sz),
+            {ok, Type}  = read(This, byte),
             {ok, SeqId} = read(This, i32),
             #protocol_message_begin{name  = binary_to_list(Name),
                                     type  = Type,
                                     seqid = SeqId};
+
         Err = {error, closed} -> Err;
         Err = {error, ebadf}  -> Err
     end;

Modified: incubator/thrift/trunk/lib/alterl/src/thrift_buffered_transport.erl
URL: 
http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/alterl/src/thrift_buffered_transport.erl?rev=666447&r1=666446&r2=666447&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/alterl/src/thrift_buffered_transport.erl 
(original)
+++ incubator/thrift/trunk/lib/alterl/src/thrift_buffered_transport.erl Tue Jun 
10 18:01:48 2008
@@ -21,11 +21,7 @@
 -export([write/2, read/2, flush/1, close/1]).
 
 -record(buffered_transport, {wrapped, % a thrift_transport
-                             buffer
-                             %% a list of binaries which will be concatenated 
and sent during
-                             %% a flush.
-                             %%
-                             %% *** THIS LIST IS STORED IN REVERSE ORDER!!! ***
+                             write_buffer % iolist()
                             }).
 
 %%====================================================================
@@ -46,11 +42,11 @@
 %%--------------------------------------------------------------------
 %% Function: write(Transport, Data) -> ok
 %%
-%% Data = binary()
+%% Data = iolist()
 %%
 %% Description: Writes data into the buffer
 %%--------------------------------------------------------------------
-write(Transport, Data) when is_binary(Data) ->
+write(Transport, Data) ->
     gen_server:call(Transport, {write, Data}).
 
 %%--------------------------------------------------------------------
@@ -94,7 +90,7 @@
     %% TODO(cpiro): need to trap exits here so when transport exits
     %% normally from under our feet we exit normally
     {ok, #buffered_transport{wrapped = Wrapped,
-                             buffer = []}}.
+                             write_buffer = []}}.
 
 %%--------------------------------------------------------------------
 %% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
@@ -105,19 +101,18 @@
 %%                                      {stop, Reason, State}
 %% Description: Handling call messages
 %%--------------------------------------------------------------------
-handle_call({write, Data}, _From, State = #buffered_transport{buffer = 
Buffer}) ->
-    {reply, ok, State#buffered_transport{buffer = [Data | Buffer]}};
+handle_call({write, Data}, _From, State = #buffered_transport{write_buffer = 
WBuf}) ->
+    {reply, ok, State#buffered_transport{write_buffer = [WBuf, Data]}};
 
 handle_call({read, Len}, _From, State = #buffered_transport{wrapped = 
Wrapped}) ->
     Response = thrift_transport:read(Wrapped, Len),
     {reply, Response, State};
 
-handle_call(flush, _From, State = #buffered_transport{buffer = Buffer,
+handle_call(flush, _From, State = #buffered_transport{write_buffer = WBuf,
                                                       wrapped = Wrapped}) ->
-    Concat   = concat_binary(lists:reverse(Buffer)),
-    Response = thrift_transport:write(Wrapped, Concat),
+    Response = thrift_transport:write(Wrapped, WBuf),
     thrift_transport:flush(Wrapped),
-    {reply, Response, State#buffered_transport{buffer = []}}.
+    {reply, Response, State#buffered_transport{write_buffer = []}}.
 
 %%--------------------------------------------------------------------
 %% Function: handle_cast(Msg, State) -> {noreply, State} |
@@ -125,9 +120,9 @@
 %%                                      {stop, Reason, State}
 %% Description: Handling cast messages
 %%--------------------------------------------------------------------
-handle_cast(close, State = #buffered_transport{buffer = Buffer,
+handle_cast(close, State = #buffered_transport{write_buffer = WBuf,
                                                wrapped = Wrapped}) ->
-    thrift_transport:write(Wrapped, concat_binary(lists:reverse(Buffer))),
+    thrift_transport:write(Wrapped, WBuf),
     %% Wrapped is closed by terminate/2
     %%  error_logger:info_msg("thrift_buffered_transport ~p: closing", 
[self()]),
     {stop, normal, State};

Modified: incubator/thrift/trunk/lib/alterl/src/thrift_client.erl
URL: 
http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/alterl/src/thrift_client.erl?rev=666447&r1=666446&r2=666447&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/alterl/src/thrift_client.erl (original)
+++ incubator/thrift/trunk/lib/alterl/src/thrift_client.erl Tue Jun 10 18:01:48 
2008
@@ -20,7 +20,12 @@
 -include("thrift_constants.hrl").
 -include("thrift_protocol.hrl").
 
--record(state, {service, protocol, seqid}).
+-record(state, {service, protocol, seqid,
+                strict_read = true,
+                strict_write = true,
+                framed = false,
+                connect_timeout = infinity
+               }).
 
 %%====================================================================
 %% API
@@ -30,10 +35,10 @@
 %% Description: Starts the server
 %%--------------------------------------------------------------------
 start_link(Host, Port, Service) ->
-    start_link(Host, Port, Service, _Timeout = infinity).
+    start_link(Host, Port, Service, []).
 
-start_link(Host, Port, Service, Timeout) when is_integer(Port), 
is_atom(Service) ->
-    gen_server:start_link(?MODULE, [Host, Port, Service, Timeout], []).
+start_link(Host, Port, Service, Options) when is_integer(Port), 
is_atom(Service), is_list(Options) ->
+    gen_server:start_link(?MODULE, [Host, Port, Service, Options], []).
 
 call(Client, Function, Args)
   when is_pid(Client), is_atom(Function), is_list(Args) ->
@@ -57,24 +62,41 @@
 %%                         {stop, Reason}
 %% Description: Initiates the server
 %%--------------------------------------------------------------------
-init([Host, Port, Service]) ->
-    init([Host, Port, Service, infinity]);
+init([Host, Port, Service, Options]) ->
+    State = parse_options(Options, #state{}),
 
-init([Host, Port, Service, Timeout]) ->
     {ok, Sock} = gen_tcp:connect(Host, Port,
                                  [binary,
                                   {packet, 0},
                                   {active, false},
                                   {nodelay, true}
                                  ],
-                                Timeout),
+                                State#state.connect_timeout),
 
-    {ok, Transport}    = thrift_socket_transport:new(Sock),
-    {ok, BufTransport} = thrift_buffered_transport:new(Transport),
-    {ok, Protocol}     = thrift_binary_protocol:new(BufTransport),
-    {ok, #state{service  = Service,
-                protocol = Protocol,
-                seqid    = 0}}.
+    {ok, Transport} = thrift_socket_transport:new(Sock),
+    {ok, BufTransport} =
+        case State#state.framed of
+            true  -> thrift_framed_transport:new(Transport);
+            false -> thrift_buffered_transport:new(Transport)
+        end,
+    {ok, Protocol} = thrift_binary_protocol:new(BufTransport,
+                         [{strict_read,  State#state.strict_read},
+                          {strict_write, State#state.strict_write}]),
+
+    {ok, State#state{service  = Service,
+                     protocol = Protocol,
+                     seqid    = 0}}.
+
+parse_options([], State) ->
+    State;
+parse_options([{strict_read, Bool} | Rest], State) when is_boolean(Bool) ->
+    parse_options(Rest, State#state{strict_read=Bool});
+parse_options([{strict_write, Bool} | Rest], State) when is_boolean(Bool) ->
+    parse_options(Rest, State#state{strict_write=Bool});
+parse_options([{framed, Bool} | Rest], State) when is_boolean(Bool) ->
+    parse_options(Rest, State#state{framed=Bool});
+parse_options([{connect_timeout, TO} | Rest], State) when TO =:= infinity; 
is_integer(TO) ->
+    parse_options(Rest, State#state{connect_timeout=TO}).
 
 %%--------------------------------------------------------------------
 %% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |

Copied: incubator/thrift/trunk/lib/alterl/src/thrift_framed_transport.erl (from 
r666446, incubator/thrift/trunk/lib/alterl/src/thrift_buffered_transport.erl)
URL: 
http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/alterl/src/thrift_framed_transport.erl?p2=incubator/thrift/trunk/lib/alterl/src/thrift_framed_transport.erl&p1=incubator/thrift/trunk/lib/alterl/src/thrift_buffered_transport.erl&r1=666446&r2=666447&rev=666447&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/alterl/src/thrift_buffered_transport.erl 
(original)
+++ incubator/thrift/trunk/lib/alterl/src/thrift_framed_transport.erl Tue Jun 
10 18:01:48 2008
@@ -1,11 +1,11 @@
 %%%-------------------------------------------------------------------
-%%% File    : thrift_buffered_transport.erl
-%%% Author  :  <[EMAIL PROTECTED]>
-%%% Description : Buffered transport for thrift
+%%% File    : thrift_framed_transport.erl
+%%% Author  : <[EMAIL PROTECTED]>
+%%% Description : Framed transport for thrift
 %%%
-%%% Created : 30 Jan 2008 by  <[EMAIL PROTECTED]>
+%%% Created : 12 Mar 2008 by <[EMAIL PROTECTED]>
 %%%-------------------------------------------------------------------
--module(thrift_buffered_transport).
+-module(thrift_framed_transport).
 
 -behaviour(gen_server).
 -behaviour(thrift_transport).
@@ -20,13 +20,10 @@
 %% thrift_transport callbacks
 -export([write/2, read/2, flush/1, close/1]).
 
--record(buffered_transport, {wrapped, % a thrift_transport
-                             buffer
-                             %% a list of binaries which will be concatenated 
and sent during
-                             %% a flush.
-                             %%
-                             %% *** THIS LIST IS STORED IN REVERSE ORDER!!! ***
-                            }).
+-record(framed_transport, {wrapped, % a thrift_transport
+                           read_buffer, % iolist()
+                           write_buffer % iolist()
+                          }).
 
 %%====================================================================
 %% API
@@ -46,11 +43,11 @@
 %%--------------------------------------------------------------------
 %% Function: write(Transport, Data) -> ok
 %%
-%% Data = binary()
+%% Data = iolist()
 %%
 %% Description: Writes data into the buffer
 %%--------------------------------------------------------------------
-write(Transport, Data) when is_binary(Data) ->
+write(Transport, Data) ->
     gen_server:call(Transport, {write, Data}).
 
 %%--------------------------------------------------------------------
@@ -93,8 +90,9 @@
 init([Wrapped]) ->
     %% TODO(cpiro): need to trap exits here so when transport exits
     %% normally from under our feet we exit normally
-    {ok, #buffered_transport{wrapped = Wrapped,
-                             buffer = []}}.
+    {ok, #framed_transport{wrapped = Wrapped,
+                           read_buffer = [],
+                           write_buffer = []}}.
 
 %%--------------------------------------------------------------------
 %% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
@@ -105,19 +103,39 @@
 %%                                      {stop, Reason, State}
 %% Description: Handling call messages
 %%--------------------------------------------------------------------
-handle_call({write, Data}, _From, State = #buffered_transport{buffer = 
Buffer}) ->
-    {reply, ok, State#buffered_transport{buffer = [Data | Buffer]}};
+handle_call({write, Data}, _From, State = #framed_transport{write_buffer = 
WBuf}) ->
+    {reply, ok, State#framed_transport{write_buffer = [WBuf, Data]}};
 
-handle_call({read, Len}, _From, State = #buffered_transport{wrapped = 
Wrapped}) ->
-    Response = thrift_transport:read(Wrapped, Len),
-    {reply, Response, State};
-
-handle_call(flush, _From, State = #buffered_transport{buffer = Buffer,
-                                                      wrapped = Wrapped}) ->
-    Concat   = concat_binary(lists:reverse(Buffer)),
-    Response = thrift_transport:write(Wrapped, Concat),
-    thrift_transport:flush(Wrapped),
-    {reply, Response, State#buffered_transport{buffer = []}}.
+handle_call({read, Len}, _From, State = #framed_transport{wrapped = Wrapped,
+                                                          read_buffer = RBuf}) 
->
+    {RBuf1, RBuf1Size} =
+        %% if the read buffer is empty, read another frame
+        %% otherwise, just read from what's left in the buffer
+        case iolist_size(RBuf) of
+            0 ->
+                %% read the frame length
+                {ok, <<FrameLen:32/integer-signed-big, _/binary>>} =
+                    thrift_transport:read(Wrapped, 4),
+                %% then read the data
+                {ok, Bin} =
+                    thrift_transport:read(Wrapped, FrameLen),
+                {Bin, size(Bin)};
+            Sz ->
+                {RBuf, Sz}
+        end,
+
+    %% pull off Give bytes, return them to the user, leave the rest in the 
buffer
+    Give = min(RBuf1Size, Len),
+    <<Data:Give/binary, RBuf2/binary>> = iolist_to_binary(RBuf1),
+
+    Response = {ok, Data},
+    State1 = State#framed_transport{read_buffer=RBuf2},
+
+    {reply, Response, State1};
+
+handle_call(flush, _From, State) ->
+    {Response, State1} = do_flush(State),
+    {reply, Response, State1}.
 
 %%--------------------------------------------------------------------
 %% Function: handle_cast(Msg, State) -> {noreply, State} |
@@ -125,13 +143,12 @@
 %%                                      {stop, Reason, State}
 %% Description: Handling cast messages
 %%--------------------------------------------------------------------
-handle_cast(close, State = #buffered_transport{buffer = Buffer,
-                                               wrapped = Wrapped}) ->
-    thrift_transport:write(Wrapped, concat_binary(lists:reverse(Buffer))),
+handle_cast(close, State) ->
+    {_, State1} = do_flush(State),
     %% Wrapped is closed by terminate/2
-    %%  error_logger:info_msg("thrift_buffered_transport ~p: closing", 
[self()]),
+    %%  error_logger:info_msg("thrift_framed_transport ~p: closing", [self()]),
     {stop, normal, State};
-handle_cast(Msg, State=#buffered_transport{}) ->
+handle_cast(Msg, State=#framed_transport{}) ->
     {noreply, State}.
 
 %%--------------------------------------------------------------------
@@ -150,7 +167,7 @@
 %% cleaning up. When it returns, the gen_server terminates with Reason.
 %% The return value is ignored.
 %%--------------------------------------------------------------------
-terminate(_Reason, State = #buffered_transport{wrapped=Wrapped}) ->
+terminate(_Reason, State = #framed_transport{wrapped=Wrapped}) ->
     thrift_transport:close(Wrapped),
     ok.
 
@@ -164,3 +181,18 @@
 %%--------------------------------------------------------------------
 %%% Internal functions
 %%--------------------------------------------------------------------
+do_flush(State = #framed_transport{write_buffer = Buffer,
+                                   wrapped = Wrapped}) ->
+    FrameLen = iolist_size(Buffer),
+    Data     = [<<FrameLen:32/integer-signed-big>>, Buffer],
+
+    Response = thrift_transport:write(Wrapped, Data),
+
+    thrift_transport:flush(Wrapped),
+
+    State1 = State#framed_transport{write_buffer = []},
+    {Response, State1}.
+
+min(A,B) when A<B -> A;
+min(_,B)          -> B.
+

Modified: incubator/thrift/trunk/lib/alterl/src/thrift_socket_transport.erl
URL: 
http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/alterl/src/thrift_socket_transport.erl?rev=666447&r1=666446&r2=666447&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/alterl/src/thrift_socket_transport.erl (original)
+++ incubator/thrift/trunk/lib/alterl/src/thrift_socket_transport.erl Tue Jun 
10 18:01:48 2008
@@ -23,8 +23,8 @@
         end,
     thrift_transport:new(?MODULE, State).
 
-write(#data{socket = Socket}, Data)
-  when is_binary(Data) ->
+%% Data :: iolist()
+write(#data{socket = Socket}, Data) ->
     gen_tcp:send(Socket, Data).
 
 read(#data{socket=Socket, recv_timeout=Timeout}, Len)

Modified: incubator/thrift/trunk/lib/alterl/src/thrift_transport.erl
URL: 
http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/alterl/src/thrift_transport.erl?rev=666447&r1=666446&r2=666447&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/alterl/src/thrift_transport.erl (original)
+++ incubator/thrift/trunk/lib/alterl/src/thrift_transport.erl Tue Jun 10 
18:01:48 2008
@@ -22,7 +22,8 @@
     {ok, #transport{module = Module,
                     data = Data}}.
 
-write(Transport, Data) when is_binary(Data) ->
+%% Data :: iolist()
+write(Transport, Data) ->
     Module = Transport#transport.module,
     Module:write(Transport#transport.data, Data).
 


Reply via email to