yang20150702 commented on code in PR #181:
URL: https://github.com/apache/dubbo-rust/pull/181#discussion_r1522781224
##########
dubbo/src/extension/registry_extension.rs:
##########
@@ -0,0 +1,685 @@
+use std::{
+ borrow::Cow, collections::HashMap, convert::Infallible, future::Future,
pin::Pin, str::FromStr,
+};
+
+use async_trait::async_trait;
+use thiserror::Error;
+use tokio::sync::mpsc::Receiver;
+use tower::discover::Change;
+
+use dubbo_base::{url::UrlParam, Url};
+use proxy::RegistryProxy;
+
+use crate::{
+ extension::{
+ ConvertToExtensionFactories, Extension, ExtensionFactories,
ExtensionMetaInfo,
+ ExtensionName, ExtensionType,
+ },
+ StdError,
+};
+
+//
extension://0.0.0.0/?extension-type=registry&extension-name=nacos®istry-url=nacos://127.0.0.1:8848
+pub fn to_extension_url(registry_url: Url) -> Url {
+ let mut registry_extension_loader_url: Url =
"extension://0.0.0.0".parse().unwrap();
+
+ let protocol = registry_url.protocol();
+
+ registry_extension_loader_url.add_query_param(ExtensionType::Registry);
+
registry_extension_loader_url.add_query_param(ExtensionName::new(protocol.to_string()));
+
registry_extension_loader_url.add_query_param(RegistryUrl::new(registry_url));
+
+ registry_extension_loader_url
+}
+
+pub type ServiceChange = Change<String, ()>;
+pub type DiscoverStream = Receiver<Result<ServiceChange, StdError>>;
+
+#[async_trait]
+pub trait Registry {
+ async fn register(&self, url: Url) -> Result<(), StdError>;
+
+ async fn unregister(&self, url: Url) -> Result<(), StdError>;
+
+ async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError>;
+
+ async fn unsubscribe(&self, url: Url) -> Result<(), StdError>;
+
+ fn url(&self) -> &Url;
+}
+
+impl<T> crate::extension::Sealed for T where T: Registry + Send + 'static {}
+
+impl<T> ExtensionMetaInfo for T
+where
+ T: Registry + Send + 'static,
+ T: Extension<Target = Box<dyn Registry + Send + 'static>>,
+{
+ fn extension_type() -> ExtensionType {
+ ExtensionType::Registry
+ }
+}
+
+impl<T> ConvertToExtensionFactories for T
+where
+ T: Registry + Send + 'static,
+ T: Extension<Target = Box<dyn Registry + Send + 'static>>,
+{
+ fn convert_to_extension_factories() -> ExtensionFactories {
+ fn constrain<F>(f: F) -> F
+ where
+ F: for<'a> Fn(
+ &'a Url,
+ ) -> Pin<
+ Box<
+ dyn Future<Output = Result<Box<dyn Registry + Send +
'static>, StdError>>
+ + Send
+ + 'a,
+ >,
+ >,
+ {
+ f
+ }
+
+ let constructor = constrain(|url: &Url| {
+ let f = <T as Extension>::create(url);
+ Box::pin(f)
+ });
+
+
ExtensionFactories::RegistryExtensionFactory(RegistryExtensionFactory::new(constructor))
+ }
+}
+
+#[derive(Default)]
+pub(super) struct RegistryExtensionLoader {
+ factories: HashMap<String, RegistryExtensionFactory>,
+}
+
+impl RegistryExtensionLoader {
+ pub(super) fn new() -> Self {
+ Self {
+ factories: HashMap::new(),
+ }
+ }
+
+ pub(crate) async fn register(
+ &mut self,
+ extension_name: String,
+ factory: RegistryExtensionFactory,
+ ) {
+ self.factories.insert(extension_name, factory);
+ }
+
+ pub(crate) async fn remove(&mut self, extension_name: String) {
+ self.factories.remove(&extension_name);
+ }
+
+ pub(crate) async fn load(&mut self, url: &Url) -> Result<RegistryProxy,
StdError> {
+ let extension_name = url.query::<ExtensionName>().unwrap();
+ let extension_name = extension_name.value();
+ let factory = self.factories.get_mut(&extension_name).ok_or_else(|| {
+ RegistryExtensionLoaderError::new(format!(
+ "registry extension loader error: extension name {} not found",
+ extension_name
+ ))
+ })?;
+ factory.create(url).await
+ }
+}
+
+type RegistryConstructor = for<'a> fn(
+ &'a Url,
+) -> Pin<
+ Box<dyn Future<Output = Result<Box<dyn Registry + Send + 'static>,
StdError>> + Send + 'a>,
+>;
+pub(super) struct RegistryExtensionFactory {
+ constructor: RegistryConstructor,
+ instances: HashMap<String, RegistryProxy>,
+}
+
+impl RegistryExtensionFactory {
+ pub(super) fn new(constructor: RegistryConstructor) -> Self {
+ Self {
+ constructor,
+ instances: HashMap::new(),
+ }
+ }
+}
+
+impl RegistryExtensionFactory {
+ pub(super) async fn create(&mut self, url: &Url) -> Result<RegistryProxy,
StdError> {
+ let registry_url = url.query::<RegistryUrl>().unwrap();
+ let registry_url = registry_url.value();
+ let url_str = registry_url.as_str().to_string();
+ match self.instances.get(&url_str) {
+ Some(proxy) => {
+ let proxy = proxy.clone();
+ Ok(proxy)
+ }
+ None => {
+ let registry = (self.constructor)(url).await?;
+ let proxy = <RegistryProxy as From<Box<dyn Registry +
Send>>>::from(registry);
+ self.instances.insert(url_str, proxy.clone());
+ Ok(proxy)
+ }
+ }
+ }
+}
+
+#[derive(Error, Debug)]
+#[error("{0}")]
+pub(crate) struct RegistryExtensionLoaderError(String);
+
+impl RegistryExtensionLoaderError {
+ pub(crate) fn new(msg: String) -> Self {
+ RegistryExtensionLoaderError(msg)
+ }
+}
+
+pub mod proxy {
+ use async_trait::async_trait;
+ use thiserror::Error;
+ use tokio::sync::oneshot;
+
+
+ use dubbo_base::Url;
+ use dubbo_logger::tracing::error;
+
+ use crate::{
+ extension::registry_extension::{DiscoverStream, Registry},
+ StdError,
+ };
+
+ pub(super) enum RegistryOpt {
+ Register(Url, oneshot::Sender<Result<(), StdError>>),
+ Unregister(Url, oneshot::Sender<Result<(), StdError>>),
+ Subscribe(Url, oneshot::Sender<Result<DiscoverStream, StdError>>),
+ UnSubscribe(Url, oneshot::Sender<Result<(), StdError>>),
+ }
+
+ #[derive(Clone)]
+ pub struct RegistryProxy {
+ sender: tokio::sync::mpsc::Sender<RegistryOpt>,
+ url: Url,
+ }
+
+ #[async_trait]
+ impl Registry for RegistryProxy {
+ async fn register(&self, url: Url) -> Result<(), StdError> {
+ let (tx, rx) = oneshot::channel();
+
+ match self
+ .sender
+ .send(RegistryOpt::Register(url.clone(), tx))
+ .await
+ {
+ Ok(_) => match rx.await {
+ Ok(result) => result,
+ Err(_) => {
+ error!(
+ "registry proxy error: receive register response
failed, url: {}",
+ url
+ );
+ return Err(
+ RegistryProxyError::new("receive register response
failed").into()
+ );
+ }
+ },
+ Err(_) => {
+ error!(
+ "registry proxy error: send register request failed,
url: {}",
+ url
+ );
+ return Err(RegistryProxyError::new("send register opt
failed").into());
+ }
+ }
+ }
+
+ async fn unregister(&self, url: Url) -> Result<(), StdError> {
+ let (tx, rx) = oneshot::channel();
+ match self
+ .sender
+ .send(RegistryOpt::Unregister(url.clone(), tx))
+ .await
+ {
+ Ok(_) => match rx.await {
+ Ok(result) => result,
+ Err(_) => {
+ error!(
+ "registry proxy error: receive unregister response
failed, url: {}",
+ url
+ );
+ return Err(
+ RegistryProxyError::new("receive unregister
response failed").into(),
+ );
+ }
+ },
+ Err(_) => {
+ error!(
+ "registry proxy error: send unregister request failed,
url: {}",
+ url
+ );
+ return Err(RegistryProxyError::new("send unregister opt
failed").into());
+ }
+ }
+ }
+
+ async fn subscribe(&self, url: Url) -> Result<DiscoverStream,
StdError> {
+ let (tx, rx) = oneshot::channel();
+
+ match self
+ .sender
+ .send(RegistryOpt::Subscribe(url.clone(), tx))
+ .await
+ {
+ Ok(_) => match rx.await {
+ Ok(result) => result,
+ Err(_) => {
+ error!(
+ "registry proxy error: receive subscribe response
failed, url: {}",
+ url
+ );
+ return Err(
+ RegistryProxyError::new("receive subscribe
response failed").into()
+ );
+ }
+ },
+ Err(_) => {
+ error!(
+ "registry proxy error: send subscribe request failed,
url: {}",
+ url
+ );
+ return Err(RegistryProxyError::new("send subscribe opt
failed").into());
+ }
+ }
+ }
+
+ async fn unsubscribe(&self, url: Url) -> Result<(), StdError> {
+ let (tx, rx) = oneshot::channel();
+ match self
+ .sender
+ .send(RegistryOpt::UnSubscribe(url.clone(), tx))
+ .await
+ {
+ Ok(_) => {
+ match rx.await {
+ Ok(result) => result,
+ Err(_) => {
+ error!("registry proxy error: receive unsubscribe
response failed, url: {}", url);
+ return Err(RegistryProxyError::new(
+ "receive unsubscribe response failed",
+ )
+ .into());
+ }
+ }
+ }
+ Err(_) => {
+ error!(
+ "registry proxy error: send unsubscribe request
failed, url: {}",
+ url
+ );
+ return Err(RegistryProxyError::new("send unsubscribe opt
failed").into());
+ }
+ }
+ }
+
+ fn url(&self) -> &Url {
+ &self.url
+ }
+ }
+
+ impl From<Box<dyn Registry + Send>> for RegistryProxy {
+ fn from(registry: Box<dyn Registry + Send>) -> Self {
+ let url = registry.url().clone();
+
+ let (sender, mut receiver) = tokio::sync::mpsc::channel(1024);
+
+ tokio::spawn(async move {
+ while let Some(opt) = receiver.recv().await {
+ match opt {
+ RegistryOpt::Register(url, tx) => {
+ let register = registry.register(url).await;
+ if let Err(_) = tx.send(register) {
+ error!("registry proxy error: send register
response failed");
+ }
+ }
+ RegistryOpt::Unregister(url, tx) => {
+ let unregister = registry.unregister(url).await;
+ if let Err(_) = tx.send(unregister) {
+ error!("registry proxy error: send unregister
response failed");
+ }
+ }
+ RegistryOpt::Subscribe(url, tx) => {
+ let subscribe = registry.subscribe(url).await;
+ if let Err(_) = tx.send(subscribe) {
+ error!("registry proxy error: send subscribe
response failed");
+ }
+ }
+ RegistryOpt::UnSubscribe(url, tx) => {
+ let unsubscribe = registry.unsubscribe(url).await;
+ if let Err(_) = tx.send(unsubscribe) {
+ error!("registry proxy error: send unsubscribe
response failed");
+ }
+ }
+ }
+ }
+ });
+
+ RegistryProxy { sender, url }
+ }
+ }
+
+ #[derive(Error, Debug)]
+ #[error("registry proxy error: {0}")]
+ pub(crate) struct RegistryProxyError(String);
+
+ impl RegistryProxyError {
+ pub(crate) fn new(msg: &str) -> Self {
+ RegistryProxyError(msg.to_string())
+ }
+ }
+}
+
+pub struct RegistryUrl(Url);
Review Comment:
建议:将params维护在单独的文件下,命名参考(xxx_param.rs)
##########
dubbo/src/triple/server/builder.rs:
##########
@@ -142,10 +144,12 @@ impl From<Url> for ServerBuilder {
let authority = uri.authority().unwrap();
+ let service_name = u.query::<InterfaceName>().unwrap().value();
Review Comment:
service_name和 InterfaceName是否可以保持一致的命名?
##########
dubbo/src/extension/registry_extension.rs:
##########
@@ -0,0 +1,685 @@
+use std::{
Review Comment:
this file need add License
##########
dubbo/src/registry/types.rs:
##########
@@ -15,46 +15,40 @@
* limitations under the License.
*/
-use std::{
- collections::HashMap,
Review Comment:
删除该文件
##########
dubbo/src/extension/mod.rs:
##########
@@ -0,0 +1,383 @@
+pub mod registry_extension;
Review Comment:
this file need add License
##########
registry/nacos/src/utils/mod.rs:
##########
@@ -49,13 +49,13 @@ pub(crate) fn build_nacos_client_props(url: &Url) ->
(nacos_sdk::api::props::Cli
let server_addr = format!("{}:{}{}", host, port, backup);
let namespace = url
- .get_param(NAMESPACE_KEY)
+ .query_param_by_kv(NAMESPACE_KEY)
Review Comment:
query_param_by_kv 可以考虑命名为 query_param_by_key
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]