欢迎访问昆山宝鼎软件有限公司网站! 设为首页 | 网站地图 | XML | RSS订阅 | 宝鼎邮箱 | 后台管理


新闻资讯

MENU

软件开发知识
原文出处: 谢晞鸣

1. Namesrv 简介

Namesrv 可以领略为一个注册中心, 整个Namesrv的代码很是简朴,主要包括两块成果:

  • 打点一些 KV 的设置
  • 打点一些 Topic、Broker的注册信息
  • client 中的 namesrv <a href=昆山软件开拓 地点列表是怎么来的呢" title="600" src="http://www.importnew.com/https:/fdx321.github.io/images/%E3%80%90RocketMQ%E6%BA%90%E7%A0%81%E5%AD%A6%E4%B9%A0%E3%80%912-Namesrv_1.png" />

    2. Namesrv 启动进程

    启动进程主要涉及 NamesrvStartup/NamesrvController 两个类, NamesrvStartup 认真理会呼吁行的一些参数到各类 Config 工具中(NamesrvConfig/NettyServerConfig等),假如呼吁行参数中带有设置文件的路径,也会从设置文件中读取设置到各类 Config 工具中,然后初始化 NamesrvController,设置shutdownHook, 启动 NamesrvController。 NamesrvController 会去初始化和启动各个组件,主要是:

  • 建设NettyServer,注册 requestProcessor,用于处理惩罚差异的网络请求
  • 启动 NettyServer
  • 启动各类 scheduled task.
  • 不只仅 Namesrv 是这样,劳务派遣管理系统,其他模块在启动进程中也都是 startup/controller/config 一起完成这样的套路。

    3. Namesrv 主要组件

  • Processor 线程池,nettyServer 吸收到请求后,封装成任务提交到该线程池。
    remoting 模块维护了这样一个 processorTable:
  • HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable

    一个 processor 可以处理惩罚多个 request code, 多个 processor 也可以共用一个线程池。对付 Namesrv, 只有一个 processor 线程池,给两个 Processor 共享。

  • DefaultRequestProcessor(Namesrv 尚有一个 ClusterTestRequestProcessor 担任了该 Processor,在 clusterTest enable的情* 况下利用它来 getRouteInfoByTopic),用来处理惩罚 namesrv 吸收到的所有 RequestCode, Processor 内部会按照差异的RequestCode 挪用差异的要领。
  • KVConfigManager, 维护了一些KV方法的设置数据,可以按照请求,昆山软件公司,昆山软件开发,执行添加、删除、查询等操纵
  • RouteInfoManager, 维护了topic/broker/cluster/filter这些对象的路由信息,同样支持增删改查的操纵
  • schedued 线程,按必然的频率做两个工作,扫描不活泼的broker;打印所有KV设置信息
  • 4. 以broker注册为例看下Namesrv的事情进程

    1. DefaultRequestProcessor 处理惩罚来自 NettyServer的 [RemotingCommand] request, 假如 request.getCode 是 RequestCode.REGISTER_BROKER, 就去注册。这里会按照request.version来判定,从V3_0_11 开始支持了FilterServer。
    2. 从 request 解码获得 RegisterBrokerRequestHeader, 包括以下字段:
    3. brokerName, // 默认是BrokerConfig里的得到的locakHostName
    4. brokerAddr, //brokerConfig.getBrokerIP1() + “:” + nettyServerConfig.getListenPort()
    5. clusterName, //默认是BrokerConfig的”DefaultCluster”
    6. haServerAddr, //brokerConfig.getBrokerIP2() + “:” + messageStoreConfig.getHaListenPort()
    7. brokerId, //假如是MASTER,就是MixAll.MASTER_ID(也就0),不然就是其他
    8. 从 request.body 解码获得 RegisterBrokerBody, RegisterBrokerBody 包括以下内容,用JSON的方法来描写吧:
      {
        "topicConfigSerializeWrapper": {
            "topicConfigTable":{
               "topic_xxx":{
                 "defaultReadQueueNums":"16",
                "defaultWriteQueueNums":"16",
                "topicName":"xxx",
                "readQueueNums":"",
                "writeQueueNums":"",
                "perm":"",
                "topicFilterType":"",
                "topicSysFlag":"",
                "order":""
               },
            },
            "dataVersion":{
               "timestamp":"xxxx",
               "counter":"xxxx"
            }
         },
        "filterServerList":[
           "",//filterServerAddr
        ]
      }
    9. 在 clusterAddrTable 中新增一笔记录
    10. 在 brokerAddrTable 中新增一笔记录,这里会构建一个BrokerData
      {
        "cluster":"xxx",
        "brokerName":"xxx",
        "brokerAddrs":{
           "brokerId_xx":"broker address xxx"
         }
      }
    11. 假如是第一次注册可能topicConfig产生了改观,会去更新topicQueueTable
    12. 在brokerLiveTable新增该broker
    13. 在filterServerTable新增这些filterServer的地点列表

    5.其他

    以上内容看下来,namesrv 是一个无状态的应用,可以程度任意扩展。每一个 broker 城市和所有的 namesrv 保持长毗连(有个scheduled task会按必然频率给所有namesrv做register broker的操纵),所以 namesrv 之间没有主从干系,也不需要复制数据。client(producer/consumer) 随机选一个 namesrv 毗连。client 中的 namesrv 地点列表是怎么来的呢,有两种方法:

    1. 通过呼吁行或设置文件在启动的时候得到的
    2. 通过 Scheduled task,按必然的频率从一个 web 处事 fetch的(web处事可以自建),假如有改观,就更新这个 namesrv 地点列表。
      client 选择 namesrv的进程如下, index递增取模,然并不是每次都这么干,取到后会缓存起来。
    if (addrList != null && !addrList.isEmpty()) {
        for (int i = 0; i < addrList.size(); i++) {
            int index = this.namesrvIndex.incrementAndGet();
            index = Math.abs(index);
            index = index % addrList.size();
            String newAddr = addrList.get(index);
            this.namesrvAddrChoosed.set(newAddr);
            Channel channelNew = this.createChannel(newAddr);
            if (channelNew != null)
                return channelNew;
        }
    }

    看到这里我发生了疑问,那岂不是每个 client 启动的时候都取的是第一个 namesrv,它不会压力很大吗,厥后发明 namesrvIndex 的初始值是随机的。

    以上所有扯淡都是基于源码 https://github.com/apache/incubator-rocketmq (tag:rocketmq-all-4.1.0-incubating)所贴代码有所删减。