欢迎访问昆山宝鼎软件有限公司网站! 设为首页 | 网站地图 | XML | RSS订阅 | 宝鼎邮箱 | 后台管理


新闻资讯

MENU

软件开发知识
原文出处: 刘正阳

配景

我们直接跑到最底层,看看kafka的网络层处理惩罚是怎么处理惩罚的。因为Java的NIO照旧偏底层,不能直接用来做应用开拓,所以一般都利用像netty的框架可能凭据本身的需要封装一些nio,让上层业务不消体贴网络处理惩罚的细节,只需要建设处事监听端口、接管请求、处理惩罚请求、写返回就可以了。我在看netty、thrift等涉及到网络的Java框架时较量喜欢去看他们的nio是怎么封装的,这里也是可以或许浮现作者程度的处所。java nio的根基元素为Selector、Channel、ByteBuffer。
我们从server和client两头别离阐明。

kafka server端在org.apache.kafka.common.network中举办了封装。
就像package.html内里写的。

>
The network server for kafka. No application specific code here, just general network server stuff.

The classes Receive and Send encapsulate the incoming and outgoing transmission of bytes. A Handler
is a mapping between a Receive and a Send, and represents the users hook to add logic for mapping requests
to actual processing code. Any uncaught exceptions in the reading or writing of the transmissions will result in
the server logging an error and closing the offending socket. As a result it is the duty of the Handler
implementation to catch and serialize any application-level errors that should be sent to the client.

This slightly lower-level interface that models sending and receiving rather than requests and responses
is necessary in order to allow the send or receive to be overridden with a non-user-space writing of bytes
using FileChannel.transferTo.

启动进程

网络层的启动在SocketServer.kafka中, 属于KafkaServer启动进程中的一部门
首先看一下server.properties中的网络相关设置

  • listener就是当地的hostname和端标语, 没有的话会利用InetAddress和默认值(9092)
  • num.network.threads 类比netty中的worker threads num,是认真处理惩罚请求的线程的数量,nio的reactor模式一般是前面有一个Acceptor认真毗连的成立,成立后Reactor将各类读写事件分发给各个Handler处理惩罚,这个num是分发处理惩罚读写事件的io的线程数。
  • num.io.threads 就是设置的Handler的数量,昆山软件公司,每个Handler一般都是一个线程(也叫IOThread)来处理惩罚。
  • 就是在一个毗连上可以  <a href=昆山软件定制开拓 持续发送多个应用层的请求" class="aligncenter size-large wp-image-28814" title="multi-reactor" src="/uploads/allimg/c180521/152D4B0C3P-1B08.png" />

  • queued.max.requests 在Handler处理惩罚完成前可以或许列队的request的数量,相当于应用层的request buffer
  • socket.send.buffer.bytes socket options里的sendbuffer
  • socket.receive.buffer.bytes receive buffer
  • socket.request.max.bytes 请求的最大的byte巨细,因为接管请求时需要申请空间来存储请求,假如太大会导致oom,这是一个掩护法子。
  • # The number of threads that the server uses for receiving requests from the network and sending responses to the network
    num.network.threads=3
    # The number of threads that the server uses for processing requests, which may include disk I/O
    num.io.threads=8
    # The number of queued request allowed before blocking the network threads
    #queued.max.requests
    # The send buffer (SO_SNDBUF) used by the socket server
    socket.send.buffer.bytes=102400
    # The receive buffer (SO_RCVBUF) used by the socket server
    socket.receive.buffer.bytes=102400
    # The maximum size of a request that the socket server will accept (protection against OOM)
    socket.request.max.bytes=104857600

    SocketServer

    这个类上的注释叙述了kafka server的io线程模子

    这个类上的注释叙述了kafka server的io线程模子
    
    /**
     * An NIO socket server. The threading model is
     *   1 Acceptor thread that handles new connections
     *   Acceptor has N Processor threads that each have their own selector and read requests from sockets
     *   M Handler threads that handle requests and produce responses back to the processor threads for writing.
     */

    一共三种线程。一个Acceptor线程认真处理惩罚新毗连请求,会有N个Processor线程,每个都有本身的Selector,认真从socket中读取请求和将返回功效写回。然后会有M个Handler线程,认真处理惩罚请求,而且将功效返回给Processor。
    将Acceptor和Processor线程分隔的目标是为了制止读写频繁影响新毗连的吸收。

    SocketServer初始化

    SockerServer建设的时候通过server.properties和默认的设置中获取设置,如numNetworkThread(num.network.threads,也就是线程模子中的N)、

    建设processor数组、acceptorMap(因为大概会在多个Endpoint吸收请求)、memoryPool(SimpleMemoryPool里主要做的工作是统计监控ByteBuffer的利用)、requestChanne等 。

    private val endpoints = config.listeners.map(l => l.listenerName -> l).toMap
    private val numProcessorThreads = config.numNetworkThreads
    private val maxQueuedRequests = config.queuedMaxRequests
    private val totalProcessorThreads = numProcessorThreads * endpoints.size
    private val maxConnectionsPerIp = config.maxConnectionsPerIp
    private val maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides
    this.logIdent = "[Socket Server on Broker " + config.brokerId + "], "
    private val memoryPoolSensor = metrics.sensor("MemoryPoolUtilization")
    private val memoryPoolDepletedPercentMetricName = metrics.metricName("MemoryPoolAvgDepletedPercent", "socket-server-metrics")
    memoryPoolSensor.add(memoryPoolDepletedPercentMetricName, new Rate(TimeUnit.MILLISECONDS))
    private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE
    val requestChannel = new RequestChannel(totalProcessorThreads, maxQueuedRequests)
    private val processors = new Array[Processor](totalProcessorThreads)
    private[network] val acceptors = mutable.Map[EndPoint, Acceptor]()
    private var connectionQuotas: ConnectionQuotas = _

    RequestChannel