你好!首先感谢回复。嗯,之后我也思考了这个问题,也感觉阿里的开发人员是为了这方面做考虑的。但是个人仍然认为,这部分代码是不安全的。可能会导致新注册上来的broker无法第一时间被scanNotActiveBroker感知到,因为执行scanNotActiveBroker任务的线程不会实时去更新工作线程中的数据,有一个内存可见性的问题。当然和你说的相同,这里的并发是很低的,且有可能在一个集群搭建起来之后后续都不会有新的broker进来,所以这种可见性问题基本不会出现。
在2018-03-15,"老胡" <[email protected]> 写道:-----原始邮件-----
 发件人: "老胡" <[email protected]>
 发送时间: 2018年3月15日 星期四
 收件人: users <[email protected]>, users <[email protected]>
 主题: 回复:rocketmq线程安全问题

你好! 这里是存在线程安全的问题。
scanNotActiveBroker 只与 unregisterBroker和 registerBroker的之间是线程不安全的。
scanNotActiveBroker 每10秒执行一次,而unregisterBroker 与 registerBroker 可能很久才会触发。甚至不会触发。
出现线程安全的几率很低,
scanNotActiveBroker  锁持有时间很长,频率高
scanNotActiveBroker 报错,可以等下下次执行
每10秒执行一次,那么会哟加锁,解锁的操作,比较耗时,在上面的原有下,不加锁是一种好的方式。
仅仅是个人理解。




------------------ 原始邮件 ------------------
发件人: "鲁仕林"<[email protected]>;
发送时间: 2018年3月14日(星期三) 下午3:23
收件人: "users"<[email protected]>;
主题: rocketmq线程安全问题




this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256); */
public void scanNotActiveBroker() {
    Iterator<Entry<String, BrokerLiveInfo>> it = 
this.brokerLiveTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, BrokerLiveInfo> next = it.next();
        long last = next.getValue().getLastUpdateTimestamp();
        if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
            RemotingUtil.closeChannel(next.getValue().getChannel());
            it.remove();
            log.warn("The broker channel expired, {} {}ms", next.getKey(), 
BROKER_CHANNEL_EXPIRED_TIME);
            this.onChannelDestroy(/**brokerAddress*/next.getKey(), 
/**长连接channel*/next.getValue().getChannel());
        }
    }
}首先这是rocketmq中扫描存活broker的一段代码,我认为这段代码是存在线程安全性问题。在一些情况下,如在使用Iterator便利这个map时,有新的broker注册进来,会抛出java.util.ConcurrentModificationException。







Reply via email to