Kafka技术内幕-生产者

Posted by keys961 on June 19, 2020

1. Kafka生产者客户端

Kafka生产者客户端主要以KafkaProducer类。

1.1. 生产者消息发送及发送前准备

消息发送主要调用的是KafkaProducer#send方法发送消息,消息发送支持同步和异步,因为它返回Future

  • 同步:立刻调用Future#get
  • 异步:直接返回,发送完成后会执行回调函数

发送消息时,会做下面的事情进行准备:

  1. 选择分区:

    某个topic某个分区的信息记录在PartitionInfo中:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    
     public class PartitionInfo {
         private final String topic; // topic名
         private final int partition; // 分区号
         private final Node leader; // 该分区主副本节点
         private final Node[] replicas; // 该分区的所有副本节点
         private final Node[] inSyncReplicas; // 该分区正在同步的节点
         private final Node[] offlineReplicas; // 该分区离线的节点
     		// ...
     }
    

    生产者会指定topic的某一个分区,将消息生产出去。分区算法接口是Partitioner#partition方法,默认实现位于DefaultPartitioner类,其算法很简单,源码就不贴了:

    • 若消息没指定key:round-robin
    • 若消息指定key:对key进行散列并取余
  2. 累计消息并批量发送

    为提高吞吐,Kafka生产者使用批量消息发送,这涉及批量缓存的RecordAccumulator类,该类会为每个topic的每个分区维护一个双端队列(即Deque<ProducerBatch>),用于缓存消息以批量发送。

    当消息到来时:

    • 若队列中不存在ProducerBatch,则创建新的该实例,并追加消息
    • 若队列中存在ProducerBatch,则找队列中最后一个实例,尝试追加消息
      • 若成功,则返回结果
      • 若不成功,则创建新的该实例,并追加消息
    • 追加完毕后,会返回一个RecordAppendResult,当队列长度大于1或者队列最后一个批次满了时,唤醒客户端将队列头的消息批次发出

1.2. 消息发送线程

批次满了后,会调用Sender#wakeupSender实际上时一个Runnable,被另一个线程执行。

相关源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public void run() {
    while (running) {
        try {
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }
    // ...
}

void runOnce() {
    // ...
    long currentTimeMs = time.milliseconds();
    long pollTimeout = sendProducerData(currentTimeMs); // 获取批次并将其转换成请求,填入客户端发送队列
    client.poll(pollTimeout, currentTimeMs); // 执行网络消息的发送和接收
}

private long sendProducerData(long now) {
    Cluster cluster = metadata.fetch();
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
    // ...
    // 读取缓存数据,得到请求列表(分区-请求列表)
    Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
    // ...
    // ...
    // 计算selector poll超时时间
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
    pollTimeout = Math.max(pollTimeout, 0);
    if (!result.readyNodes.isEmpty()) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);    
        pollTimeout = 0; // 一般是0
    }
    // 异步发送消息(仅将请求放入队列中)
    sendProduceRequests(batches, now);
    return pollTimeout;
}

总体而言,生产者发送消息的流程如下:

  • 主线程将消息缓存起来用于批量发送,若批次满,会激活客户端以发送请求
  • 发送线程通过KafkaClient#ready方法找到已经准备好的服务端节点,并调用KafkaClient#connect建立连接
  • 发送线程通过RecordAccumulator#drain获取整理好的批次记录,用于发送
  • 发送线程将批次记录转换成客户端请求,并将其发送给服务端

封装请求的方法在Sender#sendProduceRequest方法,里面关键的是ProduceRequest.Builder构建请求,具体源码不贴了。

1.3. 处理网络连接

上面所述使用KafkaClient进行ready, connect, send, poll,而这部分的实现是NetworkClient

  • ready:从RecordAccumulator获取准备好的节点,并建立连接
  • send:创建请求并将其暂存到通道中
  • poll:对Selector进行poll,进行真正的读写操作(发送请求,读取响应)

其底层使用的就是NIO,使用的是多路复用的模型,底层涉及到SelectorKafkaChannel等对底层的封装。这部分就不再多叙述了。

2. 服务端网络处理

Kafka服务端也是使用了多路复用的技术,这部分可以参考Netty, Redis等中间件,原理就不多叙述了。

Kafka的线程模型十分相近的是Netty Server,关于Netty线程模型,可参考这里

这里摘取Kafka Confluence上的服务端内部架构图,注意网络层和API层:

k_internal

2.1. 请求与响应流程

这里对应的代码截取如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// In Processor
override def run(): Unit = {
  startupComplete()
  try {
    while (isRunning) {
      try {
        // 1. 建立和配置新连接
        configureNewConnections()
        // 2. 检查响应队列是否有响应要写出,若有则注册OP_WRITE,并将响应暂存到通道里
        processNewResponses()
        // 3. 执行读写操作(读取请求,写出响应)
        poll()
        // 4. 若读取完毕,移除OP_READ
        processCompletedReceives()
        // 5. 若写出完毕,移除OP_WRITE,注册OP_READ
        processCompletedSends()
        // 6. 处理断连和过多的连接
        processDisconnected()
        closeExcessConnections()
      } 
      // ...
    }
  }
}

2.2. 请求通道的请求队列和响应队列

服务端读取完请求后:

  1. 将请求塞入一个请求通道里(RequestChannel),即请求队列,内部实际上就是一个阻塞队列(ArrayBlockingQueue
  2. 然后请求处理线程(KafkaRequestHandler)和KafkaApis就可以获取请求,并处理请求
  3. 响应会塞入响应队列里(LinkedBlockingDeque),等待OP_WRITE就绪后写回客户端

而对于队列架构上:

  • 每个处理器的Selector有自己的响应队列,但是共享1个请求队列

  • 此外,某个请求被某个处理器读取并处理,其对应的响应也是由相同的处理器返回的。

2.3. 请求处理

服务端会为请求处理创建一个线程池KafkaRequestHandlerPool,里面执行的就是KafkaRequestHandler任务。这些线程会共用一个请求队列,不断轮询拉取请求,然后交给KafkaApis处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
class KafkaRequestHandlerPool(val brokerId: Int,
                              val requestChannel: RequestChannel,
                              val apis: KafkaApis,
                              time: Time,
                              numThreads: Int,
                              requestHandlerAvgIdleMetricName: String,
                              logAndThreadNamePrefix : String) extends Logging with KafkaMetricsGroup {
  // ...
  private val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)
  // ...
  val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
  for (i <- 0 until numThreads) {
    createHandler(i) // 创建KafkaRequestHandler任务线程
  }

  def createHandler(id: Int): Unit = synchronized {
    // 创建KafkaRequestHandler任务线程
    runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)
    // 启动线程,线程是守护线程
    KafkaThread.daemon(logAndThreadNamePrefix + "-kafka-request-handler-" + id, runnables(id)).start()
  }
  // ...
}

class KafkaRequestHandler(id: Int,
                          brokerId: Int,
                          val aggregateIdleMeter: Meter,
                          val totalHandlerThreads: AtomicInteger,
                          val requestChannel: RequestChannel,
                          apis: KafkaApis,
                          time: Time) extends Runnable with Logging {
  // ...
  def run(): Unit = {
    while (!stopped) {
      // ...
      val req = requestChannel.receiveRequest(300) // 拉取请求,最多等待0.3s
      // ...
      req match {
        case RequestChannel.ShutdownRequest =>
          // 关机请求,则执行关闭任务
          debug(s"Kafka request handler $id on broker $brokerId received shut down command")
          shutdownComplete.countDown()
          return

        case request: RequestChannel.Request =>
          // 普通请求,则交给KafkaApis处理
          try {
            // ...
            // 交给KafkaApis处理
            apis.handle(request)
          } catch {
            // ...
          } finally {
            // ...
          }

        case null => // continue
      }
    }
    // ...
  }
  // ...
}

从上面可知,请求处理的入口就在KafkaApis#handle方法,它是请求处理的统一入口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class KafkaApis(val requestChannel: RequestChannel,
                val replicaManager: ReplicaManager,
                val adminManager: AdminManager,
                val groupCoordinator: GroupCoordinator,
                val txnCoordinator: TransactionCoordinator,
                val controller: KafkaController,
                val zkClient: KafkaZkClient,
                val brokerId: Int,
                val config: KafkaConfig,
                val metadataCache: MetadataCache,
                val metrics: Metrics,
                val authorizer: Option[Authorizer],
                val quotas: QuotaManagers,
                val fetchManager: FetchManager,
                brokerTopicStats: BrokerTopicStats,
                val clusterId: String,
                time: Time,
                val tokenManager: DelegationTokenManager) extends Logging {
  // ...
  
  // 入口函数
  def handle(request: RequestChannel.Request): Unit = {
    try {
      // ...
      // 根据不同的请求类型进行处理
      request.header.apiKey match {
        case ApiKeys.PRODUCE => handleProduceRequest(request)
        case ApiKeys.FETCH => handleFetchRequest(request)
        // 其他不同类型,略...
      }
    } catch {
      // ...
    } finally {
      // ...
    }
}

处理请求完成后,响应也会被创建,并调用KafkaApis#sendResponse方法,将响应推入到对应处理器的响应队列中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// In KafkaApis
private def sendResponse(request: RequestChannel.Request,
                         responseOpt: Option[AbstractResponse],
                         onComplete: Option[Send => Unit]): Unit = {
  // ...
  // 创建响应
  val response = responseOpt match {
    case Some(response) =>
      val responseSend = request.context.buildResponse(response)
      val responseString =
        if (RequestChannel.isRequestLoggingEnabled) Some(response.toString(request.context.apiVersion))
        else None
      new RequestChannel.SendResponse(request, responseSend, responseString, onComplete)
    case None =>
      new RequestChannel.NoOpResponse(request)
  }
  // 将响应压入响应队列
  sendResponse(response)
}

private def sendResponse(response: RequestChannel.Response): Unit = {
  // 将响应压入响应队列
  requestChannel.sendResponse(response)
}

// In RequestChannel
def sendResponse(response: RequestChannel.Response): Unit = {
  // ...
  // 寻找请求和响应一样的处理器,将响应压入对应的响应队列里
  val processor = processors.get(response.processor)
  if (processor != null) {
    processor.enqueueResponse(response)
  }
}

3. 总结

这里做一个总结:

  • 对于生产者和客户端:
    • 请求可同步,也可异步
    • 使用批量请求以提高吞吐
    • 网络架构上还是多路复用一套
  • 对于服务端:
    • 网络架构上也是多路复用的一套
    • 请求是交给后台线程执行,使用了请求队列(共享)进行缓冲;响应也使用类似的响应队列(每个处理器各一个)进行缓冲
    • KafkaApis是请求处理的统一入口