This is an automated email from the ASF dual-hosted git repository. dlive pushed a commit to branch 0.4.0 in repository https://gitbox.apache.org/repos/asf/dubbo-erlang.git
commit 1f64b4b0ca14f3178e1b590b1dd6611cf92f81ab Author: DLive <xsxgm...@163.com> AuthorDate: Fri Jun 21 10:28:25 2019 +0800 dev client pool --- include/dubbo.hrl | 2 +- ...o_netty_client.erl => dubbo_client_default.erl} | 21 +- src/dubbo_consumer_pool.erl | 305 -------------------- .../dubbo_exchanger.erl | 35 ++- src/dubbo_invoker_old.erl | 2 +- src/dubbo_netty_client.erl | 2 +- src/dubbo_protocol_dubbo.erl | 54 +++- src/dubbo_provider_consumer_reg_table.erl | 320 ++++++++++++++++++++- src/dubbo_registry_zookeeper.erl | 2 +- ...r_pool_sup.erl => dubbo_transport_pool_sup.erl} | 13 +- src/dubbo_zookeeper.erl | 2 +- src/dubboerl_sup.erl | 4 +- test/dubbo_consumer_pool_tests.erl | 6 +- 13 files changed, 415 insertions(+), 353 deletions(-) diff --git a/include/dubbo.hrl b/include/dubbo.hrl index ad2277a..727c71e 100644 --- a/include/dubbo.hrl +++ b/include/dubbo.hrl @@ -102,7 +102,7 @@ -record(interface_list, {interface, pid, connection_info}). --record(provider_node_list, {host_flag, connection_info}). +-record(provider_node_list, {host_flag, pid, weight, readonly = false}). -record(connection_info, {connection_id, pid, weight, host_flag, readonly = false}). -type dubbo_request() :: #dubbo_request{}. diff --git a/src/dubbo_netty_client.erl b/src/dubbo_client_default.erl similarity index 97% copy from src/dubbo_netty_client.erl copy to src/dubbo_client_default.erl index 0181d33..220c30f 100644 --- a/src/dubbo_netty_client.erl +++ b/src/dubbo_client_default.erl @@ -14,13 +14,12 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %%------------------------------------------------------------------------------ --module(dubbo_netty_client). +-module(dubbo_client_default). -behaviour(gen_server). + -include("dubbo.hrl"). -%% API --export([start_link/4]). %% gen_server callbacks -export([init/1, @@ -29,6 +28,8 @@ handle_info/2, terminate/2, code_change/3]). +-export([start_link/1]). + -export([check_recv_data/2]). 
-define(SERVER, ?MODULE). @@ -38,7 +39,8 @@ heartbeat = #heartbeat{}, recv_buffer = <<>>, %%从服务端接收的数据 host_flag, - reconnection_timer + reconnection_timer, + handler }). %%%=================================================================== @@ -51,10 +53,10 @@ %% %% @end %%-------------------------------------------------------------------- --spec(start_link(Name :: binary(), HostFlag :: binary(), ProviderConfig :: #provider_config{}, integer()) -> +-spec(start_link(Name :: binary(), ProviderConfig :: #provider_config{}) -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link(Name, HostFlag, ProviderConfig, Index) -> - gen_server:start_link({local, Name}, ?MODULE, [HostFlag, ProviderConfig, Index], []). +start_link(ProviderConfig) -> + gen_server:start_link(?MODULE, [ProviderConfig], []). %%%=================================================================== %%% gen_server callbacks @@ -74,8 +76,7 @@ start_link(Name, HostFlag, ProviderConfig, Index) -> -spec(init(Args :: term()) -> {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {stop, Reason :: term()} | ignore). -init([HostFlag, ProviderConfig, Index]) -> - erlang:process_flag(min_bin_vheap_size, 1024 * 1024), +init([HostFlag, ProviderConfig]) -> #provider_config{host = Host, port = Port} = ProviderConfig, State = case open(Host, Port) of {ok, Socket} -> @@ -415,7 +416,7 @@ process_response(true, _ResponseInfo, _RestData, State) -> {ok, State}. 
process_request(true, #dubbo_request{data = <<"R">>}, State) -> - {ok, _} = dubbo_consumer_pool:update_connection_readonly(self(), true), + {ok, _} = dubbo_provider_consumer_reg_table:update_connection_readonly(self(), true), {ok, State}; process_request(true, Request, State) -> {ok, NewState} = send_heartbeat_msg(Request#dubbo_request.mid, false, State), diff --git a/src/dubbo_consumer_pool.erl b/src/dubbo_consumer_pool.erl deleted file mode 100644 index 0a01d38..0000000 --- a/src/dubbo_consumer_pool.erl +++ /dev/null @@ -1,305 +0,0 @@ -%%------------------------------------------------------------------------------ -%% Licensed to the Apache Software Foundation (ASF) under one or more -%% contributor license agreements. See the NOTICE file distributed with -%% this work for additional information regarding copyright ownership. -%% The ASF licenses this file to You under the Apache License, Version 2.0 -%% (the "License"); you may not use this file except in compliance with -%% the License. You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%------------------------------------------------------------------------------ --module(dubbo_consumer_pool). - --behaviour(gen_server). - -%% API --export([start_link/0, start_consumer/2]). - -%% gen_server callbacks --export([init/1, - handle_call/3, - handle_cast/2, - handle_info/2, - terminate/2, - code_change/3]). - --export([select_connection/1, select_connection/2, update_connection_readonly/2]). - --include("dubbo.hrl"). --define(SERVER, ?MODULE). - --define(INTERFCE_LIST_TABLE, interface_list). --define(PROVIDER_NODE_LIST_TABLE, provider_node_list). 
- --record(state, {}). - --ifdef(TEST). --compile([export_all]). --endif. - - -%%%=================================================================== -%%% API -%%%=================================================================== - -%%-------------------------------------------------------------------- -%% @doc -%% Starts the server -%% -%% @end -%%-------------------------------------------------------------------- --spec(start_link() -> - {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). - -%%%=================================================================== -%%% gen_server callbacks -%%%=================================================================== - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Initializes the server -%% -%% @spec init(Args) -> {ok, State} | -%% {ok, State, Timeout} | -%% ignore | -%% {stop, Reason} -%% @end -%%-------------------------------------------------------------------- --spec(init(Args :: term()) -> - {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | - {stop, Reason :: term()} | ignore). -init([]) -> - init_ets_table(), - {ok, #state{}}. -init_ets_table() -> - try ets:new(?INTERFCE_LIST_TABLE, [bag, public, named_table, {keypos, 2}]) of - ?INTERFCE_LIST_TABLE -> - ok - catch - _Type:Reason -> - logger:error("new ets table error ~p", [Reason]), - error - end, - try ets:new(?PROVIDER_NODE_LIST_TABLE, [bag, public, named_table, {keypos, 2}]) of - ?PROVIDER_NODE_LIST_TABLE -> - ok - catch - _Type1:Reason1 -> - logger:error("new ets table error ~p", [Reason1]), - error - end, - ok. 
-%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Handling call messages -%% -%% @end -%%-------------------------------------------------------------------- --spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, - State :: #state{}) -> - {reply, Reply :: term(), NewState :: #state{}} | - {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | - {stop, Reason :: term(), NewState :: #state{}}). - -handle_call({add_consumer, Interface, ProviderNodeList}, _From, State) -> - - OldProviderList = get_interface_provider_node(Interface), - NewProviderList = add_consumer(ProviderNodeList, []), - DeleteProverList = OldProviderList -- NewProviderList, - clean_invalid_provider(DeleteProverList), - {reply, ok, State}; -handle_call(_Request, _From, State) -> - {reply, ok, State}. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Handling cast messages -%% -%% @end -%%-------------------------------------------------------------------- --spec(handle_cast(Request :: term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_cast(_Request, State) -> - {noreply, State}. 
- -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Handling all non call/cast messages -%% -%% @spec handle_info(Info, State) -> {noreply, State} | -%% {noreply, State, Timeout} | -%% {stop, Reason, State} -%% @end -%%-------------------------------------------------------------------- --spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_info(_Info, State) -> - {noreply, State}. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any -%% necessary cleaning up. When it returns, the gen_server terminates -%% with Reason. The return value is ignored. -%% -%% @spec terminate(Reason, State) -> void() -%% @end -%%-------------------------------------------------------------------- --spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), - State :: #state{}) -> term()). -terminate(_Reason, _State) -> - ok. - -%%-------------------------------------------------------------------- -%% @private -%% @doc -%% Convert process state when code is changed -%% -%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState} -%% @end -%%-------------------------------------------------------------------- --spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, - Extra :: term()) -> - {ok, NewState :: #state{}} | {error, Reason :: term()}). -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -start_consumer(Interface, ProviderNodeInfo) -> - gen_server:call(?SERVER, {add_consumer, Interface, ProviderNodeInfo}). 
- - -%%%=================================================================== -%%% Internal functions -%%%=================================================================== -add_consumer([], RegisterList) -> - RegisterList; -add_consumer([ProviderNodeInfo | ProviderList], RegisterList) -> - case dubbo_node_config_util:parse_provider_info(ProviderNodeInfo) of - {ok, ProviderConfig} -> - HostFlag = get_host_flag(ProviderConfig), - case ets:lookup(?PROVIDER_NODE_LIST_TABLE, HostFlag) of - [] -> - ConnectionList = start_provider_process(HostFlag, 30, ProviderConfig), - ok = update_connection_info(ProviderConfig#provider_config.interface, HostFlag, ConnectionList, true), - ok; - List -> - List2 = lists:map(fun(#provider_node_list{connection_info = ConnectionItem}) -> - ConnectionItem - end, List), - ok = update_connection_info(ProviderConfig#provider_config.interface, HostFlag, List2, false), - ok - end, - add_consumer(ProviderList, [HostFlag] ++ RegisterList); - {error, R1} -> - logger:error("parse provider info error reason ~p", [R1]), - add_consumer(ProviderList, RegisterList) - end. - -start_provider_process(HostFlag, Weight, ProviderConfig) -> - ExecutesList = lists:seq(1, ProviderConfig#provider_config.executes), - ConnectionList = lists:map(fun(Item) -> - ConnectionFlag = <<HostFlag/binary, (integer_to_binary(Item))/binary>>, - ConnectionFlagTerm = binary_to_atom(ConnectionFlag, utf8), - AChild = {ConnectionFlagTerm, {dubbo_netty_client, start_link, [ConnectionFlagTerm, HostFlag, ProviderConfig, Item]}, permanent, 2000, worker, [dubbo_netty_client]}, - {ok, Pid} = dubbo_consumer_pool_sup:add_children(AChild), - logger:info("start provider ~p pid info ~p~n", [HostFlag, Pid]), - #connection_info{connection_id = ConnectionFlagTerm, pid = Pid, weight = Weight, host_flag = HostFlag} - end, ExecutesList), - ConnectionList. 
-get_host_flag(ProviderConfig) -> - HostFlag = <<(list_to_binary(ProviderConfig#provider_config.host))/binary, <<"_">>/binary, (integer_to_binary(ProviderConfig#provider_config.port))/binary>>, - HostFlag. - -update_connection_info(Interface, HostFlag, ConnectionList, IsUpdateProvideNode) -> - lists:map(fun(Item) -> - I1 = ets:insert(?INTERFCE_LIST_TABLE, #interface_list{interface = Interface, pid = Item#connection_info.pid, connection_info = Item}), - logger:debug("insert interface conection info ~p ~p ~p", [Interface, Item#connection_info.pid, I1]), - case IsUpdateProvideNode of - true -> - I2 = ets:insert(?PROVIDER_NODE_LIST_TABLE, #provider_node_list{host_flag = HostFlag, connection_info = Item}), - logger:debug("insert PROVIDER_NODE_LIST_TABLE ~p info:~p", [HostFlag, I2]); - false -> - ok - end, - ok - end, ConnectionList), - ok. - -get_interface_provider_node(Interface) -> - case ets:lookup(?INTERFCE_LIST_TABLE, Interface) of - [] -> - []; - List -> - ListRet = [Item#interface_list.connection_info#connection_info.host_flag || Item <- List], - dubbo_lists_util:del_duplicate(ListRet) - end. - -select_connection(Interface) -> - RandNum = rand:uniform(2048), - select_connection(Interface, RandNum). -select_connection(Interface, RandNum) -> - case ets:lookup(?INTERFCE_LIST_TABLE, Interface) of - [] -> - {error, none}; - List -> - Len = length(List), - RemNum = (RandNum rem Len) + 1, - InterfaceListItem = lists:nth(RemNum, List), - {ok, InterfaceListItem#interface_list.connection_info} - end. - --spec(update_connection_readonly(pid(), boolean()) -> ok). 
-update_connection_readonly(ConnectionPid, Readonly) -> - Pattern = #interface_list{pid = ConnectionPid, _ = '_'}, - Objects = ets:match_object(?INTERFCE_LIST_TABLE, Pattern), - lists:map(fun(#interface_list{interface = Interface, pid = Pid, connection_info = ConnectionInfo} = InterferConnection) -> - logger:debug("[dubbo] update interface ~p ~p readonly", [Interface, Pid]), - NewConnectionInfo = ConnectionInfo#connection_info{readonly = Readonly}, - NewObject = InterferConnection#interface_list{connection_info = NewConnectionInfo}, - ets:delete_object(?INTERFCE_LIST_TABLE, InterferConnection), - ets:insert(?INTERFCE_LIST_TABLE, NewObject) - end, Objects), - {ok, length(Objects)}. - -clean_invalid_provider([]) -> - ok; -clean_invalid_provider([HostFlag | DeleteProverList]) -> - case ets:lookup(?PROVIDER_NODE_LIST_TABLE, HostFlag) of - [] -> - ok; - ProviderNodeList -> - ProviderNodeList1 = dubbo_lists_util:del_duplicate(ProviderNodeList), - clean_connection_info(ProviderNodeList1) - end, - clean_invalid_provider(DeleteProverList). - -clean_connection_info(ProviderNodeList) -> - lists:map(fun(Item) -> - Pid = Item#provider_node_list.connection_info#connection_info.pid, - ConnectionId = Item#provider_node_list.connection_info#connection_info.connection_id, - Pattern = #interface_list{pid = Pid, _ = '_'}, - ets:delete_object(?INTERFCE_LIST_TABLE, Pattern), - dubbo_consumer_pool_sup:stop_children(ConnectionId) - end, ProviderNodeList), - ok. \ No newline at end of file diff --git a/test/dubbo_consumer_pool_tests.erl b/src/dubbo_exchanger.erl similarity index 57% copy from test/dubbo_consumer_pool_tests.erl copy to src/dubbo_exchanger.erl index 740ed84..09a4833 100644 --- a/test/dubbo_consumer_pool_tests.erl +++ b/src/dubbo_exchanger.erl @@ -14,20 +14,27 @@ %% See the License for the specific language governing permissions and %% limitations under the License. 
%%------------------------------------------------------------------------------ --module(dubbo_consumer_pool_tests). --author("dlive"). +-module(dubbo_exchanger). --include_lib("eunit/include/eunit.hrl"). -include("dubbo.hrl"). -update_readonly_test() -> - dubbo_consumer_pool:start_link(), - InterfaceName= <<"testinterfacename">>, - HostFalg= <<"127.0.0.1/20880">>, - ConnectionList = [ - #connection_info{connection_id=1,pid= testpid,weight = 30,host_flag = HostFalg}, - #connection_info{connection_id=2,pid= testpid2,weight = 30,host_flag = HostFalg} - ], - dubbo_consumer_pool:update_connection_info(InterfaceName,HostFalg,ConnectionList,true), - {ok,Size} = dubbo_consumer_pool:update_connection_readonly(testpid,false), - ?assertEqual(1,Size). +%% API +-export([connect/2]). + +connect(Url,Handler) -> + case dubbo_node_config_util:parse_provider_info(Url) of + {ok, ProviderConfig} -> + HostFlag= dubbo_provider_consumer_reg_table:get_host_flag(ProviderConfig), + {ok, Pid} = dubbo_transport_pool_sup:add_children(ProviderConfig,Handler), + logger:info("start provider ~p pid info ~p~n", [HostFlag, Pid]), + {ok,#connection_info{ pid = Pid, weight = get_weight(ProviderConfig), host_flag = HostFlag}}; + {error, R1} -> + logger:error("parse provider info error reason ~p", [R1]), + {error,R1} + end. + + + +get_weight(_ProviderConfig)-> + %% todo get weight from provider info + 30. \ No newline at end of file diff --git a/src/dubbo_invoker_old.erl b/src/dubbo_invoker_old.erl index c878656..354cef9 100644 --- a/src/dubbo_invoker_old.erl +++ b/src/dubbo_invoker_old.erl @@ -40,7 +40,7 @@ invoke_request(Interface, Request, RequestOption) -> {ok, reference(), Data :: any(), RpcContent :: list()}| {error, Reason :: timeout|no_provider|request_full|any()}. 
invoke_request(Interface, Request, RpcContext, RequestState, CallBackPid) -> - case dubbo_consumer_pool:select_connection(Interface, Request#dubbo_request.mid) of + case dubbo_provider_consumer_reg_table:select_connection(Interface, Request#dubbo_request.mid) of {ok, #connection_info{pid = Pid, host_flag = HostFlag}} -> case dubbo_traffic_control:check_goon(HostFlag, 199) of ok -> diff --git a/src/dubbo_netty_client.erl b/src/dubbo_netty_client.erl index 0181d33..06f9d1e 100644 --- a/src/dubbo_netty_client.erl +++ b/src/dubbo_netty_client.erl @@ -415,7 +415,7 @@ process_response(true, _ResponseInfo, _RestData, State) -> {ok, State}. process_request(true, #dubbo_request{data = <<"R">>}, State) -> - {ok, _} = dubbo_consumer_pool:update_connection_readonly(self(), true), + {ok, _} = dubbo_provider_consumer_reg_table:update_connection_readonly(self(), true), {ok, State}; process_request(true, Request, State) -> {ok, NewState} = send_heartbeat_msg(Request#dubbo_request.mid, false, State), diff --git a/src/dubbo_protocol_dubbo.erl b/src/dubbo_protocol_dubbo.erl index 06c36e6..1ede1d8 100644 --- a/src/dubbo_protocol_dubbo.erl +++ b/src/dubbo_protocol_dubbo.erl @@ -17,19 +17,61 @@ -module(dubbo_protocol_dubbo). -include("dubboerl.hrl"). +-include("dubbo.hrl"). %% API -export([refer/2]). -refer(Url,Acc)-> - {ok,UrlInfo} = dubbo_common_fun:parse_url(Url), +refer(Url, Acc) -> + {ok, UrlInfo} = dubbo_common_fun:parse_url(Url), case UrlInfo#dubbo_url.scheme of <<"dubbo">> -> - {ok,todo}; + do_refer(UrlInfo), + {ok, todo}; _ -> - {skip,Acc} + {skip, Acc} end. -do_refer(UrlInfo)-> +do_refer(UrlInfo) -> + + ok. + + +getClients(ProviderUrl) -> + case new_transport(ProviderUrl) of + {ok,ConnectionInfoList} -> + ConnectionList = start_provider_process(HostFlag, 30, ProviderConfig), + ok; + {error,Reason} -> + {error,Reason} + end. 
+ + + +%%ok = update_connection_info(ProviderConfig#provider_config.interface, HostFlag, ConnectionList, true), + + +new_transport(ProviderUrl)-> + case dubbo_node_config_util:parse_provider_info(ProviderUrl) of + {ok, ProviderConfig} -> + HostFlag = get_host_flag(ProviderConfig), + case dubbo_provider_consumer_reg_table:get_host_connections(ProviderConfig#provider_config) of + [] -> + case dubbo_exchanger:connect(ProviderUrl,?MODULE) of + {ok,ConnectionInfo} -> + {ok,[ConnectionInfo]}; + {error,Reason} -> + logger:warning("start client fail ~p ~p",[Reason,HostFlag]), + {error,Reason} + end; + ConnectionInfoList -> + {ok,ConnectionInfoList} + end; + {error, R1} -> + logger:error("parse provider info error reason ~p", [R1]), + {error,R1} + end. + + + - ok. \ No newline at end of file diff --git a/src/dubbo_provider_consumer_reg_table.erl b/src/dubbo_provider_consumer_reg_table.erl index 3386cdc..c7a8dfa 100644 --- a/src/dubbo_provider_consumer_reg_table.erl +++ b/src/dubbo_provider_consumer_reg_table.erl @@ -15,7 +15,323 @@ %% limitations under the License. %%------------------------------------------------------------------------------ -module(dubbo_provider_consumer_reg_table). --author("dlive"). + +-behaviour(gen_server). %% API --export([]). +-export([start_link/0, start_consumer/2]). + +%% gen_server callbacks +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3]). + +-export([update_consumer_connections/2,get_host_connections/2, select_connection/1, select_connection/2, update_connection_readonly/2, get_host_flag/1, get_host_flag/2]). + +-include("dubbo.hrl"). +-define(SERVER, ?MODULE). + +-define(INTERFCE_LIST_TABLE, interface_list). +-define(PROVIDER_NODE_LIST_TABLE, provider_node_list). + +-record(state, {}). + +-ifdef(TEST). +-compile([export_all]). +-endif. 
+ + +%%%=================================================================== +%%% API +%%%=================================================================== + +%%-------------------------------------------------------------------- +%% @doc +%% Starts the server +%% +%% @end +%%-------------------------------------------------------------------- +-spec(start_link() -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Initializes the server +%% +%% @spec init(Args) -> {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% @end +%%-------------------------------------------------------------------- +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([]) -> + init_ets_table(), + {ok, #state{}}. +init_ets_table() -> + try ets:new(?INTERFCE_LIST_TABLE, [bag, public, named_table, {keypos, 2}]) of + ?INTERFCE_LIST_TABLE -> + ok + catch + _Type:Reason -> + logger:error("new ets table error ~p", [Reason]), + error + end, + try ets:new(?PROVIDER_NODE_LIST_TABLE, [bag, public, named_table, {keypos, 2}]) of + ?PROVIDER_NODE_LIST_TABLE -> + ok + catch + _Type1:Reason1 -> + logger:error("new ets table error ~p", [Reason1]), + error + end, + ok. 
+%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling call messages +%% +%% @end +%%-------------------------------------------------------------------- +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). + +handle_call({add_consumer, Interface, ProviderNodeList}, _From, State) -> + + OldProviderList = get_interface_provider_node(Interface), + NewProviderList = add_consumer(ProviderNodeList, []), + DeleteProverList = OldProviderList -- NewProviderList, + clean_invalid_provider(DeleteProverList), + {reply, ok, State}; +handle_call(_Request, _From, State) -> + {reply, ok, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling cast messages +%% +%% @end +%%-------------------------------------------------------------------- +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast(_Request, State) -> + {noreply, State}. 
+ +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Handling all non call/cast messages +%% +%% @spec handle_info(Info, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% @end +%%-------------------------------------------------------------------- +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info(_Info, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +%% +%% @spec terminate(Reason, State) -> void() +%% @end +%%-------------------------------------------------------------------- +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State) -> + ok. + +%%-------------------------------------------------------------------- +%% @private +%% @doc +%% Convert process state when code is changed +%% +%% @spec code_change(OldVsn, State, Extra) -> {ok, NewState} +%% @end +%%-------------------------------------------------------------------- +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +start_consumer(Interface, ProviderNodeInfo) -> + gen_server:call(?SERVER, {add_consumer, Interface, ProviderNodeInfo}). 
+ + + +get_host_connections(Host, Port) -> + HostFlag = get_host_flag(Host, Port), + List = ets:lookup(?PROVIDER_NODE_LIST_TABLE, HostFlag), + List2 = lists:map( + fun(#provider_node_list{host_flag = HostFlag,pid = Pid,readonly = Readonly}) -> + #connection_info{host_flag = HostFlag,pid = Pid,readonly = Readonly} + end, List), + List2. + + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +%%add_consumer([], RegisterList) -> +%% RegisterList; +%%add_consumer([ProviderNodeInfo | ProviderList], RegisterList) -> +%% case dubbo_node_config_util:parse_provider_info(ProviderNodeInfo) of +%% {ok, ProviderConfig} -> +%% HostFlag = get_host_flag(ProviderConfig), +%% case ets:lookup(?PROVIDER_NODE_LIST_TABLE, HostFlag) of +%% [] -> +%% ConnectionList = start_provider_process(HostFlag, 30, ProviderConfig), +%% ok = update_connection_info(ProviderConfig#provider_config.interface, HostFlag, ConnectionList, true), +%% ok; +%% List -> +%% List2 = lists:map(fun(#provider_node_list{connection_info = ConnectionItem}) -> +%% ConnectionItem +%% end, List), +%% ok = update_connection_info(ProviderConfig#provider_config.interface, HostFlag, List2, false), +%% ok +%% end, +%% add_consumer(ProviderList, [HostFlag] ++ RegisterList); +%% {error, R1} -> +%% logger:error("parse provider info error reason ~p", [R1]), +%% add_consumer(ProviderList, RegisterList) +%% end. 
+%% +%%start_provider_process(HostFlag, Weight, ProviderConfig) -> +%% ExecutesList = lists:seq(1, ProviderConfig#provider_config.executes), +%% ConnectionList = lists:map(fun(Item) -> +%% ConnectionFlag = <<HostFlag/binary, (integer_to_binary(Item))/binary>>, +%% ConnectionFlagTerm = binary_to_atom(ConnectionFlag, utf8), +%% AChild = {ConnectionFlagTerm, {dubbo_netty_client, start_link, [ConnectionFlagTerm, HostFlag, ProviderConfig, Item]}, permanent, 2000, worker, [dubbo_netty_client]}, +%% {ok, Pid} = dubbo_transport_pool_sup:add_children(AChild), +%% logger:info("start provider ~p pid info ~p~n", [HostFlag, Pid]), +%% #connection_info{connection_id = ConnectionFlagTerm, pid = Pid, weight = Weight, host_flag = HostFlag} +%% end, ExecutesList), +%% ConnectionList. + +update_consumer_connections(Interface, Connections) -> + lists:map( + fun(Item) -> + HostFlag= Item#connection_info.host_flag, + + case ets:lookup_element(?PROVIDER_NODE_LIST_TABLE,#provider_node_list{host_flag = HostFlag,pid = Item#connection_info.pid,_="_"}) of + '$end_of_table' -> + I2 = ets:insert(?PROVIDER_NODE_LIST_TABLE, #provider_node_list{host_flag = HostFlag,pid = Item#connection_info.pid}), + logger:debug("insert PROVIDER_NODE_LIST_TABLE ~p info:~p", [HostFlag, I2]); + {_ObjectList,_Continuation} -> + ok + end, + I1 = ets:insert(?INTERFCE_LIST_TABLE, #interface_list{interface = Interface, pid = Item#connection_info.pid, connection_info = Item}), + logger:debug("insert interface conection info ~p ~p ~p", [Interface, Item#connection_info.pid, I1]), + ok + end, Connections), + ok. + +get_host_flag(ProviderConfig) -> + HostFlag = <<(list_to_binary(ProviderConfig#provider_config.host))/binary, <<"_">>/binary, (integer_to_binary(ProviderConfig#provider_config.port))/binary>>, + HostFlag. +get_host_flag(Host, Port) -> + <<(list_to_binary(Host))/binary, <<"_">>/binary, (integer_to_binary(Port))/binary>>. 
+ +update_connection_info(Interface, HostFlag, ConnectionList, IsUpdateProvideNode) -> + lists:map(fun(Item) -> + I1 = ets:insert(?INTERFCE_LIST_TABLE, #interface_list{interface = Interface, pid = Item#connection_info.pid, connection_info = Item}), + logger:debug("insert interface conection info ~p ~p ~p", [Interface, Item#connection_info.pid, I1]), + case IsUpdateProvideNode of + true -> + I2 = ets:insert(?PROVIDER_NODE_LIST_TABLE, #provider_node_list{host_flag = HostFlag, connection_info = Item}), + logger:debug("insert PROVIDER_NODE_LIST_TABLE ~p info:~p", [HostFlag, I2]); + false -> + ok + end, + ok + end, ConnectionList), + ok. + +get_interface_provider_node(Interface) -> + case ets:lookup(?INTERFCE_LIST_TABLE, Interface) of + [] -> + []; + List -> + ListRet = [Item#interface_list.connection_info#connection_info.host_flag || Item <- List], + dubbo_lists_util:del_duplicate(ListRet) + end. + +select_connection(Interface) -> + RandNum = rand:uniform(2048), + select_connection(Interface, RandNum). +select_connection(Interface, RandNum) -> + case ets:lookup(?INTERFCE_LIST_TABLE, Interface) of + [] -> + {error, none}; + List -> + Len = length(List), + RemNum = (RandNum rem Len) + 1, + InterfaceListItem = lists:nth(RemNum, List), + {ok, InterfaceListItem#interface_list.connection_info} + end. + +-spec(update_connection_readonly(pid(), boolean()) -> ok). 
+update_connection_readonly(ConnectionPid, Readonly) -> + Pattern = #interface_list{pid = ConnectionPid, _ = '_'}, + Objects = ets:match_object(?INTERFCE_LIST_TABLE, Pattern), + lists:map(fun(#interface_list{interface = Interface, pid = Pid, connection_info = ConnectionInfo} = InterferConnection) -> + logger:debug("[dubbo] update interface ~p ~p readonly", [Interface, Pid]), + NewConnectionInfo = ConnectionInfo#connection_info{readonly = Readonly}, + NewObject = InterferConnection#interface_list{connection_info = NewConnectionInfo}, + ets:delete_object(?INTERFCE_LIST_TABLE, InterferConnection), + ets:insert(?INTERFCE_LIST_TABLE, NewObject) + end, Objects), + {ok, length(Objects)}. + +clean_invalid_provider([]) -> + ok; +clean_invalid_provider([HostFlag | DeleteProverList]) -> + case ets:lookup(?PROVIDER_NODE_LIST_TABLE, HostFlag) of + [] -> + ok; + ProviderNodeList -> + ProviderNodeList1 = dubbo_lists_util:del_duplicate(ProviderNodeList), + clean_connection_info(ProviderNodeList1) + end, + clean_invalid_provider(DeleteProverList). + +clean_connection_info(ProviderNodeList) -> + lists:map(fun(Item) -> + Pid = Item#provider_node_list.connection_info#connection_info.pid, + ConnectionId = Item#provider_node_list.connection_info#connection_info.connection_id, + Pattern = #interface_list{pid = Pid, _ = '_'}, + ets:delete_object(?INTERFCE_LIST_TABLE, Pattern), + dubbo_transport_pool_sup:stop_children(ConnectionId) + end, ProviderNodeList), + ok. \ No newline at end of file diff --git a/src/dubbo_registry_zookeeper.erl b/src/dubbo_registry_zookeeper.erl index 0b5b3f5..5d14588 100644 --- a/src/dubbo_registry_zookeeper.erl +++ b/src/dubbo_registry_zookeeper.erl @@ -311,4 +311,4 @@ gen_consumer_node_info(Consumer) -> %%dubbo_zookeeper:register_consumer(<<"com.ifcoder.abcd">>,[]). start_provider_process(Interface, ProviderList) -> - dubbo_consumer_pool:start_consumer(Interface, ProviderList). 
\ No newline at end of file
+    dubbo_provider_consumer_reg_table:start_consumer(Interface, ProviderList).
\ No newline at end of file
diff --git a/src/dubbo_consumer_pool_sup.erl b/src/dubbo_transport_pool_sup.erl
similarity index 87%
rename from src/dubbo_consumer_pool_sup.erl
rename to src/dubbo_transport_pool_sup.erl
index 77a6dbe..019c57d 100644
--- a/src/dubbo_consumer_pool_sup.erl
+++ b/src/dubbo_transport_pool_sup.erl
@@ -14,12 +14,12 @@
 %% See the License for the specific language governing permissions and
 %% limitations under the License.
 %%------------------------------------------------------------------------------
--module(dubbo_consumer_pool_sup).
+-module(dubbo_transport_pool_sup).
 -behaviour(supervisor).
 %% API
--export([start_link/0, add_children/1, stop_children/1]).
+-export([start_link/0, add_children/2, stop_children/1]).
 %% Supervisor callbacks
 -export([init/1]).
@@ -63,17 +63,18 @@ start_link() ->
     ignore |
     {error, Reason :: term()}).
 init([]) ->
-    RestartStrategy = one_for_one,
+    RestartStrategy = simple_one_for_one,
     MaxRestarts = 1000,
     MaxSecondsBetweenRestarts = 3600,
     SupFlags = {RestartStrategy, MaxRestarts, MaxSecondsBetweenRestarts},
+    %% Single child spec shared by every dynamically started child; its
+    %% start-function argument list is empty.
+    Child = {dubbo_client_default, {dubbo_client_default, start_link, []}, permanent, 2000, worker, [dubbo_client_default]},
+    {ok, {SupFlags, [Child]}}.
-    {ok, {SupFlags, []}}.
+%% Starts one child under this supervisor, passing [ProvideConfig, Handler]
+%% to supervisor:start_child/2.
+%% NOTE(review): with simple_one_for_one this should end up calling
+%% dubbo_client_default:start_link(ProvideConfig, Handler) - confirm that
+%% module exports start_link/2.
+add_children(ProvideConfig, Handler) ->
+    supervisor:start_child(?SERVER, [ProvideConfig, Handler]).
-add_children(ChildSpec) ->
-    supervisor:start_child(?SERVER, ChildSpec).
+%% Terminates one child of this supervisor.
+%% NOTE(review): under the simple_one_for_one strategy set in init/1,
+%% supervisor:terminate_child/2 is documented to expect the child's pid, not
+%% a child id - confirm what callers pass as ChildID.
 stop_children(ChildID) ->
     supervisor:terminate_child(?SERVER, ChildID).
 %%%===================================================================
diff --git a/src/dubbo_zookeeper.erl b/src/dubbo_zookeeper.erl
index f62ace1..84a95d6 100644
--- a/src/dubbo_zookeeper.erl
+++ b/src/dubbo_zookeeper.erl
@@ -272,5 +272,5 @@ gen_consumer_node_info(Consumer) ->
 %%dubbo_zookeeper:register_consumer(<<"com.ifcoder.abcd">>,[]).
+%% Hands Interface and its provider list to the start_consumer/2 function
+%% of the module that replaces dubbo_consumer_pool in this commit.
 start_provider_process(Interface, ProviderList) ->
-    dubbo_consumer_pool:start_consumer(Interface, ProviderList).
+    dubbo_provider_consumer_reg_table:start_consumer(Interface, ProviderList).
diff --git a/src/dubboerl_sup.erl b/src/dubboerl_sup.erl
index a829015..a0d2fb0 100644
--- a/src/dubboerl_sup.erl
+++ b/src/dubboerl_sup.erl
@@ -45,8 +45,8 @@ init([]) ->
 %% NettySer = {dubbo_netty_client,{dubbo_netty_client, start_link, []},transient,5000,worker,[dubbo_netty_client]},
     Id_count = {dubbo_id_generator, {dubbo_id_generator, start_link, []}, transient, 5000, worker, [dubbo_id_generator]},
     ProviderPoolSup = {dubbo_provider_worker_sup, {dubbo_provider_worker_sup, start_link, []}, transient, 5000, supervisor, [dubbo_provider_worker_sup]},
-    ConsumerPoolSup = {dubbo_consumer_pool_sup, {dubbo_consumer_pool_sup, start_link, []}, transient, 5000, supervisor, [dubbo_consumer_pool_sup]},
-    ConsumerPool = {dubbo_consumer_pool, {dubbo_consumer_pool, start_link, []}, transient, 5000, worker, [dubbo_consumer_pool]},
+    %% Child specs for the renamed supervisor and for the worker module that
+    %% replaces dubbo_consumer_pool; the variable names are kept from before.
+    ConsumerPoolSup = {dubbo_transport_pool_sup, {dubbo_transport_pool_sup, start_link, []}, transient, 5000, supervisor, [dubbo_transport_pool_sup]},
+    ConsumerPool = {dubbo_provider_consumer_reg_table, {dubbo_provider_consumer_reg_table, start_link, []}, transient, 5000, worker, [dubbo_provider_consumer_reg_table]},
     ListNew1 = case application:get_env(dubboerl, registry, false) of
                    true ->
diff --git a/test/dubbo_consumer_pool_tests.erl b/test/dubbo_consumer_pool_tests.erl
index 740ed84..0a67f1a 100644
--- a/test/dubbo_consumer_pool_tests.erl
+++ b/test/dubbo_consumer_pool_tests.erl
@@ -21,13 +21,13 @@
 -include("dubbo.hrl").
+%% Registers two connections (placeholder atoms stand in for pids) for one
+%% interface, then checks that update_connection_readonly/2 reports exactly
+%% one updated entry for testpid.
 update_readonly_test() ->
-    dubbo_consumer_pool:start_link(),
+    dubbo_provider_consumer_reg_table:start_link(),
     InterfaceName= <<"testinterfacename">>,
     HostFalg= <<"127.0.0.1/20880">>,
     ConnectionList = [
         #connection_info{connection_id=1,pid= testpid,weight = 30,host_flag = HostFalg},
         #connection_info{connection_id=2,pid= testpid2,weight = 30,host_flag = HostFalg}
     ],
-    dubbo_consumer_pool:update_connection_info(InterfaceName,HostFalg,ConnectionList,true),
-    {ok,Size} = dubbo_consumer_pool:update_connection_readonly(testpid,false),
+    dubbo_provider_consumer_reg_table:update_connection_info(InterfaceName,HostFalg,ConnectionList,true),
+    {ok,Size} = dubbo_provider_consumer_reg_table:update_connection_readonly(testpid,false),
     ?assertEqual(1,Size).