This is an automated email from the ASF dual-hosted git repository. wangxin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/dubbo-php-framework.git
commit af3df5c02176b4c598ea55b147b2f7e27af60cd8 Author: wangjinxi <wangji...@che001.com> AuthorDate: Thu Jul 4 18:26:45 2019 +0800 update --- common/file/FSOFRedis.php | 64 ++++++++++------------ common/protocol/fsof/DubboParser.php | 52 ++++++------------ common/url/FSOFUrl.php | 2 + config/global/conf/fsof.ini | 10 ++-- consumer/ConsumerException.php | 16 ++++++ consumer/Type.php | 38 ++++++++++++- consumer/fsof/FSOFProcessor.php | 47 ++++++++-------- consumer/proxy/Proxy.php | 5 +- consumer/proxy/ProxyFactory.php | 4 +- demo/demo-consumer/config/log4php.xml | 6 +- demo/demo-consumer/consumer/demo-consumer.consumer | 10 +--- 11 files changed, 143 insertions(+), 111 deletions(-) diff --git a/common/file/FSOFRedis.php b/common/file/FSOFRedis.php index e61a8e3..914e0a9 100644 --- a/common/file/FSOFRedis.php +++ b/common/file/FSOFRedis.php @@ -22,7 +22,6 @@ use com\fenqile\fsof\common\config\FSOFConstants; class FSOFRedis { - const REDIS_TIME_OUT = 1; private static $_instance; @@ -30,7 +29,9 @@ class FSOFRedis private $logger; - private $timeout = self::REDIS_TIME_OUT; + private $connect_timeout = 1; + + private $read_timeout = 2; private $retry_count = 1; @@ -40,13 +41,14 @@ class FSOFRedis [FSOFConstants::FSOF_SERVICE_REDIS_HOST, FSOFConstants::FSOF_SERVICE_REDIS_PORT], ]; - public static function instance($config) + public static function instance($config = []) { if (extension_loaded('redis')) { if (!isset(FSOFRedis::$_instance)) { FSOFRedis::$_instance = new FSOFRedis($config); + FSOFRedis::$_instance->get_redis(); } return FSOFRedis::$_instance; } @@ -57,10 +59,10 @@ class FSOFRedis return NULL; } - public function __construct($config) + public function __construct($config = []) { $this->logger = \Logger::getLogger(__CLASS__); - if($config['redis_hosts']) + if(isset($config['redis_hosts'])) { $this->hosts = []; $address = explode(',', $config['redis_hosts']); @@ -69,20 +71,22 @@ class FSOFRedis $this->hosts[] = [$host, $port??FSOFConstants::FSOF_SERVICE_REDIS_PORT]; } } - if($config['redis_connect_timeout']) + if(isset($config['redis_connect_timeout'])) + { + $this->connect_timeout = $config['redis_connect_timeout']; + } + if(isset($config['redis_read_timeout'])) { - $this->timeout = $config['redis_connect_timeout']; + $this->read_timeout = $config['redis_read_timeout']; } - if($config['redis_connect_type']) + if(isset($config['redis_connect_type'])) { $this->connect_type = $config['redis_connect_type']; } - if($config['redis_retry_count']) + if(isset($config['redis_retry_count'])) { $this->retry = min($config['redis_retry_count'], 1); } - - $this->get_redis(); } public function get_redis() @@ -92,24 +96,25 @@ class FSOFRedis $hosts_count = count($this->hosts); $retry = $this->retry_count; $rand_num = rand() % $hosts_count; + $ret = false; do{ try{ $redis_cli = new \Redis(); - if($this->connect_type == FSOFConstants::FSOF_SERVICE_REDIS_CONNECT_TYPE_SOCK) + if($this->connect_type == FSOFConstants::FSOF_SERVICE_REDIS_CONNECT_TYPE_TCP) { $node = $this->hosts[$rand_num]; - $ret = $redis_cli->connect($node[0],$node[1],$this->timeout); + $ret = $redis_cli->connect($node[0],$node[1],$this->connect_timeout); + $redis_cli->setOption(\Redis::OPT_READ_TIMEOUT, $this->read_timeout); $rand_num = ($rand_num + 1)%$hosts_count; if (!$ret) { $this->logger->warn("connect redis failed[{$node[0]}:{$node[1]}]"); } }else{ - $ret = $redis_cli->connect("/var/fsof/redis.sock",-1,FSOFConstants::FSOF_SERVICE_REDIS_PORT,$this->timeout); + $ret = $redis_cli->connect("/var/fsof/redis.sock",-1,FSOFConstants::FSOF_SERVICE_REDIS_PORT,$this->connect_timeout); } if($ret) { - $e = null; break; } }catch (\Exception $e){ @@ -162,27 +167,14 @@ class FSOFRedis public function getlist($key) { - $ret = NULL; - if (!empty($key)) - { - try - { - if(!isset($this->m_redis)) - { - $this->get_redis(); - } - $ret = $this->getlRange($key); - } - catch (\Exception $e) - { - $this->logger->warn('redis current connect excepiton'.' |errcode:'.$e->getCode().' |errmsg:'.$e->getMessage()); - $this->close(); - //重试一次 - $this->get_redis(); - $ret = $this->getlRange($key); - } - } - return $ret; + if (!empty($key) && isset($this->m_redis)) + { + return $this->getlRange($key); + } + else + { + return null; + } } public function set($key, $value) diff --git a/common/protocol/fsof/DubboParser.php b/common/protocol/fsof/DubboParser.php index d939b07..b812fae 100644 --- a/common/protocol/fsof/DubboParser.php +++ b/common/protocol/fsof/DubboParser.php @@ -1,8 +1,9 @@ <?php - namespace com\fenqile\fsof\common\protocol\fsof; use com\fenqile\fsof\consumer\Type; +use Icecave\Flax\Serialization\Encoder; +use Icecave\Flax\DubboParser as Decoder; /** * @@ -71,7 +72,7 @@ class DubboParser public function packRequest(DubboRequest $request) { - if (self::DUBBO_PROTOCOL_SERIALIZE_HESSIAN2 == self::DUBBO_PROTOCOL_NAME_MAP_CODE[$request->getSerialization()]) { + if (self::DUBBO_PROTOCOL_SERIALIZE_HESSIAN2 == (self::DUBBO_PROTOCOL_NAME_MAP_CODE[$request->getSerialization()]??null)) { $reqData = $this->buildBodyForHessian2($request); $serialize_type = self::DUBBO_PROTOCOL_SERIALIZE_HESSIAN2; } else { @@ -104,12 +105,7 @@ class DubboParser } $reqData .= json_encode($request->getMethod()) . PHP_EOL; $reqData .= json_encode($this->typeRefs($request)) . PHP_EOL; - foreach ($request->getParams() as $value) { - if ($value instanceof \stdClass) { - $value = $value->object; - } elseif ($value instanceof Type) { - $value = $value->value; - } + foreach (Type::getDataForSafed($request->getParams()) as $value) { $reqData .= json_encode($value) . PHP_EOL; } $attach = array(); @@ -130,27 +126,19 @@ class DubboParser public function buildBodyForHessian2(DubboRequest $request) { - $hess_stream = new \HessianStream(); - $hess_options = new \HessianOptions(); - $hess_factory = new \HessianFactory(); - $writer = $hess_factory->getWriter($hess_stream, $hess_options); + $encode = new Encoder(); $reqData = ''; - $reqData .= $writer->writeValue($request->getDubboVersion()); - $reqData .= $writer->writeValue($request->getService()); + $reqData .= $encode->encode($request->getDubboVersion()); + $reqData .= $encode->encode($request->getService()); if ($request->getVersion()) { - $reqData .= $writer->writeValue($request->getVersion()); + $reqData .= $encode->encode($request->getVersion()); } else { - $reqData .= $writer->writeValue(''); + $reqData .= $encode->encode(''); } - $reqData .= $writer->writeValue($request->getMethod()); - $reqData .= $writer->writeValue($this->typeRefs($request)); - foreach ($request->getParams() as $value) { - if ($value instanceof \stdClass) { - $value = $value->object; - } elseif ($value instanceof Type) { - $value = $value->value; - } - $reqData .= $writer->writeValue($value); + $reqData .= $encode->encode($request->getMethod()); + $reqData .= $encode->encode($this->typeRefs($request)); + foreach (Type::getDataForSafed($request->getParams()) as $value) { + $reqData .= $encode->encode($value); } $attach = ['path' => $request->getService(), 'interface' => $request->getService(), 'timeout' => $request->getTimeout()]; if ($request->getGroup()) { @@ -159,7 +147,7 @@ class DubboParser if ($request->getVersion()) { $attach['version'] = $request->getVersion(); } - $reqData .= $writer->writeValue($attach); + $reqData .= $encode->encode($attach); return $reqData; } @@ -184,7 +172,7 @@ class DubboParser if (($flag & self::FLAG_HEARTBEAT_EVENT) != 0) { $response->setHeartbeatEvent(true); } - $response->setSerialization($flag & self::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON); + $response->setSerialization($flag & self::SERIALIZATION_MASK); $response->setLen($_arr["len"]); return $response; } @@ -239,13 +227,9 @@ class DubboParser private function parseResponseBodyForHessian2(DubboResponse $response) { if (!$response->isHeartbeatEvent()) { - $_data = substr($response->getFullData(), self::PACKAGE_HEDA_LEN + 1); - $response->setResponseBody($_data); - $hess_stream = new \HessianStream($_data); - $hess_options = new \HessianOptions(); - $hess_factory = new \HessianFactory(); - $parser = $hess_factory->getParser($hess_stream, $hess_options); - $content = $parser->parseReply(); + $_data = $response->getFullData(); + $decode = new Decoder($_data); + $content = $decode->getData($_data); $response->setResult($content); } return $response; diff --git a/common/url/FSOFUrl.php b/common/url/FSOFUrl.php index f9042a0..bb9fcb4 100644 --- a/common/url/FSOFUrl.php +++ b/common/url/FSOFUrl.php @@ -153,6 +153,8 @@ class FSOFUrl if(isset($getArgs[self::URL_SERIALIZATION])) { $this->serialization = $getArgs[self::URL_SERIALIZATION]; + } else { + $this->serialization = 'hessian2'; } } diff --git a/config/global/conf/fsof.ini b/config/global/conf/fsof.ini index d4d698b..aa92cfc 100644 --- a/config/global/conf/fsof.ini +++ b/config/global/conf/fsof.ini @@ -1,6 +1,6 @@ [fsof_container_setting] ;php path -php = '/usr/local/php-7.1.12/bin/php' +php = '/usr/bin/php' ;app's user user = root @@ -16,19 +16,19 @@ p2p_mode = false zklog_level = 0 ;zookeepr log path -zklog_path = '/home/devops/workspace/dubbo/logs/zookeeper.log' +zklog_path = '/var/fsof/provider/zookeeper.log' ;zookeeper url list -zk_url_list = http://192.168.214.148:2181,http://192.168.214.148:2182,http://192.168.214.148:2183 +zk_url_list = http://127.0.0.1:2181 ;provider overload mode switch overload_mode = true -;if request wait more than waiting_time before processed, we will lost this quest, unit is micro-second +;if request wait more than waiting_time before processed, we will lost this quest, unit is micro-second waiting_time = 2000 ;if overload_number requests trigger overload rule continuous, we will open loss request mode overload_number = 5 -;how many quest is lost before lost mode is close +;how many quest is lost before lost mode is close loss_number = 20 \ No newline at end of file diff --git a/consumer/ConsumerException.php b/consumer/ConsumerException.php new file mode 100644 index 0000000..dc2f465 --- /dev/null +++ b/consumer/ConsumerException.php @@ -0,0 +1,16 @@ +<?php +namespace com\fenqile\fsof\consumer; + +use Exception; + +class ConsumerException extends Exception +{ + /** + * @param string $message The exception message. + * @param Exception|null $previous The previous exception, if any. + */ + public function __construct($message, Exception $previous = null) + { + parent::__construct($message, 0, $previous); + } +} \ No newline at end of file diff --git a/consumer/Type.php b/consumer/Type.php index 5afa4b5..3000d5b 100644 --- a/consumer/Type.php +++ b/consumer/Type.php @@ -140,11 +140,47 @@ class Type $std = new \stdClass; foreach ($properties as $key => $value) { - $std->$key = ($value instanceof Type) ? $value->value : $value; + $std->$key = ($value instanceof Type) ? self::typeTosafe($value) : $value; } $std_wrap = new \stdClass(); $std_wrap->object = $std; $std_wrap->class = 'L'.str_replace('.', '/', $class).';'; return $std_wrap; } + + public static function getDataForSafed($args) + { + foreach ($args as &$value) + { + if ($value instanceof \stdClass) { + $value = $value->object; + } elseif ($value instanceof Type) { + $value = self::typeTosafe($value); + } + } + return $args; + } + + public static function typeTosafe(Type $type) + { + switch ($type->type){ + case Type::SHORT: + case Type::INT: + case Type::LONG: + $value = (int)$type->value; + break; + case Type::FLOAT: + case Type::DOUBLE: + $value = (float)$type->value; + break; + case Type::BOOLEAN: + $value = (bool)$type->value; + break; + case Type::STRING: + default: + $value = (string)$type->value; + break; + } + return $value; + } } \ No newline at end of file diff --git a/consumer/fsof/FSOFProcessor.php b/consumer/fsof/FSOFProcessor.php index fd854b5..6555542 100644 --- a/consumer/fsof/FSOFProcessor.php +++ b/consumer/fsof/FSOFProcessor.php @@ -22,6 +22,7 @@ use com\fenqile\fsof\common\protocol\fsof\DubboParser; use com\fenqile\fsof\common\protocol\fsof\DubboRequest; use com\fenqile\fsof\common\protocol\fsof\DubboResponse; use com\fenqile\fsof\consumer\client\FSOFClient4Linux; +use com\fenqile\fsof\consumer\ConsumerException; class FSOFProcessor { @@ -35,6 +36,8 @@ class FSOFProcessor private $logger; + private $iotimeout = 3; + public function __construct() { $this->logger = \Logger::getLogger(__CLASS__); @@ -43,6 +46,7 @@ class FSOFProcessor public function executeRequest(DubboRequest $request, $svrAddr, $ioTimeOut, &$providerAddr) { + $this->iotimeout = $ioTimeOut; //计算服务端个数 $svrNum = count($svrAddr); //连接异常重试次数最多2次 @@ -66,10 +70,10 @@ class FSOFProcessor $request->port = $port; $request->setGroup($svrUrl->getGroup()); $request->setVersion( $svrUrl->getVersion()); - $request->setTimeout($ioTimeOut * 1000); + $request->setTimeout($this->iotimeout * 1000); $request->setSerialization($svrUrl->getSerialization(DubboParser::DUBBO_PROTOCOL_SERIALIZE_FAST_JSON)); - $client = $this->connectProvider($host, $port, $ioTimeOut); + $client = $this->connectProvider($host, $port, $this->iotimeout); if(empty($client)) { //记录连接错误日志 @@ -126,7 +130,7 @@ class FSOFProcessor $msg = mb_substr($msg, 0, 512, 'UTF-8').' ...(len:'.strlen($msg).")"; } $this->logger->error("send date failed:" . $msg); - throw new \Exception("发送请求数据失败"); + throw new ConsumerException("发送请求数据失败"); } } catch (\Exception $e) @@ -139,7 +143,7 @@ class FSOFProcessor $msg = mb_substr($msg, 0, 512, 'UTF-8').' ...(len:'.strlen($msg).")"; } $this->logger->error("send date failed:" . $msg, $e); - throw new \Exception("发送请求数据失败"); + throw new ConsumerException("发送请求数据失败"); } try @@ -157,7 +161,7 @@ class FSOFProcessor } else { - throw new \Exception("与服务器建立连接失败"); + throw new ConsumerException("与服务器建立连接失败"); } return $ret; } @@ -206,11 +210,11 @@ class FSOFProcessor { if (0 == $socket->getlasterror()) { - throw new \Exception("provider端己关闭网络连接"); + throw new ConsumerException("provider端己关闭网络连接"); } else { - throw new \Exception("接收应答数据超时"); + throw new ConsumerException("接收应答数据超时"); } } @@ -221,7 +225,7 @@ class FSOFProcessor if (($response) && ($response->getSn() != $request->getSn())) { $this->logger->error("response sn[{$response->getSn()}] != request sn[{$request->getSn()}]"); - throw new \Exception("请求包中的sn非法"); + throw new ConsumerException("请求包中的sn非法"); } //接收消息体 @@ -256,25 +260,25 @@ class FSOFProcessor $tmpdata = $this->Recv($socket, $cur_len); if ($tmpdata) { - $recv_data = $recv_data . $tmpdata; + $recv_data .= $tmpdata; $resv_len -= $cur_len; } else { if (0 == $socket->getlasterror()) { - throw new \Exception("provider端己关闭网络连接"); + throw new ConsumerException("provider端己关闭网络连接"); } else { - throw new \Exception("接收应答数据超时"); + throw new ConsumerException("接收应答数据超时"); } } - //如果超过15秒就当超时处理 - if ((microtime(true) - $start_time) > 15) + //如果超过设置的iotimeout就当超时处理 + if ((microtime(true) - $start_time) > $this->iotimeout) { $this->logger->error("Multi recv {$resv_len} bytes data timeout"); - throw new \Exception("接收应答数据超时"); + throw new ConsumerException("接收应答数据超时"); } } while ($resv_len > 0); @@ -285,7 +289,7 @@ class FSOFProcessor { if(DubboResponse::OK != $response->getStatus()) { - throw new \Exception($response->getErrorMsg()); + throw new ConsumerException($response->getErrorMsg()); } else { @@ -295,7 +299,7 @@ class FSOFProcessor else { $this->logger->error("parse response body err:".$response->__toString()); - throw new \Exception("未知异常"); + throw new ConsumerException("未知异常"); } } @@ -303,21 +307,20 @@ class FSOFProcessor { try { + $start_time = microtime(true); $resv_len = $len; $_data = ''; - $cnt = 20;//最多循环20次,防止provider端挂掉时,consumer陷入死循环 do { - $cnt--; $tmp_data = $socket->recv($resv_len); if (!$tmp_data) { $this->logger->warn("socket->recv faile:$resv_len"); break; } - $_data = $_data . $tmp_data; + $_data .= $tmp_data; $resv_len -= strlen($tmp_data); - } while (($resv_len > 0) && ($cnt > 0)); + } while (($resv_len > 0) && ( (microtime(true) - $start_time) > $this->iotimeout)); //读取数据不能超过设置的io时长 if ($resv_len > 0) { @@ -332,11 +335,11 @@ class FSOFProcessor $this->logger->error('recv data exception',$e); if(self::FSOF_CONNECTION_RESET == $e->getCode()) { - throw new \Exception("未知异常"); + throw new ConsumerException("未知异常"); } else { - throw new \Exception("接收应答数据超时"); + throw new ConsumerException("接收应答数据超时"); } } } diff --git a/consumer/proxy/Proxy.php b/consumer/proxy/Proxy.php index 021cff7..723a4ba 100644 --- a/consumer/proxy/Proxy.php +++ b/consumer/proxy/Proxy.php @@ -83,11 +83,14 @@ final class Proxy protected function generateParamType($args) { + $types = []; foreach ($args as $val) { if($val instanceof \stdClass){ $types[] = $val->class; - }else{ + }else if($val instanceof Type){ $types[] = Type::adapter[$val->type]??'Ljava/lang/Object;'; + }else{ + $types[] = 'Ljava/lang/Object;'; } } return $types; diff --git a/consumer/proxy/ProxyFactory.php b/consumer/proxy/ProxyFactory.php index 5036b48..0b4c8da 100644 --- a/consumer/proxy/ProxyFactory.php +++ b/consumer/proxy/ProxyFactory.php @@ -22,6 +22,7 @@ use com\fenqile\fsof\common\log\FSOFSystemUtil; use com\fenqile\fsof\common\config\FSOFConstants; use com\fenqile\fsof\common\config\FSOFCommonUtil; use com\fenqile\fsof\registry\automatic\ConsumerProxy; +use com\fenqile\fsof\consumer\ConsumerException; final class ProxyFactory @@ -220,7 +221,7 @@ final class ProxyFactory if (empty($ret)) { $errMsg = "current_address:".FSOFSystemUtil::getLocalIP()."|".$consumerInterface; - throw new \Exception($errMsg); + throw new ConsumerException($errMsg); } else { @@ -233,6 +234,7 @@ final class ProxyFactory { self::$logger->error('consumer_app:'.self::$appName.'|app_config_file:'.self::$appConfigFile. '|version:'.$version.'|group:'.$group.'|provider_service:'.$consumerInterface.'|errmsg:'. $e->getMessage().'|exceptionmsg:'.$e); + throw new ConsumerException($e->getMessage(), $e); } return $ret; } diff --git a/demo/demo-consumer/config/log4php.xml b/demo/demo-consumer/config/log4php.xml index 0b8828f..7e89f2f 100644 --- a/demo/demo-consumer/config/log4php.xml +++ b/demo/demo-consumer/config/log4php.xml @@ -1,9 +1,9 @@ <configuration xmlns="http://logging.apache.org/log4php/"> - <appender name="myAppender" class="LoggerAppenderDailyFile"> + <appender name="myAppender" class="LoggerAppenderFile"> + <param name="file" value="/tmp/consumer.log" /> <layout class="LoggerLayoutPattern"> - <param name="conversionPattern" value="[%date{Y-m-d H:i:s,u}][%level][traceid:%X{traceid}][%logger][%line] %message%newline" /> + <param name="conversionPattern" value="[%date{Y-m-d H:i:s,u}][%level][%logger][%line] %message%newline" /> </layout> - <param name="file" value="/home/devops/workspace/dubbo/logs/consumer_%s.log" /> </appender> <root> <level value="DEBUG" /> diff --git a/demo/demo-consumer/consumer/demo-consumer.consumer b/demo/demo-consumer/consumer/demo-consumer.consumer index 81c86a1..8e007d8 100644 --- a/demo/demo-consumer/consumer/demo-consumer.consumer +++ b/demo/demo-consumer/consumer/demo-consumer.consumer @@ -1,16 +1,10 @@ [consumer_config] p2p_mode = false -#tcp/sock -redis_connect_type = tcp -redis_hosts = 127.0.0.1:6379,127.0.0.1:6379 -redis_connect_timeout = 1 -redis_retry_count = 1 - [consumer_services] com.fenqile.example.DemoService[group] = * com.fenqile.example.DemoService[version] = 1.0.0 -com.imooc.springboot.dubbo.demo.ObjectDemoService[group] = * -com.imooc.springboot.dubbo.demo.ObjectDemoService[version] = 1.0.0 \ No newline at end of file +com.fenqile.arch.dubbo.service.DemoPhpService[group] = * +com.fenqile.arch.dubbo.service.DemoPhpService[version] = 1.0.0 \ No newline at end of file