onewe commented on issue #180:
URL: https://github.com/apache/dubbo-rust/issues/180#issuecomment-1988370186

   # 一、背景
   
   ​    `Extension` 模块在 `Dubbo` 中是用来做扩展,插件化的重要模块. 在 `Java` 中 `Extension` 使用 
`SPI` 机制来实现. 受限于 `Rust` 语言层面的限制, 迫切需要一个全局使用、注册各种插件、根据 `Url` 为数据模型来加载`Extension` 
的加载器。
   
   
   
   # 二、设计方案
   
   ​    从用户使用的角度出发, 设想一下以下的使用方式:
   
   ```rust
   // 注册 registry 插件
   let _ = dubbo::extension::EXTENSIONS.register<NacosRegistry>().await;
   
   // 加载 registry 插件
   let registry_url = Url;
   let _ = dubbo::extension::EXTENSIONS.load_registry(registry_url).await;
   
   
   
   // 注册 failover cluster 插件
   let _ = dubbo::extension::EXTENSIONS.register<FailoverCluster>().await;
   
   // 加载 failover cluster 插件
   let failover_cluster_url = Url;
   let _ = 
dubbo::extension::EXTENSIONS.load_cluster(failover_cluster_url).await;
   ```
   
   ​    从上面的伪代码可以看出, 需要设计一个全局静态、不可变的 `EXTENSIONS` 变量, 方便用户或者框架内部使用. 
`EXTENSIONS` 至少需要 2 个方法, 一个方法用于注册插件,另一个则是用来加载插件. 根据上面的需求, 可以快速得到代码原型:
   
   ```rust
   // 全局静态、不可变
   pub static EXTENSIONS: once_cell::sync::Lazy<ExtensionDirectory> =
       once_cell::sync::Lazy::new(|| ExtensionDirectory);
   
   struct ExtensionDirectory;
   
   impl ExtensionDirectory {
     
     pub async fn register<T>(&self) -> Result<(), StdError> {}
     
     pub async fn load_registry(&self, url: Url) -> Result<?, StdError> {}
   }
   ```
   
   ​    这里`ExtensionDirectory` 有 2 个问题:
   
   1. 当用户使用 `register` 方法注册的时候, 为了方便用户使用, 这里直接采用了一个泛型 `T` , 意味需要抽象出 
`Extension`, 这样才能通过泛型的约束, 在编译期获取 `Extension` 一些相关信息, 来实现注册。
   2. 当用户或者框架本身使用 `load_registry` (这里只是为了方便举例, 后续随着扩展点的增多, 可能还有 
`load_cluster`、`load_route`等方法)方法来加载指定插件时, 这个返回值(也就是 `Registry` 插件), 需要一个抽象, 
这样才能统一返回一个通用的对象, 例如: `Box<dyn Registry + Send + 'sttic'>`
   
   为了解决这 2 个问题, 需要抽象出 `Extension` 和 `Registry`(这里只是为了方便举例, 后续随着扩展点的增多, 可能还有 
`Cluster`、`Route` 等)
   
   
   
   ## 2.1 Extension Trait
   
   ​    `Extension` 应该具有什么功能? 作为 `dubbo` 框架本身的扩展点, 最核心的 3 个功能应该是: 插件唯一标识符号、 
插件的构造函数 和 插件的类型.
   
   ### 2.1.1 插件唯一标识符
   
   ​    相当于是插件的名称, 需要在同类型插件中保证唯一即可. 例如 `Registry` 插件可以有很多实现, 可以是 
`NacosRegistry`、`ZookeeperRegistry` 和 `StaticRegistry`. 所以需要在 `Registry` 
类型插件中名称需要保证唯一, 防止插件注册被覆盖.
   
   ​    
   
   ### 2.1.2 插件的类型
   
   ​    需要插件的类型信息来执行不同的插件的注册、加载逻辑.
   
   简单的实现: 
   
   ```rust
   #[async_trait::async_trait]
   pub trait Extension {
     
       type Target;
   
       fn name() -> String;
                
       // 给插件提供异步初始化的能力
       async fn create(url: &Url) -> Result<Self::Target, StdError>;
     
        fn extension_type() -> ExtensionType;
   }
   
   pub enum ExtensionType {
       Registry,
        Cluster,
        Route,
   }
   ```
   
   
   
   ### 2.1.3 插件的构造函数
   
   ​    为简化用户插件注册过程, 只需要用户提供一个类型信息即可完成插件的注册, 所以需要用户提供一个构造方法, 以便于在运行过程中根据不同的 
`Url` 动态的构造插件对象.
   
   ​    不同类型的插件, 构造函数的签名都差不多, 唯一的不同是在返回值上. 所以这里需要使用枚举来统一. 构造函数这里的作用与 `Java` 中的 
`Factory` 的功能其实是差不多的.
   
   ```rust
   // 枚举
   pub enum ExtensionFactories {
       RegistryExtensionFactory(RegistryExtensionFactory),
   }
   
   
   // 以 Registry 来举例
   type RegistryConstructor = for<'a> fn(
       &'a Url,
   ) -> Pin<
       Box<dyn Future<Output = Result<Box<dyn Registry + Send + 'static>, 
StdError>> + Send + 'a>,
   >;
   
   // RegistryProxy 在后续会进行说明
   pub struct RegistryExtensionFactory {
       constructor: RegistryConstructor,
       instances: HashMap<String, RegistryProxy>,
   }
   
   impl RegistryExtensionFactory {
     pub async fn create(&mut self, url: &Url) -> Result<RegistryProxy, 
StdError> {
        todo!()
        }
   }
   
   ```
   
   ​    `RegistryExtensionFactory` 的基本逻辑是通过一个 `url` 来构造插件, 如果之前已经创建过了, `clone` 
一个实例返回,  不需要重复创建.
   
   ​    为了后续方便使用 `RegistryExtensionFactory`, 需要引入另外的 `trait` 来进行转换
   
   ```rust
   pub trait ConvertToExtensionFactories {
       fn convert_to_extension_factories() -> ExtensionFactories;
   }
   
   
   // 给所有实现 Registry 和 Extension 的实现 ConvertToExtensionFactories
   impl<T> ConvertToExtensionFactories for T
   where
       T: Registry + Send + 'static,
       T: Extension<Target = Box<dyn Registry + Send + 'static>>,
   {
       fn convert_to_extension_factories() -> ExtensionFactories {
           fn constrain<F>(f: F) -> F
           where
               F: for<'a> Fn(
                   &'a Url,
               ) -> Pin<
                   Box<
                       dyn Future<Output = Result<Box<dyn Registry + Send + 
'static>, StdError>>
                           + Send
                           + 'a,
                   >,
               >,
           {
               f
           }
   
           let constructor = constrain(|url: &Url| {
               let f = <T as Extension>::create(url);
               Box::pin(f)
           });
   
           
ExtensionFactories::RegistryExtensionFactory(RegistryExtensionFactory::new(constructor))
       }
   }
   
   ```
   
   
   
   ## 2.2 Registry Trait
   
   ​    `Registry` 插件需要有什么功能呢? 首先必须明确的一点是, 要实现 `Registry` 必须得实现 `Extension` , 
因为 `Registry` 是 `Extension` 的一种, 但这个约束太强了.  `Extension` 可以变成可选项, 只有用户需要注册插件到 
`dubbo` 框架中才需要实现 `Extension`. 但是 `Extension` 并不是任何对象都能实现的, 必须得是 `dubbo` 
中某个插件的具体实现才能够实现 `Extension`.
   
   ​    那 `Registry` 的核心功能是啥呢? 无外乎是注册服务、服务下线、订阅和取消订阅服务
   
   ```rust
   pub type ServiceChange = Change<String, ()>;
   pub type DiscoverStream = Receiver<Result<ServiceChange, StdError>>;
   #[async_trait]
   pub trait Registry {
       async fn register(&self, url: Url) -> Result<(), StdError>;
   
       async fn unregister(&self, url: Url) -> Result<(), StdError>;
   
       async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError>;
   
       async fn unsubscribe(&self, url: Url) -> Result<(), StdError>;
   
       fn url(&self) -> &Url;
   }
   
   pub(crate) trait Sealed {}
   
   #[async_trait::async_trait]
   pub trait Extension: Sealed {
     
       type Target;
   
       fn name() -> String;
                
       // 给插件提供异步初始化的能力
       async fn create(url: &Url) -> Result<Self::Target, StdError>;
     
        fn extension_type() -> ExtensionType;
   }
   
   pub enum ExtensionType {
       Registry,
        Cluster,
        Route,
   }
   ```
   
   ​    这样看起来比较好点, 只有实现了 `Sealed` 才能够实现 `Extension`, 因为 `Sealed` 的可见性是 `crate`, 
所以需要一个小 `trick` 来让自定义插件的用户可以实现 `Extension`
   
   ```rust
   pub type ServiceChange = Change<String, ()>;
   pub type DiscoverStream = Receiver<Result<ServiceChange, StdError>>;
   #[async_trait]
   pub trait Registry {
       async fn register(&self, url: Url) -> Result<(), StdError>;
   
       async fn unregister(&self, url: Url) -> Result<(), StdError>;
   
       async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError>;
   
       async fn unsubscribe(&self, url: Url) -> Result<(), StdError>;
   
       fn url(&self) -> &Url;
   }
   
   pub(crate) trait Sealed {}
   
   #[async_trait::async_trait]
   pub trait Extension: Sealed {
     
       type Target;
   
       fn name() -> String;
                
       // 给插件提供异步初始化的能力
       async fn create(url: &Url) -> Result<Self::Target, StdError>;
     
        fn extension_type() -> ExtensionType;
   }
   
   pub enum ExtensionType {
       Registry,
        Cluster,
        Route,
   }
   
   // 只要实现了 Registry 都自动实现 Sealed
   impl<T> Sealed for T where T: Registry + Send + 'static {}
   ```
   
   ​    为所有实现了 `Registry` 的都自动实现 `Sealed`, 通过这样的方式用户就可以自由的实现 `Extension`.  
上面的代码还可以改进一下, 
   
   `extension_type` 这个方法其实没必要写在 `Extension` 中, 如果放在 `Extension` 
中就会导致相同类型的插件不同的实现重复编写 `extension_type` 方法的实现. 可以把 `extension_type` 单独拿出来
   
   ```rust
   pub type ServiceChange = Change<String, ()>;
   pub type DiscoverStream = Receiver<Result<ServiceChange, StdError>>;
   #[async_trait]
   pub trait Registry {
       async fn register(&self, url: Url) -> Result<(), StdError>;
   
       async fn unregister(&self, url: Url) -> Result<(), StdError>;
   
       async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError>;
   
       async fn unsubscribe(&self, url: Url) -> Result<(), StdError>;
   
       fn url(&self) -> &Url;
   }
   
   pub(crate) trait Sealed {}
   
   #[async_trait::async_trait]
   pub trait Extension: Sealed {
     
       type Target;
   
       fn name() -> String;
                
       // 给插件提供异步初始化的能力
       async fn create(url: &Url) -> Result<Self::Target, StdError>;
   }
   
   pub enum ExtensionType {
       Registry,
        Cluster,
        Route,
   }
   
   // 只要实现了 Registry 都自动实现 Sealed
   impl<T> Sealed for T where T: Registry + Send + 'static {}
   
   pub(crate) trait ExtensionMetaInfo {
       // 单独的 trait
       fn extension_type() -> ExtensionType;
   }
   
   
   // 给实现了 Registry 和 Extension 的类型自动实现 ExtensionMetaInfo
   // 固定类型为 Registry
   impl<T> ExtensionMetaInfo for T
   where
       T: Registry + Send + 'static,
       T: Extension<Target = Box<dyn Registry + Send + 'static>>,
   {
       fn extension_type() -> ExtensionType {
           ExtensionType::Registry
       }
   }
   ```
   
   
   
   ## 2.3 RegistryProxy 
   
   ​    前面把 `Registry` 给抽象出来了, 可以使用 `Box<dyn Proxy + Send + 'static'>` 来表示任意的 
`Registry`, 但这种方式呢, 不能`clone`, `clone` 在插件设计中也是一个比较重要的能力, 有了 `clone` 能力, 
就可以在任意的场景中使用, 灵活度比较高.
   
   ​    使用 `channel` 来给 `Box<dyn Proxy + Send + 'static'>` 增加 `clone` 能力
   
   
![registry_proxy](https://github.com/apache/dubbo-rust/assets/38708836/97021aea-738b-42e1-980a-fa9d4df0c6ad)
   
   
   ```rust
   pub mod proxy{
     
        pub(super) enum RegistryOpt {
           Register(Url, oneshot::Sender<Result<(), StdError>>),
           Unregister(Url, oneshot::Sender<Result<(), StdError>>),
           Subscribe(Url, oneshot::Sender<Result<DiscoverStream, StdError>>),
           UnSubscribe(Url, oneshot::Sender<Result<(), StdError>>),
       }
     
     
         #[derive(Clone)]
       pub struct RegistryProxy {
           sender: tokio::sync::mpsc::Sender<RegistryOpt>,
           url: Url,
       }
     
       impl RegistryProxy {
        
           async fn register(&self, url: Url) -> Result<(), StdError> {
               todo!()
           }
   
           async fn unregister(&self, url: Url) -> Result<(), StdError> {
               todo!()
           }
   
           async fn subscribe(&self, url: Url) -> Result<DiscoverStream, 
StdError> {
               todo!()
           }
   
           async fn unsubscribe(&self, url: Url) -> Result<(), StdError> {
               todo!()
           }
   
           fn url(&self) -> &Url {
              todo!()
           }
       }
     
     
   }
   ```
   
   ​    这样一来 `RegistryProxy` 就可以无所顾及的 `clone`了.
   
   
   
   ## 3.1 ExtensionDirectory
   
   ​    前面把基础抽象给做了,回到 `ExtensionDirectory` 设计上来.
   
   ```rust
   // 全局静态、不可变
   pub static EXTENSIONS: once_cell::sync::Lazy<ExtensionDirectory> =
       once_cell::sync::Lazy::new(|| ExtensionDirectory);
   
   struct ExtensionDirectory;
   
   impl ExtensionDirectory {
     
     pub async fn register<T>(&self) -> Result<(), StdError>  
     where
         T: Extension,
         T: ExtensionMetaInfo,
          T: ConvertToExtensionFactories,
     {
       todo!()
     }
     
     pub async fn load_registry(&self, url: Url) -> Result<RegistryProxy, 
StdError> {
       todo!()
     }
   }
   ```
   
   ​    大体结构就差不多了, 但这里还存在一个问题. 注册插件总得要把注册的东西放在一个地方,  `ExtensionDirectory` 
结构体里什么都没有, 需要一个数据结构来保存注册的东西.
   
   ​    注册的核心逻辑就是把插件的构造函数给保存起来, 可以根据这点得到
   
   ```rust
   
   #[derive(Default)]
   pub struct RegistryExtensionLoader {
       factories: HashMap<String, RegistryExtensionFactory>,
   }
   
   impl RegistryExtensionLoader {
       pub fn new() -> Self {
           Self {
               factories: HashMap::new(),
           }
       }
   
       pub async fn register(
           &mut self,
           extension_name: String,
           factory: RegistryExtensionFactory,
       ) {
           todo!()
       }
   
   
       pub async fn load(&mut self, url: &Url) -> Result<RegistryProxy, 
StdError> {
          todo!()
       }
   }
   ```
   
   ​    `RegistryExtensionLoader` 里的 `factories` 存放 `Registry` 不同实现的 `factory`, 
所以 `Extension` 的唯一标识符是比较重要的.
   
   ```rust
   // 全局静态、不可变
   pub static EXTENSIONS: once_cell::sync::Lazy<ExtensionDirectory> =
       once_cell::sync::Lazy::new(|| ExtensionDirectory);
   
   struct ExtensionDirectory {
       registry_extension_loader: RegistryExtensionLoader,
   }
   
   impl ExtensionDirectory {
     
     pub async fn register<T>(&self) -> Result<(), StdError>  
     where
         T: Extension,
         T: ExtensionMetaInfo, // 根据 ExtensionMetaInfo 得到类型信息, 来执行不同的注册逻辑例如: 
registry 执行 RegistryExtensionLoader, cluster 执行 ClusterExtensionLoader ...etc
                T: ConvertToExtensionFactories,
     {
       todo!()
     }
     
     pub async fn load_registry(&self, url: Url) -> Result<RegistryProxy, 
StdError> {
       todo!()
     }
   }
   ```
   
   ​    注意 `ExtensionDirectory` 里使用的是不可变引用, 而 `RegistryExtensionLoader` 
使用的是可变引用. 为了解决这个问题, 需要再次引入 `channel`
   
   ```rust
   pub static EXTENSIONS: once_cell::sync::Lazy<ExtensionDirectoryCommander> =
       once_cell::sync::Lazy::new(|| ExtensionDirectory::init());
   
   
   
   #[derive(Default)]
   struct ExtensionDirectory {
       registry_extension_loader: registry_extension::RegistryExtensionLoader,
   }
   
   impl ExtensionDirectory {
       fn init() -> ExtensionDirectoryCommander {
           let (tx, mut rx) = tokio::sync::mpsc::channel::<ExtensionOpt>(64);
   
           tokio::spawn(async move {
               let mut extension_directory = ExtensionDirectory::default();
   
               while let Some(extension_opt) = rx.recv().await {
                   match extension_opt {
                       ExtensionOpt::Register(
                           extension_name,
                           extension_factories,
                           extension_type,
                           tx,
                       ) => {
                           let result = extension_directory
                               .register(extension_name, extension_factories, 
extension_type)
                               .await;
                           let _ = tx.send(result);
                       }
                       ExtensionOpt::Load(url, extension_type, tx) => {
                           let result = extension_directory.load(url, 
extension_type).await;
                           let _ = tx.send(result);
                       }
                   }
               }
           });
   
           ExtensionDirectoryCommander { sender: tx }
       }
   
       async fn register(
           &mut self,
           extension_name: String,
           extension_factories: ExtensionFactories,
           extension_type: ExtensionType,
       ) -> Result<(), StdError> {
           match extension_type {
               ExtensionType::Registry => match extension_factories {
                   
ExtensionFactories::RegistryExtensionFactory(registry_extension_factory) => {
                       self.registry_extension_loader
                           .register(extension_name, registry_extension_factory)
                           .await;
                       Ok(())
                   }
               },
           }
       }
   
   
       async fn load(
           &mut self,
           url: Url,
           extension_type: ExtensionType,
       ) -> Result<Extensions, StdError> {
           match extension_type {
               ExtensionType::Registry => {
                   let extension = 
self.registry_extension_loader.load(&url).await;
                   match extension {
                       Ok(extension) => Ok(Extensions::Registry(extension)),
                       Err(err) => {
                           error!("load extension failed: {}", err);
                           Err(err)
                       }
                   }
               }
           }
       }
   }
   
   
   pub struct ExtensionDirectoryCommander {
       sender: tokio::sync::mpsc::Sender<ExtensionOpt>,
   }
   
   impl ExtensionDirectoryCommander {
       pub async fn register<T>(&self) -> Result<(), StdError>
       where
           T: Extension,
           T: ExtensionMetaInfo,
           T: ConvertToExtensionFactories,
       {
          todo!()
       }
   
       pub async fn remove<T>(&self) -> Result<(), StdError>
       where
           T: Extension,
           T: ExtensionMetaInfo,
       {
           todo!()
       }
   
       pub async fn load_registry(&self, url: Url) -> Result<RegistryProxy, 
StdError> {
           todo!()
       }
   }
   
   
   enum ExtensionOpt {
       Register(
           String,
           ExtensionFactories,
           ExtensionType,
           oneshot::Sender<Result<(), StdError>>,
       ),
       Load(
           Url,
           ExtensionType,
           oneshot::Sender<Result<Extensions, StdError>>,
       ),
   }
   ```
   
   整体结构如下
   
   
![all](https://github.com/apache/dubbo-rust/assets/38708836/b8a72fbd-e4a8-4d40-bf50-4c77f9abbd87)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to