1. Kafka生产者客户端
Kafka生产者客户端主要以KafkaProducer
类。
1.1. 生产者消息发送及发送前准备
消息发送主要调用的是KafkaProducer#send
方法发送消息,消息发送支持同步和异步,因为它返回Future
:
- 同步:立刻调用
Future#get
- 异步:直接返回,发送完成后会执行回调函数
发送消息时,会做下面的事情进行准备:
-
选择分区:
某个topic某个分区的信息记录在
PartitionInfo
中:1 2 3 4 5 6 7 8 9
public class PartitionInfo { private final String topic; // topic名 private final int partition; // 分区号 private final Node leader; // 该分区主副本节点 private final Node[] replicas; // 该分区的所有副本节点 private final Node[] inSyncReplicas; // 该分区正在同步的节点 private final Node[] offlineReplicas; // 该分区离线的节点 // ... }
生产者会指定topic的某一个分区,将消息生产出去。分区算法接口是
Partitioner#partition
方法,默认实现位于DefaultPartitioner
类,其算法很简单,源码就不贴了:- 若消息没指定
key
:round-robin - 若消息指定
key
:对key
进行散列并取余
- 若消息没指定
-
累计消息并批量发送
为提高吞吐,Kafka生产者使用批量消息发送,这涉及批量缓存的
RecordAccumulator
类,该类会为每个topic的每个分区维护一个双端队列(即Deque<ProducerBatch>
),用于缓存消息以批量发送。当消息到来时:
- 若队列中不存在
ProducerBatch
,则创建新的该实例,并追加消息 - 若队列中存在
ProducerBatch
,则找队列中最后一个实例,尝试追加消息- 若成功,则返回结果
- 若不成功,则创建新的该实例,并追加消息
- 追加完毕后,会返回一个
RecordAppendResult
,当队列长度大于1或者队列最后一个批次满了时,唤醒客户端将队列头的消息批次发出
- 若队列中不存在
1.2. 消息发送线程
批次满了后,会调用Sender#wakeup
。Sender
实际上时一个Runnable
,被另一个线程执行。
相关源码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public void run() {
while (running) {
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
// ...
}
void runOnce() {
// ...
long currentTimeMs = time.milliseconds();
long pollTimeout = sendProducerData(currentTimeMs); // 获取批次并将其转换成请求,填入客户端发送队列
client.poll(pollTimeout, currentTimeMs); // 执行网络消息的发送和接收
}
private long sendProducerData(long now) {
Cluster cluster = metadata.fetch();
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
// ...
// 读取缓存数据,得到请求列表(分区-请求列表)
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
// ...
// ...
// 计算selector poll超时时间
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
pollTimeout = Math.max(pollTimeout, 0);
if (!result.readyNodes.isEmpty()) {
log.trace("Nodes with data ready to send: {}", result.readyNodes);
pollTimeout = 0; // 一般是0
}
// 异步发送消息(仅将请求放入队列中)
sendProduceRequests(batches, now);
return pollTimeout;
}
总体而言,生产者发送消息的流程如下:
- 主线程将消息缓存起来用于批量发送,若批次满,会激活客户端以发送请求
- 发送线程通过
KafkaClient#ready
方法找到已经准备好的服务端节点,并调用KafkaClient#connect
建立连接 - 发送线程通过
RecordAccumulator#drain
获取整理好的批次记录,用于发送 - 发送线程将批次记录转换成客户端请求,并将其发送给服务端
封装请求的方法在
Sender#sendProduceRequest
方法,里面关键的是ProduceRequest.Builder
构建请求,具体源码不贴了。
1.3. 处理网络连接
上面所述使用KafkaClient
进行ready, connect, send, poll
,而这部分的实现是NetworkClient
:
ready
:从RecordAccumulator
获取准备好的节点,并建立连接send
:创建请求并将其暂存到通道中poll
:对Selector
进行poll
,进行真正的读写操作(发送请求,读取响应)
其底层使用的就是NIO,使用的是多路复用的模型,底层涉及到Selector
、KafkaChannel
等对底层的封装。这部分就不再多叙述了。
2. 服务端网络处理
Kafka服务端也是使用了多路复用的技术,这部分可以参考Netty, Redis等中间件,原理就不多叙述了。
Kafka的线程模型十分相近的是Netty Server,关于Netty线程模型,可参考这里。
这里摘取Kafka Confluence上的服务端内部架构图,注意网络层和API层:
2.1. 请求与响应流程
这里对应的代码截取如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// In Processor
override def run(): Unit = {
startupComplete()
try {
while (isRunning) {
try {
// 1. 建立和配置新连接
configureNewConnections()
// 2. 检查响应队列是否有响应要写出,若有则注册OP_WRITE,并将响应暂存到通道里
processNewResponses()
// 3. 执行读写操作(读取请求,写出响应)
poll()
// 4. 若读取完毕,移除OP_READ
processCompletedReceives()
// 5. 若写出完毕,移除OP_WRITE,注册OP_READ
processCompletedSends()
// 6. 处理断连和过多的连接
processDisconnected()
closeExcessConnections()
}
// ...
}
}
}
2.2. 请求通道的请求队列和响应队列
服务端读取完请求后:
- 将请求塞入一个请求通道里(
RequestChannel
),即请求队列,内部实际上就是一个阻塞队列(ArrayBlockingQueue
) - 然后请求处理线程(
KafkaRequestHandler
)和KafkaApis
就可以获取请求,并处理请求 - 响应会塞入响应队列里(
LinkedBlockingDeque
),等待OP_WRITE
就绪后写回客户端
而对于队列架构上:
-
每个处理器的
Selector
都有自己的响应队列,但是共享1个请求队列 -
此外,某个请求被某个处理器读取并处理,其对应的响应也是由相同的处理器返回的。
2.3. 请求处理
服务端会为请求处理创建一个线程池KafkaRequestHandlerPool
,里面执行的就是KafkaRequestHandler
任务。这些线程会共用一个请求队列,不断轮询拉取请求,然后交给KafkaApis
处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
class KafkaRequestHandlerPool(val brokerId: Int,
val requestChannel: RequestChannel,
val apis: KafkaApis,
time: Time,
numThreads: Int,
requestHandlerAvgIdleMetricName: String,
logAndThreadNamePrefix : String) extends Logging with KafkaMetricsGroup {
// ...
private val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)
// ...
val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
for (i <- 0 until numThreads) {
createHandler(i) // 创建KafkaRequestHandler任务线程
}
def createHandler(id: Int): Unit = synchronized {
// 创建KafkaRequestHandler任务线程
runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)
// 启动线程,线程是守护线程
KafkaThread.daemon(logAndThreadNamePrefix + "-kafka-request-handler-" + id, runnables(id)).start()
}
// ...
}
class KafkaRequestHandler(id: Int,
brokerId: Int,
val aggregateIdleMeter: Meter,
val totalHandlerThreads: AtomicInteger,
val requestChannel: RequestChannel,
apis: KafkaApis,
time: Time) extends Runnable with Logging {
// ...
def run(): Unit = {
while (!stopped) {
// ...
val req = requestChannel.receiveRequest(300) // 拉取请求,最多等待0.3s
// ...
req match {
case RequestChannel.ShutdownRequest =>
// 关机请求,则执行关闭任务
debug(s"Kafka request handler $id on broker $brokerId received shut down command")
shutdownComplete.countDown()
return
case request: RequestChannel.Request =>
// 普通请求,则交给KafkaApis处理
try {
// ...
// 交给KafkaApis处理
apis.handle(request)
} catch {
// ...
} finally {
// ...
}
case null => // continue
}
}
// ...
}
// ...
}
从上面可知,请求处理的入口就在KafkaApis#handle
方法,它是请求处理的统一入口:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class KafkaApis(val requestChannel: RequestChannel,
val replicaManager: ReplicaManager,
val adminManager: AdminManager,
val groupCoordinator: GroupCoordinator,
val txnCoordinator: TransactionCoordinator,
val controller: KafkaController,
val zkClient: KafkaZkClient,
val brokerId: Int,
val config: KafkaConfig,
val metadataCache: MetadataCache,
val metrics: Metrics,
val authorizer: Option[Authorizer],
val quotas: QuotaManagers,
val fetchManager: FetchManager,
brokerTopicStats: BrokerTopicStats,
val clusterId: String,
time: Time,
val tokenManager: DelegationTokenManager) extends Logging {
// ...
// 入口函数
def handle(request: RequestChannel.Request): Unit = {
try {
// ...
// 根据不同的请求类型进行处理
request.header.apiKey match {
case ApiKeys.PRODUCE => handleProduceRequest(request)
case ApiKeys.FETCH => handleFetchRequest(request)
// 其他不同类型,略...
}
} catch {
// ...
} finally {
// ...
}
}
处理请求完成后,响应也会被创建,并调用KafkaApis#sendResponse
方法,将响应推入到对应处理器的响应队列中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// In KafkaApis
private def sendResponse(request: RequestChannel.Request,
responseOpt: Option[AbstractResponse],
onComplete: Option[Send => Unit]): Unit = {
// ...
// 创建响应
val response = responseOpt match {
case Some(response) =>
val responseSend = request.context.buildResponse(response)
val responseString =
if (RequestChannel.isRequestLoggingEnabled) Some(response.toString(request.context.apiVersion))
else None
new RequestChannel.SendResponse(request, responseSend, responseString, onComplete)
case None =>
new RequestChannel.NoOpResponse(request)
}
// 将响应压入响应队列
sendResponse(response)
}
private def sendResponse(response: RequestChannel.Response): Unit = {
// 将响应压入响应队列
requestChannel.sendResponse(response)
}
// In RequestChannel
def sendResponse(response: RequestChannel.Response): Unit = {
// ...
// 寻找请求和响应一样的处理器,将响应压入对应的响应队列里
val processor = processors.get(response.processor)
if (processor != null) {
processor.enqueueResponse(response)
}
}
3. 总结
这里做一个总结:
- 对于生产者和客户端:
- 请求可同步,也可异步
- 使用批量请求以提高吞吐
- 网络架构上还是多路复用一套
- 对于服务端:
- 网络架构上也是多路复用的一套
- 请求是交给后台线程执行,使用了请求队列(共享)进行缓冲;响应也使用类似的响应队列(每个处理器各一个)进行缓冲
KafkaApis
是请求处理的统一入口