Author: dreiss
Date: Tue Jun 10 18:01:48 2008
New Revision: 666447
URL: http://svn.apache.org/viewvc?rev=666447&view=rev
Log:
Erlang: add framed_transport and non-strict binary_protocol
- thrift_client now takes as its fourth parameter Options: framed,
strict_{read,write}, connect_timeout (P.S. fourth param used to be Timeout)
- binary protocol now takes options: strict_{read,write}
- buffers in framed and buffered transport are now iolists and not reversed
lists of binaries
- rename buffer in buffered transport "write_buffer" to match framed transport
Added:
incubator/thrift/trunk/lib/alterl/src/thrift_framed_transport.erl
- copied, changed from r666446,
incubator/thrift/trunk/lib/alterl/src/thrift_buffered_transport.erl
Modified:
incubator/thrift/trunk/lib/alterl/src/thrift_binary_protocol.erl
incubator/thrift/trunk/lib/alterl/src/thrift_buffered_transport.erl
incubator/thrift/trunk/lib/alterl/src/thrift_client.erl
incubator/thrift/trunk/lib/alterl/src/thrift_socket_transport.erl
incubator/thrift/trunk/lib/alterl/src/thrift_transport.erl
Modified: incubator/thrift/trunk/lib/alterl/src/thrift_binary_protocol.erl
URL:
http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/alterl/src/thrift_binary_protocol.erl?rev=666447&r1=666446&r2=666447&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/alterl/src/thrift_binary_protocol.erl (original)
+++ incubator/thrift/trunk/lib/alterl/src/thrift_binary_protocol.erl Tue Jun 10 18:01:48 2008
@@ -11,22 +11,37 @@
-include("thrift_constants.hrl").
-include("thrift_protocol.hrl").
--export([new/1,
+-export([new/1, new/2,
read/2,
write/2,
flush_transport/1,
close_transport/1
-]).
-
--record(binary_protocol, {transport}).
+ ]).
+-record(binary_protocol, {transport,
+ strict_read=true,
+ strict_write=true
+ }).
-define(VERSION_MASK, 16#FFFF0000).
-define(VERSION_1, 16#80010000).
-
+-define(TYPE_MASK, 16#000000ff).
new(Transport) ->
- thrift_protocol:new(?MODULE, #binary_protocol{transport = Transport}).
+ new(Transport, _Options = []).
+
+new(Transport, Options) ->
+ State = #binary_protocol{transport = Transport},
+ State1 = parse_options(Options, State),
+ thrift_protocol:new(?MODULE, State1).
+
+parse_options([], State) ->
+ State;
+parse_options([{strict_read, Bool} | Rest], State) when is_boolean(Bool) ->
+ parse_options(Rest, State#binary_protocol{strict_read=Bool});
+parse_options([{strict_write, Bool} | Rest], State) when is_boolean(Bool) ->
+ parse_options(Rest, State#binary_protocol{strict_write=Bool}).
+
flush_transport(#binary_protocol{transport = Transport}) ->
thrift_transport:flush(Transport).
@@ -42,9 +57,16 @@
name = Name,
type = Type,
seqid = Seqid}) ->
- write(This, {i32, ?VERSION_1 bor Type}),
- write(This, {string, Name}),
- write(This, {i32, Seqid}),
+ case This#binary_protocol.strict_write of
+ true ->
+ write(This, {i32, ?VERSION_1 bor Type}),
+ write(This, {string, Name}),
+ write(This, {i32, Seqid});
+ false ->
+ write(This, {string, Name}),
+ write(This, {byte, Type}),
+ write(This, {i32, Seqid})
+ end,
ok;
write(This, message_end) -> ok;
@@ -121,20 +143,40 @@
write(This, {i32, size(Bin)}),
write(This, Bin);
-write(This, Binary) when is_binary(Binary) ->
- thrift_transport:write(This#binary_protocol.transport, Binary).
+%% Data :: iolist()
+write(This, Data) ->
+ thrift_transport:write(This#binary_protocol.transport, Data).
%%
read(This, message_begin) ->
case read(This, i32) of
- {ok, Version} when Version band ?VERSION_MASK == ?VERSION_1 ->
- Type = Version band 16#000000ff,
+ {ok, Sz} when Sz band ?VERSION_MASK =:= ?VERSION_1 ->
+ %% we're at version 1
{ok, Name} = read(This, string),
+ Type = Sz band ?TYPE_MASK,
+ {ok, SeqId} = read(This, i32),
+ #protocol_message_begin{name = binary_to_list(Name),
+ type = Type,
+ seqid = SeqId};
+
+ {ok, Sz} when Sz < 0 ->
+ %% there's a version number but it's unexpected
+ {error, {bad_binary_protocol_version, Sz}};
+
+ {ok, Sz} when This#binary_protocol.strict_read =:= true ->
+ %% strict_read is true and there's no version header; that's an error
+ {error, no_binary_protocol_version};
+
+ {ok, Sz} when This#binary_protocol.strict_read =:= false ->
+ %% strict_read is false, so just read the old way
+ {ok, Name} = read(This, Sz),
+ {ok, Type} = read(This, byte),
{ok, SeqId} = read(This, i32),
#protocol_message_begin{name = binary_to_list(Name),
type = Type,
seqid = SeqId};
+
Err = {error, closed} -> Err;
Err = {error, ebadf} -> Err
end;
Modified: incubator/thrift/trunk/lib/alterl/src/thrift_buffered_transport.erl
URL:
http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/alterl/src/thrift_buffered_transport.erl?rev=666447&r1=666446&r2=666447&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/alterl/src/thrift_buffered_transport.erl (original)
+++ incubator/thrift/trunk/lib/alterl/src/thrift_buffered_transport.erl Tue Jun 10 18:01:48 2008
@@ -21,11 +21,7 @@
-export([write/2, read/2, flush/1, close/1]).
-record(buffered_transport, {wrapped, % a thrift_transport
- buffer
- %% a list of binaries which will be concatenated and sent during
- %% a flush.
- %%
- %% *** THIS LIST IS STORED IN REVERSE ORDER!!! ***
+ write_buffer % iolist()
}).
%%====================================================================
@@ -46,11 +42,11 @@
%%--------------------------------------------------------------------
%% Function: write(Transport, Data) -> ok
%%
-%% Data = binary()
+%% Data = iolist()
%%
%% Description: Writes data into the buffer
%%--------------------------------------------------------------------
-write(Transport, Data) when is_binary(Data) ->
+write(Transport, Data) ->
gen_server:call(Transport, {write, Data}).
%%--------------------------------------------------------------------
@@ -94,7 +90,7 @@
%% TODO(cpiro): need to trap exits here so when transport exits
%% normally from under our feet we exit normally
{ok, #buffered_transport{wrapped = Wrapped,
- buffer = []}}.
+ write_buffer = []}}.
%%--------------------------------------------------------------------
%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
@@ -105,19 +101,18 @@
%% {stop, Reason, State}
%% Description: Handling call messages
%%--------------------------------------------------------------------
-handle_call({write, Data}, _From, State = #buffered_transport{buffer =
Buffer}) ->
- {reply, ok, State#buffered_transport{buffer = [Data | Buffer]}};
+handle_call({write, Data}, _From, State = #buffered_transport{write_buffer =
WBuf}) ->
+ {reply, ok, State#buffered_transport{write_buffer = [WBuf, Data]}};
handle_call({read, Len}, _From, State = #buffered_transport{wrapped =
Wrapped}) ->
Response = thrift_transport:read(Wrapped, Len),
{reply, Response, State};
-handle_call(flush, _From, State = #buffered_transport{buffer = Buffer,
+handle_call(flush, _From, State = #buffered_transport{write_buffer = WBuf,
wrapped = Wrapped}) ->
- Concat = concat_binary(lists:reverse(Buffer)),
- Response = thrift_transport:write(Wrapped, Concat),
+ Response = thrift_transport:write(Wrapped, WBuf),
thrift_transport:flush(Wrapped),
- {reply, Response, State#buffered_transport{buffer = []}}.
+ {reply, Response, State#buffered_transport{write_buffer = []}}.
%%--------------------------------------------------------------------
%% Function: handle_cast(Msg, State) -> {noreply, State} |
@@ -125,9 +120,9 @@
%% {stop, Reason, State}
%% Description: Handling cast messages
%%--------------------------------------------------------------------
-handle_cast(close, State = #buffered_transport{buffer = Buffer,
+handle_cast(close, State = #buffered_transport{write_buffer = WBuf,
wrapped = Wrapped}) ->
- thrift_transport:write(Wrapped, concat_binary(lists:reverse(Buffer))),
+ thrift_transport:write(Wrapped, WBuf),
%% Wrapped is closed by terminate/2
%% error_logger:info_msg("thrift_buffered_transport ~p: closing", [self()]),
{stop, normal, State};
Modified: incubator/thrift/trunk/lib/alterl/src/thrift_client.erl
URL:
http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/alterl/src/thrift_client.erl?rev=666447&r1=666446&r2=666447&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/alterl/src/thrift_client.erl (original)
+++ incubator/thrift/trunk/lib/alterl/src/thrift_client.erl Tue Jun 10 18:01:48 2008
@@ -20,7 +20,12 @@
-include("thrift_constants.hrl").
-include("thrift_protocol.hrl").
--record(state, {service, protocol, seqid}).
+-record(state, {service, protocol, seqid,
+ strict_read = true,
+ strict_write = true,
+ framed = false,
+ connect_timeout = infinity
+ }).
%%====================================================================
%% API
@@ -30,10 +35,10 @@
%% Description: Starts the server
%%--------------------------------------------------------------------
start_link(Host, Port, Service) ->
- start_link(Host, Port, Service, _Timeout = infinity).
+ start_link(Host, Port, Service, []).
-start_link(Host, Port, Service, Timeout) when is_integer(Port),
is_atom(Service) ->
- gen_server:start_link(?MODULE, [Host, Port, Service, Timeout], []).
+start_link(Host, Port, Service, Options) when is_integer(Port),
is_atom(Service), is_list(Options) ->
+ gen_server:start_link(?MODULE, [Host, Port, Service, Options], []).
call(Client, Function, Args)
when is_pid(Client), is_atom(Function), is_list(Args) ->
@@ -57,24 +62,41 @@
%% {stop, Reason}
%% Description: Initiates the server
%%--------------------------------------------------------------------
-init([Host, Port, Service]) ->
- init([Host, Port, Service, infinity]);
+init([Host, Port, Service, Options]) ->
+ State = parse_options(Options, #state{}),
-init([Host, Port, Service, Timeout]) ->
{ok, Sock} = gen_tcp:connect(Host, Port,
[binary,
{packet, 0},
{active, false},
{nodelay, true}
],
- Timeout),
+ State#state.connect_timeout),
- {ok, Transport} = thrift_socket_transport:new(Sock),
- {ok, BufTransport} = thrift_buffered_transport:new(Transport),
- {ok, Protocol} = thrift_binary_protocol:new(BufTransport),
- {ok, #state{service = Service,
- protocol = Protocol,
- seqid = 0}}.
+ {ok, Transport} = thrift_socket_transport:new(Sock),
+ {ok, BufTransport} =
+ case State#state.framed of
+ true -> thrift_framed_transport:new(Transport);
+ false -> thrift_buffered_transport:new(Transport)
+ end,
+ {ok, Protocol} = thrift_binary_protocol:new(BufTransport,
+ [{strict_read, State#state.strict_read},
+ {strict_write, State#state.strict_write}]),
+
+ {ok, State#state{service = Service,
+ protocol = Protocol,
+ seqid = 0}}.
+
+parse_options([], State) ->
+ State;
+parse_options([{strict_read, Bool} | Rest], State) when is_boolean(Bool) ->
+ parse_options(Rest, State#state{strict_read=Bool});
+parse_options([{strict_write, Bool} | Rest], State) when is_boolean(Bool) ->
+ parse_options(Rest, State#state{strict_write=Bool});
+parse_options([{framed, Bool} | Rest], State) when is_boolean(Bool) ->
+ parse_options(Rest, State#state{framed=Bool});
+parse_options([{connect_timeout, TO} | Rest], State) when TO =:= infinity;
is_integer(TO) ->
+ parse_options(Rest, State#state{connect_timeout=TO}).
%%--------------------------------------------------------------------
%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
Copied: incubator/thrift/trunk/lib/alterl/src/thrift_framed_transport.erl (from
r666446, incubator/thrift/trunk/lib/alterl/src/thrift_buffered_transport.erl)
URL:
http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/alterl/src/thrift_framed_transport.erl?p2=incubator/thrift/trunk/lib/alterl/src/thrift_framed_transport.erl&p1=incubator/thrift/trunk/lib/alterl/src/thrift_buffered_transport.erl&r1=666446&r2=666447&rev=666447&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/alterl/src/thrift_buffered_transport.erl (original)
+++ incubator/thrift/trunk/lib/alterl/src/thrift_framed_transport.erl Tue Jun 10 18:01:48 2008
@@ -1,11 +1,11 @@
%%%-------------------------------------------------------------------
-%%% File : thrift_buffered_transport.erl
-%%% Author : <[EMAIL PROTECTED]>
-%%% Description : Buffered transport for thrift
+%%% File : thrift_framed_transport.erl
+%%% Author : <[EMAIL PROTECTED]>
+%%% Description : Framed transport for thrift
%%%
-%%% Created : 30 Jan 2008 by <[EMAIL PROTECTED]>
+%%% Created : 12 Mar 2008 by <[EMAIL PROTECTED]>
%%%-------------------------------------------------------------------
--module(thrift_buffered_transport).
+-module(thrift_framed_transport).
-behaviour(gen_server).
-behaviour(thrift_transport).
@@ -20,13 +20,10 @@
%% thrift_transport callbacks
-export([write/2, read/2, flush/1, close/1]).
--record(buffered_transport, {wrapped, % a thrift_transport
- buffer
- %% a list of binaries which will be concatenated and sent during
- %% a flush.
- %%
- %% *** THIS LIST IS STORED IN REVERSE ORDER!!! ***
- }).
+-record(framed_transport, {wrapped, % a thrift_transport
+ read_buffer, % iolist()
+ write_buffer % iolist()
+ }).
%%====================================================================
%% API
@@ -46,11 +43,11 @@
%%--------------------------------------------------------------------
%% Function: write(Transport, Data) -> ok
%%
-%% Data = binary()
+%% Data = iolist()
%%
%% Description: Writes data into the buffer
%%--------------------------------------------------------------------
-write(Transport, Data) when is_binary(Data) ->
+write(Transport, Data) ->
gen_server:call(Transport, {write, Data}).
%%--------------------------------------------------------------------
@@ -93,8 +90,9 @@
init([Wrapped]) ->
%% TODO(cpiro): need to trap exits here so when transport exits
%% normally from under our feet we exit normally
- {ok, #buffered_transport{wrapped = Wrapped,
- buffer = []}}.
+ {ok, #framed_transport{wrapped = Wrapped,
+ read_buffer = [],
+ write_buffer = []}}.
%%--------------------------------------------------------------------
%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
@@ -105,19 +103,39 @@
%% {stop, Reason, State}
%% Description: Handling call messages
%%--------------------------------------------------------------------
-handle_call({write, Data}, _From, State = #buffered_transport{buffer =
Buffer}) ->
- {reply, ok, State#buffered_transport{buffer = [Data | Buffer]}};
+handle_call({write, Data}, _From, State = #framed_transport{write_buffer =
WBuf}) ->
+ {reply, ok, State#framed_transport{write_buffer = [WBuf, Data]}};
-handle_call({read, Len}, _From, State = #buffered_transport{wrapped =
Wrapped}) ->
- Response = thrift_transport:read(Wrapped, Len),
- {reply, Response, State};
-
-handle_call(flush, _From, State = #buffered_transport{buffer = Buffer,
- wrapped = Wrapped}) ->
- Concat = concat_binary(lists:reverse(Buffer)),
- Response = thrift_transport:write(Wrapped, Concat),
- thrift_transport:flush(Wrapped),
- {reply, Response, State#buffered_transport{buffer = []}}.
+handle_call({read, Len}, _From, State = #framed_transport{wrapped = Wrapped,
+ read_buffer = RBuf})
->
+ {RBuf1, RBuf1Size} =
+ %% if the read buffer is empty, read another frame
+ %% otherwise, just read from what's left in the buffer
+ case iolist_size(RBuf) of
+ 0 ->
+ %% read the frame length
+ {ok, <<FrameLen:32/integer-signed-big, _/binary>>} =
+ thrift_transport:read(Wrapped, 4),
+ %% then read the data
+ {ok, Bin} =
+ thrift_transport:read(Wrapped, FrameLen),
+ {Bin, size(Bin)};
+ Sz ->
+ {RBuf, Sz}
+ end,
+
+ %% pull off Give bytes, return them to the user, leave the rest in the buffer
+ Give = min(RBuf1Size, Len),
+ <<Data:Give/binary, RBuf2/binary>> = iolist_to_binary(RBuf1),
+
+ Response = {ok, Data},
+ State1 = State#framed_transport{read_buffer=RBuf2},
+
+ {reply, Response, State1};
+
+handle_call(flush, _From, State) ->
+ {Response, State1} = do_flush(State),
+ {reply, Response, State1}.
%%--------------------------------------------------------------------
%% Function: handle_cast(Msg, State) -> {noreply, State} |
@@ -125,13 +143,12 @@
%% {stop, Reason, State}
%% Description: Handling cast messages
%%--------------------------------------------------------------------
-handle_cast(close, State = #buffered_transport{buffer = Buffer,
- wrapped = Wrapped}) ->
- thrift_transport:write(Wrapped, concat_binary(lists:reverse(Buffer))),
+handle_cast(close, State) ->
+ {_, State1} = do_flush(State),
%% Wrapped is closed by terminate/2
- %% error_logger:info_msg("thrift_buffered_transport ~p: closing", [self()]),
+ %% error_logger:info_msg("thrift_framed_transport ~p: closing", [self()]),
{stop, normal, State};
-handle_cast(Msg, State=#buffered_transport{}) ->
+handle_cast(Msg, State=#framed_transport{}) ->
{noreply, State}.
%%--------------------------------------------------------------------
@@ -150,7 +167,7 @@
%% cleaning up. When it returns, the gen_server terminates with Reason.
%% The return value is ignored.
%%--------------------------------------------------------------------
-terminate(_Reason, State = #buffered_transport{wrapped=Wrapped}) ->
+terminate(_Reason, State = #framed_transport{wrapped=Wrapped}) ->
thrift_transport:close(Wrapped),
ok.
@@ -164,3 +181,18 @@
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
+do_flush(State = #framed_transport{write_buffer = Buffer,
+ wrapped = Wrapped}) ->
+ FrameLen = iolist_size(Buffer),
+ Data = [<<FrameLen:32/integer-signed-big>>, Buffer],
+
+ Response = thrift_transport:write(Wrapped, Data),
+
+ thrift_transport:flush(Wrapped),
+
+ State1 = State#framed_transport{write_buffer = []},
+ {Response, State1}.
+
+min(A,B) when A<B -> A;
+min(_,B) -> B.
+
Modified: incubator/thrift/trunk/lib/alterl/src/thrift_socket_transport.erl
URL:
http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/alterl/src/thrift_socket_transport.erl?rev=666447&r1=666446&r2=666447&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/alterl/src/thrift_socket_transport.erl (original)
+++ incubator/thrift/trunk/lib/alterl/src/thrift_socket_transport.erl Tue Jun 10 18:01:48 2008
@@ -23,8 +23,8 @@
end,
thrift_transport:new(?MODULE, State).
-write(#data{socket = Socket}, Data)
- when is_binary(Data) ->
+%% Data :: iolist()
+write(#data{socket = Socket}, Data) ->
gen_tcp:send(Socket, Data).
read(#data{socket=Socket, recv_timeout=Timeout}, Len)
Modified: incubator/thrift/trunk/lib/alterl/src/thrift_transport.erl
URL:
http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/alterl/src/thrift_transport.erl?rev=666447&r1=666446&r2=666447&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/alterl/src/thrift_transport.erl (original)
+++ incubator/thrift/trunk/lib/alterl/src/thrift_transport.erl Tue Jun 10 18:01:48 2008
@@ -22,7 +22,8 @@
{ok, #transport{module = Module,
data = Data}}.
-write(Transport, Data) when is_binary(Data) ->
+%% Data :: iolist()
+write(Transport, Data) ->
Module = Transport#transport.module,
Module:write(Transport#transport.data, Data).