Linchen-Xu commented on code in PR #13238:
URL: https://github.com/apache/dubbo/pull/13238#discussion_r1368349014
##########
dubbo-metadata/dubbo-metadata-report-redis/src/main/java/org/apache/dubbo/metadata/store/redis/RedisMetadataReport.java:
##########
@@ -207,4 +245,242 @@ private String
getMetadataStandalone(BaseMetadataIdentifier metadataIdentifier)
}
}
+ @Override
+ public boolean registerServiceAppMapping(String serviceInterface, String
defaultMappingGroup, String newConfigContent, Object ticket) {
+ try {
+ if (null!= ticket && !(ticket instanceof String)) {
+ throw new IllegalArgumentException("zookeeper publishConfigCas
requires stat type ticket");
+ }
+ String pathKey = buildMappingKey(defaultMappingGroup);
+
+ return storeMapping(pathKey, serviceInterface,
newConfigContent,(String)ticket);
+ } catch (Exception e) {
+ logger.warn(REGISTRY_ZOOKEEPER_EXCEPTION, "", "", "redis
publishConfigCas failed.", e);
+ return false;
+ }
+ }
+
+
+ /**
+  * Dispatches the mapping write to the standalone or the cluster implementation.
+  * The standalone path is taken whenever {@code pool} is set.
+  */
+ private boolean storeMapping(String key, String field, String value, String ticket) {
+     if (pool == null) {
+         return storeMappingInCluster(key, field, value, ticket);
+     }
+     return storeMappingStandalone(key, field, value, ticket);
+ }
+
+ private boolean storeMappingInCluster(String key, String field, String
value,String ticket) {
+ try (JedisCluster jedisCluster = new JedisCluster(jedisClusterNodes,
timeout, timeout, 2, password, new GenericObjectPoolConfig<>())) {
+ Object result =jedisCluster.eval(luaScript, 1, key, field,
ticket==null?"":ticket, value,buildPubSubKey(field));
+ return null==result;
+ } catch (Throwable e) {
+ String msg = "Failed to put " + key + ":" + field + " to redis " +
value + ", cause: " + e.getMessage();
+ logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e);
+ throw new RpcException(msg, e);
+ }
+ }
+
+ private boolean storeMappingStandalone(String key, String field, String
value,String ticket) {
+ try (Jedis jedis = pool.getResource()) {
+ Object result = jedis.eval(luaScript, 1, key, field,
ticket==null?"":ticket, value,buildPubSubKey(field));
+ return null==result;
+ } catch (Throwable e) {
+ String msg = "Failed to put " + key + ":" + field + " to redis " +
value + ", cause: " + e.getMessage();
+ logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e);
+ throw new RpcException(msg, e);
+ }
+ }
+
+ private String buildMappingKey(String defaultMappingGroup) {
+ return this.root + GROUP_CHAR_SEPARATOR + defaultMappingGroup;
+ }
+
+ private String buildPubSubKey(String serviceKey) {
+ return buildMappingKey(DEFAULT_MAPPING_GROUP) + GROUP_CHAR_SEPARATOR +
serviceKey;
+ }
+
+ /**
+  * Reads the mapping stored for {@code serviceKey} in {@code group} and wraps it in a
+  * {@link ConfigItem}. The same value is passed for both constructor arguments.
+  */
+ @Override
+ public ConfigItem getConfigItem(String serviceKey, String group) {
+     final String mappingContent = getMappingData(buildMappingKey(group), serviceKey);
+     return new ConfigItem(mappingContent, mappingContent);
+ }
+
+ /**
+  * Dispatches the mapping read to the standalone or the cluster implementation.
+  * The standalone path is taken whenever {@code pool} is set.
+  */
+ private String getMappingData(String key, String field) {
+     if (pool == null) {
+         return getMappingDataInCluster(key, field);
+     }
+     return getMappingDataStandalone(key, field);
+ }
+
+ private String getMappingDataInCluster(String key, String field) {
+ try (JedisCluster jedisCluster = new JedisCluster(jedisClusterNodes,
timeout, timeout, 2, password, new GenericObjectPoolConfig<>())) {
+ return jedisCluster.hget(key, field);
+ } catch (Throwable e) {
+ String msg = "Failed to get " + key + ":" + field + " from redis
cluster , cause: " + e.getMessage();
+ logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e);
+ throw new RpcException(msg, e);
+ }
+ }
+
+ private String getMappingDataStandalone(String key, String field) {
+ try (Jedis jedis = pool.getResource()) {
+ return jedis.hget(key, field);
+ } catch (Throwable e) {
+ String msg = "Failed to get " + key + ":" + field + " from redis ,
cause: " + e.getMessage();
+ logger.error(TRANSPORT_FAILED_RESPONSE, "", "", msg, e);
+ throw new RpcException(msg, e);
+ }
+ }
+
+ /**
+  * Detaches {@code listener} from the subscription of {@code serviceKey}. When no listener is
+  * left, the subscription is shut down and removed from {@code listenerMap}.
+  */
+ @Override
+ public void removeServiceAppMappingListener(String serviceKey, MappingListener listener) {
+     // Look the entry up once: a second get() could return null if another thread removed it
+     // between the check and the use.
+     MappingDataListener mappingDataListener = listenerMap.get(serviceKey);
+     if (mappingDataListener == null) {
+         return;
+     }
+     NotifySub notifySub = mappingDataListener.getNotifySub();
+     notifySub.removeListener(listener);
+     if (notifySub.isEmpty()) {
+         mappingDataListener.shutdown();
+         // Conditional remove: only drop the entry if it is still the one that was shut down.
+         listenerMap.remove(serviceKey, mappingDataListener);
+     }
+ }
+
+ /**
+  * Registers {@code listener} for mapping changes of {@code serviceKey} and returns the
+  * currently stored application names.
+  * <p>
+  * One {@code MappingDataListener} is kept per service key in {@code listenerMap}; it is created
+  * and started on first use, later calls only add their listener to the existing one.
+  */
+ @Override
+ public Set<String> getServiceAppMapping(String serviceKey, MappingListener listener, URL url) {
+     MappingDataListener registered = listenerMap.get(serviceKey);
+     if (registered == null) {
+         NotifySub notifySub = new NotifySub(serviceKey);
+         notifySub.addListener(listener);
+         MappingDataListener candidate = new MappingDataListener(buildPubSubKey(serviceKey), notifySub);
+         registered = ConcurrentHashMapUtils.computeIfAbsent(listenerMap, serviceKey, k -> candidate);
+         if (registered == candidate) {
+             // This call won the registration: only now start the subscriber thread.
+             candidate.start();
+         } else {
+             // Another thread registered first; attach to its subscription and drop the
+             // never-started candidate instead of running an untracked thread.
+             registered.getNotifySub().addListener(listener);
+         }
+     } else {
+         registered.getNotifySub().addListener(listener);
+     }
+     return this.getServiceAppMapping(serviceKey, url);
+ }
+
+ /**
+  * Reads the mapping stored for {@code serviceKey} in the default mapping group and converts
+  * it with {@code getAppNames}. The {@code url} argument is not used here.
+  */
+ @Override
+ public Set<String> getServiceAppMapping(String serviceKey, URL url) {
+     final String mappingKey = buildMappingKey(DEFAULT_MAPPING_GROUP);
+     final String rawMapping = getMappingData(mappingKey, serviceKey);
+     return getAppNames(rawMapping);
+ }
+
+ @Override
+ public MetadataInfo getAppMetadata(SubscriberMetadataIdentifier
identifier, Map<String, String> instanceMetadata) {
+ String content = this.getMetadata(identifier);
+ return JsonUtils.toJavaObject(content, MetadataInfo.class);
+ }
+
+ /**
+  * Stores the content of {@code metadataInfo} under {@code identifier}.
+  */
+ @Override
+ public void publishAppMetadata(SubscriberMetadataIdentifier identifier, MetadataInfo metadataInfo) {
+     storeMetadata(identifier, metadataInfo.getContent());
+ }
+
+ /**
+  * Deletes the metadata stored under {@code identifier}. The {@code metadataInfo} argument
+  * is not used here.
+  */
+ @Override
+ public void unPublishAppMetadata(SubscriberMetadataIdentifier identifier, MetadataInfo metadataInfo) {
+     deleteMetadata(identifier);
+ }
+
+ /**
+  * Redis pub/sub callback for a single service key. Every received message is converted with
+  * {@code getAppNames} into a {@link MappingChangedEvent} and handed to all registered listeners.
+  */
+ private static class NotifySub extends JedisPubSub {
+
+     private final String serviceKey;
+
+     // Listeners are added and removed by callers of the report while onMessage iterates them.
+     // NOTE(review): onMessage presumably runs on the subscribing MappingDataListener thread,
+     // so a plain HashSet could fail with ConcurrentModificationException.
+     // A copy-on-write set makes that iteration safe; it is fully qualified so that no new
+     // import is required.
+     private final Set<MappingListener> listeners = new java.util.concurrent.CopyOnWriteArraySet<>();
+
+     public NotifySub(String serviceKey) {
+         this.serviceKey = serviceKey;
+     }
+
+     /** Registers a listener; adding the same listener twice has no additional effect. */
+     public void addListener(MappingListener listener) {
+         this.listeners.add(listener);
+     }
+
+     /** Unregisters a listener; unknown listeners are ignored. */
+     public void removeListener(MappingListener listener) {
+         this.listeners.remove(listener);
+     }
+
+     /** @return whether no listener is currently registered */
+     public Boolean isEmpty() {
+         return this.listeners.isEmpty();
+     }
+
+     @Override
+     public void onMessage(String key, String msg) {
+         logger.info("sub from redis " + key + " message:" + msg);
+         MappingChangedEvent mappingChangedEvent = new MappingChangedEvent(serviceKey, getAppNames(msg));
+         // forEach is a no-op on an empty set, so no separate emptiness check is needed.
+         listeners.forEach(listener -> listener.onEvent(mappingChangedEvent));
+     }
+
+     /** Pattern subscriptions are handled exactly like plain channel messages. */
+     @Override
+     public void onPMessage(String pattern, String key, String msg) {
+         onMessage(key, msg);
+     }
+
+     @Override
+     public void onPSubscribe(String pattern, int subscribedChannels) {
+         super.onPSubscribe(pattern, subscribedChannels);
+     }
+ }
+
+ private class MappingDataListener extends Thread {
+
+ private String path;
+
+ private volatile Jedis currentClient;
Review Comment:
This variable is assigned but never accessed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]