Linchen-Xu commented on code in PR #13238:
URL: https://github.com/apache/dubbo/pull/13238#discussion_r1368349014


##########
dubbo-metadata/dubbo-metadata-report-redis/src/main/java/org/apache/dubbo/metadata/store/redis/RedisMetadataReport.java:
##########
@@ -207,4 +245,242 @@ private String getMetadataStandalone(BaseMetadataIdentifier metadataIdentifier)
         }
     }
 
+    @Override
+    public boolean registerServiceAppMapping(String serviceInterface, String 
defaultMappingGroup, String newConfigContent, Object ticket) {
+        try {
+            if (null!= ticket && !(ticket instanceof String)) {
+                throw new IllegalArgumentException("zookeeper publishConfigCas 
requires stat type ticket");
+            }
+            String pathKey = buildMappingKey(defaultMappingGroup);
+
+            return storeMapping(pathKey, serviceInterface, 
newConfigContent,(String)ticket);
+        } catch (Exception e) {
+            logger.warn(REGISTRY_ZOOKEEPER_EXCEPTION, "", "", "redis 
publishConfigCas failed.", e);
+            return false;
+        }
+    }
+
+
+    private boolean storeMapping(String key, String field, String value,String 
ticket) {
+        if (pool != null) {
+            return storeMappingStandalone(key, field, value, ticket);
+        } else {
+            return storeMappingInCluster(key, field, value, ticket);
+        }
+    }
+
+    private boolean storeMappingInCluster(String key, String field, String 
value,String ticket) {
+        try (JedisCluster jedisCluster = new JedisCluster(jedisClusterNodes, 
timeout, timeout, 2, password, new GenericObjectPoolConfig<>())) {
+            Object result =jedisCluster.eval(luaScript, 1, key, field, 
ticket==null?"":ticket, value,buildPubSubKey(field));
+            return null==result;
+        } catch (Throwable e) {
+            String msg = "Failed to put " + key + ":" + field + " to redis " + 
value + ", cause: " + e.getMessage();
+            logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e);
+            throw new RpcException(msg, e);
+        }
+    }
+
+    private boolean storeMappingStandalone(String key, String field, String 
value,String ticket) {
+        try (Jedis jedis = pool.getResource()) {
+            Object result = jedis.eval(luaScript, 1, key, field, 
ticket==null?"":ticket, value,buildPubSubKey(field));
+            return null==result;
+        } catch (Throwable e) {
+            String msg = "Failed to put " + key + ":" + field + " to redis " + 
value + ", cause: " + e.getMessage();
+            logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e);
+            throw new RpcException(msg, e);
+        }
+    }
+
+    private String buildMappingKey(String defaultMappingGroup) {
+        return this.root + GROUP_CHAR_SEPARATOR + defaultMappingGroup;
+    }
+
+    private String buildPubSubKey(String serviceKey) {
+        return buildMappingKey(DEFAULT_MAPPING_GROUP) + GROUP_CHAR_SEPARATOR + 
serviceKey;
+    }
+
+    @Override
+    public ConfigItem getConfigItem(String serviceKey, String group) {
+        String key = buildMappingKey(group);
+        String content = getMappingData(key, serviceKey);
+
+        return new ConfigItem(content, content);
+    }
+
+    private String getMappingData(String key, String field) {
+        if (pool != null) {
+            return getMappingDataStandalone(key, field);
+        } else {
+            return getMappingDataInCluster(key, field);
+        }
+    }
+
+    private String getMappingDataInCluster(String key, String field) {
+        try (JedisCluster jedisCluster = new JedisCluster(jedisClusterNodes, 
timeout, timeout, 2, password, new GenericObjectPoolConfig<>())) {
+            return jedisCluster.hget(key, field);
+        } catch (Throwable e) {
+            String msg = "Failed to get " + key + ":" + field + " from redis 
cluster , cause: " + e.getMessage();
+            logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e);
+            throw new RpcException(msg, e);
+        }
+    }
+
+    private String getMappingDataStandalone(String key, String field) {
+        try (Jedis jedis = pool.getResource()) {
+            return jedis.hget(key, field);
+        } catch (Throwable e) {
+            String msg = "Failed to get " + key + ":" + field + " from redis , 
cause: " + e.getMessage();
+            logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e);
+            throw new RpcException(msg, e);
+        }
+    }
+
+    @Override
+    public void removeServiceAppMappingListener(String serviceKey, 
MappingListener listener) {
+        if (null != listenerMap.get(serviceKey)) {
+            MappingDataListener 
mappingDataListener=listenerMap.get(serviceKey);
+            NotifySub notifySub=mappingDataListener.getNotifySub();
+            notifySub.removeListener(listener);
+            if(notifySub.isEmpty()){
+                mappingDataListener.shutdown();
+                listenerMap.remove(serviceKey,mappingDataListener);
+            }
+        }
+    }
+
+    @Override
+    public Set<String> getServiceAppMapping(String serviceKey, MappingListener 
listener, URL url) {
+        if (null == listenerMap.get(serviceKey)) {
+            NotifySub notifySub = new NotifySub(serviceKey);
+            notifySub.addListener(listener);
+            MappingDataListener dataListener = new 
MappingDataListener(buildPubSubKey(serviceKey), notifySub);
+            ConcurrentHashMapUtils.computeIfAbsent(listenerMap, serviceKey
+                , k -> dataListener);
+            dataListener.start();
+        }else{
+            listenerMap.get(serviceKey).getNotifySub().addListener(listener);
+        }
+        return this.getServiceAppMapping(serviceKey, url);
+    }
+
+    @Override
+    public Set<String> getServiceAppMapping(String serviceKey, URL url) {
+        String key = buildMappingKey(DEFAULT_MAPPING_GROUP);
+        return getAppNames(getMappingData(key, serviceKey));
+
+    }
+
+    @Override
+    public MetadataInfo getAppMetadata(SubscriberMetadataIdentifier 
identifier, Map<String, String> instanceMetadata) {
+        String content = this.getMetadata(identifier);
+        return JsonUtils.toJavaObject(content, MetadataInfo.class);
+    }
+
+    @Override
+    public void publishAppMetadata(SubscriberMetadataIdentifier identifier, 
MetadataInfo metadataInfo) {
+        this.storeMetadata(identifier, metadataInfo.getContent());
+    }
+
+    @Override
+    public void unPublishAppMetadata(SubscriberMetadataIdentifier identifier, 
MetadataInfo metadataInfo) {
+        this.deleteMetadata(identifier);
+    }
+
+    private static class NotifySub extends JedisPubSub {
+
+        private String serviceKey;
+        private Set<MappingListener> listeners = new HashSet<>();
+
+        public NotifySub(String serviceKey) {
+            this.serviceKey = serviceKey;
+        }
+
+        public void addListener(MappingListener listener) {
+            this.listeners.add(listener);
+        }
+
+        public void removeListener(MappingListener listener) {
+            this.listeners.remove(listener);
+        }
+
+        public Boolean isEmpty(){
+            return this.listeners.isEmpty();
+        }
+
+        @Override
+        public void onMessage(String key, String msg) {
+            logger.info("sub from redis " + key + " message:" + msg);
+            MappingChangedEvent mappingChangedEvent = new 
MappingChangedEvent(serviceKey, getAppNames(msg));
+            if(!listeners.isEmpty()){
+                listeners.forEach(listener -> 
listener.onEvent(mappingChangedEvent));
+            }
+
+        }
+
+        @Override
+        public void onPMessage(String pattern, String key, String msg) {
+            onMessage(key, msg);
+        }
+
+        @Override
+        public void onPSubscribe(String pattern, int subscribedChannels) {
+            super.onPSubscribe(pattern, subscribedChannels);
+        }
+    }
+
+    private class MappingDataListener extends Thread {
+
+        private String path;
+
+        private volatile Jedis currentClient;

Review Comment:
   The `currentClient` field is assigned but never accessed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to