1. 概览
Kafka消费者主要涉及到的就是KafkaConsumer
类,其涉及到的主要是2个操作:
subscribe
:订阅topic,动态分配分区给消费者poll
:轮询,返回消息集
此外还有一些操作:
assign
:静态分配分区给消费者commitAsync/commitSync
:异步/同步提交偏移量到协调者
2. 客户端消费消息
2.1. 消费者订阅状态
KafkaConsumer
提供了2种消费模式:
- 订阅(
subscribe
):只指定订阅的topic,分区由协调者动态分配 - 分配(
assign
):除了topic外,还指定消费的分区,即分区静态分配
订阅的状态保存在SubscriptionState
中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class SubscriptionState {
private enum SubscriptionType {
NONE, AUTO_TOPICS, AUTO_PATTERN, USER_ASSIGNED
}
private SubscriptionType subscriptionType; // 订阅类型
private Pattern subscribedPattern; // 订阅模式
private Set<String> subscription; // 本消费者消费的topic
private Set<String> groupSubscription; // 本消费者所在组消费的topic
private final PartitionStates<TopicPartitionState> assignment; // 当前分配的分区及其状态
private final OffsetResetStrategy defaultResetStrategy; // 偏移量重置策略
private ConsumerRebalanceListener rebalanceListener; // 触发重平衡的回调
private int assignmentId = 0; // 当前分配分区方案的ID
// ...
}
客户端采用订阅模式时,并不会先分配好分区:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// In KafkaConsumer
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
// ...
try {
// ...
if (topics == null)
throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
if (topics.isEmpty()) {
this.unsubscribe();
} else {
// ...
// 清除没有订阅的主题的数据
fetcher.clearBufferedDataForUnassignedTopics(topics);
// 添加订阅信息
if (this.subscriptions.subscribe(new HashSet<>(topics), listener))
metadata.requestUpdateForNewTopics(); // 设置成功则标记以启动元数据更新
}
} finally {
release();
}
}
// In SubscriptionState
public synchronized boolean subscribe(Set<String> topics, ConsumerRebalanceListener listener) {
registerRebalanceListener(listener); // 注册回调
setSubscriptionType(SubscriptionType.AUTO_TOPICS); // 设置订阅类型AUTO_TOPICS
return changeSubscription(topics); // 变更订阅
}
private boolean changeSubscription(Set<String> topicsToSubscribe) {
if (subscription.equals(topicsToSubscribe))
return false; // 若没更改则返回false,元数据不需要更新
subscription = topicsToSubscribe; // 直接设置订阅的topic
return true;
}
而分配到分区后(重平衡后),会调用下面的方法,添加分区和状态到assignment
:
1
2
3
4
5
6
7
8
9
10
11
12
13
public synchronized void assignFromSubscribed(Collection<TopicPartition> assignments) {
// ...
// assignment保存了分区TopicPartition和它的状态TopicPartitionState
Map<TopicPartition, TopicPartitionState> assignedPartitionStates = new HashMap<>(assignments.size());
for (TopicPartition tp : assignments) {
TopicPartitionState state = this.assignment.stateValue(tp);
if (state == null)
state = new TopicPartitionState();
assignedPartitionStates.put(tp, state);
}
assignmentId++;
this.assignment.set(assignedPartitionStates); // 直接设置新的assignment
}
而分配模式下,assignment
会被预先分配好。
a) 分区状态
分区状态即TopicPartitionState
,主要保存下面的信息:
1
2
3
4
5
6
7
8
9
10
private static class TopicPartitionState {
private FetchState fetchState; // 拉取状态
private FetchPosition position; // 拉取偏移量,包含偏移量值、epoch和Leader
private Long highWatermark; // 上一次拉取的HW(低于HW的消息Kafka Broker都已提交/备份)
// ...
private boolean paused; // 拉取是否暂停
private OffsetResetStrategy resetStrategy; // 偏移量重置策略
// ...
}
其中position
代表拉取的进度,它不能为空,否则消费者不能拉取这个分区。
b) 初始化分区状态
消费者初始化后,分区状态时没有设置的,轮询前需要设置,这最终会调用updateFetchPositions
方法,下面说明一下调用逻辑,代码不贴了:
- 首先判断分配到的分区是否有合法
position
,若有则返回,开始拉取消息 - 进入
refreshCommittedOffsetsIfNeeded
(ConsumerCoordinator
)方法,获取需要拉取偏移量的分区 - 进入
fetchCommittedOffsets
(ConsumerCoordinator
)方法,向协调者发送请求,并轮询,获得偏移量,并设置 - 全部设置完成后,开始拉取消息
c) 重置和更新拉取偏移量
偏移量的更新入口在KafkaConsumer#updateFetchPositions
中:
1
2
3
4
5
6
7
8
9
10
11
12
13
private boolean updateFetchPositions(final Timer timer) {
// 验证偏移量
fetcher.validateOffsetsIfNeeded();
// 判断是否所有已订阅分区都获取了拉取偏移量,若都有则直接返回
cachedSubscriptionHashAllFetchPositions = subscriptions.hasAllFetchPositions();
if (cachedSubscriptionHashAllFetchPositions) return true;
// 关键:向协调者发送请求,刷新拉取偏移量,从已提交的偏移量开始
if (coordinator != null && !coordinator.refreshCommittedOffsetsIfNeeded(timer)) return false;
// 重置偏移量(若剩余分区还需要一个偏移量)
subscriptions.resetMissingPositions();
fetcher.resetOffsetsIfNeeded();
return true;
}
这里关键在refreshCommittedOffsetsIfNeeded
方法,它:
- 会向本消费者组的协调者发送
OFFSET_FETCH
请求(具体在sendOffsetFetchRequest
方法),获取消费者在该分区已提交的偏移量 - 使用该偏移量设置分区状态
而重置偏移量在Fetcher
处,入口为resetOffsetsIfNeeded
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void resetOffsetsIfNeeded() {
// ...
// 获取需要重置偏移量的分区
Set<TopicPartition> partitions = subscriptions.partitionsNeedingReset(time.milliseconds());
if (partitions.isEmpty())
return;
final Map<TopicPartition, Long> offsetResetTimestamps = new HashMap<>();
for (final TopicPartition partition : partitions) {
// 根据重置策略,填入时间戳标记(最早、最新)
// 客户端会拉取Kafka Broker最早/最新的记录偏移量进行重置
Long timestamp = offsetResetStrategyTimestamp(partition);
if (timestamp != null)
offsetResetTimestamps.put(partition, timestamp);
}
// 异步重置偏移量
resetOffsetsAsync(offsetResetTimestamps);
}
这里关键在resetOffsetsAsync
方法,它:
-
向每个分区的主节点发送
LIST_OFFSET
请求(具体在sendListOffsetRequest
方法),获取最新/最早的记录偏移量 -
触发回调函数,调用
resetOffsetIfNeeded
方法,里面具体调用的是SubscriptionState#maybeSeekUnvalidated
方法,重置分区状态的偏移量
在启动/重分配时,会触发偏移量的拉取和重置,消费者从该偏移量开始进行消费:
- 首先向协调者发送
OFFSET_FETCH
请求,获取分区已提交的偏移量,设置为当前偏移量 - 若协调者没有存储提交的偏移量,则向分区主节点发送
LIST_OFFSETS
请求,根据策略获取主节点指定时间的(最新/最早)记录项的偏移量,重置为当前偏移量
2.2. 轮询前准备
准备工作有:
- 连接消费组的协调者节点
- 向协调者发送入组请求
- 从协调者中获得分配的分区
具体而言,消费者会调用ConsumerCoordinator#poll
以轮询协调者的事件(如保证协调者可用,入组,提交偏移量等等)。
关于入组:
- 入组在轮询中,客户端向协调者异步发送入组请求,响应通过轮询获取,入组关键的方法是
ensureActiveGroup
,保证消费者入组 - 入组操作说一个死循环,直到入组成功后才能跳出
- 入组响应中会包含协调者返回的分区分配信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
// In AbstractCoordinator
public void ensureActiveGroup() {
// 入组,不停重试,直到成功
while (!ensureActiveGroup(time.timer(Long.MAX_VALUE))) {
log.warn("still waiting to ensure active group");
}
}
boolean ensureActiveGroup(final Timer timer) {
// 保证协调者就绪
if (!ensureCoordinatorReady(timer)) {
return false;
}
// 启动心跳线程
startHeartbeatThreadIfNeeded();
// 必要时入组,返回消费者是否在组内
return joinGroupIfNeeded(timer);
}
boolean joinGroupIfNeeded(final Timer timer) {
while (rejoinNeededOrPending()) {
if (!ensureCoordinatorReady(timer)) {
return false;
}
if (needsJoinPrepare) {
// 准备加入(标记以防止重复执行)
needsJoinPrepare = false;
onJoinPrepare(generation.generationId, generation.memberId);
}
// 向协调者异步发送入组请求,返回的结果是分区分配信息
final RequestFuture<ByteBuffer> future = initiateJoinGroup();
client.poll(future, timer); // 注意,这里轮询会阻塞,等待入组结果到来(后面会说包括“加入组”+“同步组”),或者超时
if (!future.isDone()) {
// 超时,返回失败
return false;
}
if (future.succeeded()) {
// 响应成功
Generation generationSnapshot;
synchronized (AbstractCoordinator.this) {
generationSnapshot = this.generation;
}
if (generationSnapshot != Generation.NO_GENERATION) {
// 返回响应的不是NO_GENERATION
ByteBuffer memberAssignment = future.value().duplicate();
// 完成入组,响应中带有分配的分区
// 这步设置分区的分配信息
onJoinComplete(generationSnapshot.generationId, generationSnapshot.memberId, generationSnapshot.protocolName, memberAssignment);
resetJoinGroupFuture();
needsJoinPrepare = true;
} else {
// 若消息说NO_GENERATION,则响应被心跳线程接受了,入组失败,所以还需要重新入组
log.info("Generation data was cleared by heartbeat thread. Initiating rejoin.");
resetStateAndRejoin();
resetJoinGroupFuture();
return false;
}
} else {
// ...
// 出现错误,若能重复,则睡眠一段时间重试
timer.sleep(rebalanceConfig.retryBackoffMs);
}
}
return true;
}
关于协调者的查找和连接:
- 客户端会询问已知的某个节点,异步发送
FIND_COORDINATOR
请求,以得到本组的协调者,这通过ensureCoordinatorReady
方法里调用的lookupCoordinator
方法实现,具体代码不贴了 - 响应的回调里,具体在
FindCoordinatorResponseHandler#onSuccess
,会:- 从响应中抽出并设置协调者节点信息
- 调用
client#tryConnect
尝试连接
入组具体准备流程为(之后还会提到):
- 消费者订阅topic
- 消费者准备轮询
- 检查是否需要分配到分区
- 若需要,则发送
JOIN_GROUP
+SYNC_GROUP
,确保分区的分配
- 消费者开始拉取消息
2.3. 轮询流程
消费者轮询消息流程如下:
- 若初始化/重分区,执行准备工作,包括获取分区起始的拉取偏移量
- 轮询消息以拉取消息
- 异步发出拉取请求
- 轮询,当就绪时将请求异步发出去,并通过异步回调接收响应
- 获取拉取的消息,同时更新拉取偏移量,消息通过响应回调进行保存
对于轮询流程,上层核心代码在pollForFetches
,大致如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {
long pollTimeout = coordinator == null ? timer.remainingMs() :
Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());
// 若已有接收到的消息,直接将其返回
// 有这一步是因为拉取的消息是通过异步回调保存的
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
if (!records.isEmpty()) {
return records;
}
// 向服务端发起FETCH请求,此时请求并没有发出
fetcher.sendFetches();
// ...
Timer pollTimer = time.timer(pollTimeout);
// 轮询,此时请求在写就绪后发出,同时响应通过Future的回调处理,得到的消息会被保存起来
client.poll(pollTimer, () -> {
return !fetcher.hasAvailableFetches();
});
timer.update(pollTimer.currentTimeMs());
// 返回已拉取的消息(拉取的消息通过响应回调保存),并更新拉取偏移量
return fetcher.fetchedRecords();
}
消费者上层的
poll(Duration)
内部是用一个循环,不停调用pollForFetches
,条件是“是否超过时限且没拉到消息”消费者可能会一直拉取不到消息,此时消费者会一直重试,直到超时,因此内部会用循环。因此外部调用
poll(Duration)
也必须使用循环,才能持续收到拉取的消息
响应回调:函数定义在
sendFetches
的一个匿名类中,回调函数大致如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 // 发起拉取请 RequestFuture<ClientResponse> future = client.send(fetchTarget, request); // 注册响应回调,将拉取的消息保存到队列 future.addListener(new RequestFutureListener<ClientResponse>() { @Override public void onSuccess(ClientResponse resp) { synchronized (Fetcher.this) { try { // 获取响应 FetchResponse<Records> response = (FetchResponse<Records>) resp.responseBody(); // ... Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet()); // ... // 遍历收到的消息 for (Map.Entry<TopicPartition, FetchResponse.PartitionData<Records>> entry : response.responseData().entrySet()) { TopicPartition partition = entry.getKey(); FetchRequest.PartitionData requestData = data.sessionPartitions().get(partition); // ... long fetchOffset = requestData.fetchOffset; FetchResponse.PartitionData<Records> partitionData = entry.getValue(); Iterator<? extends RecordBatch> batches = partitionData.records.batches().iterator(); short responseVersion = resp.requestHeader().apiVersion(); // 保存收到的消息到队列 // completedFetches是一个队列,保存已经拉取完成的消息 completedFetches.add(new CompletedFetch(partition, partitionData, metricAggregator, batches, fetchOffset, responseVersion)); } // ... } } // ... onFail });回调函数在客户端轮询(
poll
)时,完成响应接收后调用,因此回调函数调用的线程和外部消费者线程是同一个线程(具体到内部还是多路复用,这里简略说明,实现在Selector#attempRead
和Selector#attempWrite
中)
a) 流水线优化
此外,poll(Duration)
操作还使用了“流水线”的思想:在返回拉取的消息前,再发送一次拉取请求,从而在处理拉取的消息时,获取下一轮拉取的消息。这部分关键代码如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
acquireAndEnsureOpen();
try {
// ...
do {
// ....
// 轮询,获得已拉取的消息
final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
if (!records.isEmpty()) {
// 流水线:返回本轮拉取的消息前,把下一轮拉取的请求发出去
//
// 这里可能会把请求真正地发出去,而不仅仅是将请求入队
// 因为这里调研了客户端的poll,只是它超时时间是0
// 当写事件就绪时,请求会发出去,否则直接返回
// 因此它不会阻塞客户端线程
if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
client.transmitSends();
}
return this.interceptors.onConsume(new ConsumerRecords<>(records));
}
} while (timer.notExpired());
return ConsumerRecords.empty();
} finally {
// ...
}
}
流水线的加入提高了消费者的性能,使得第二次拉取的请求,不需要等到第一次拉取的消息处理完之后(注意是“处理完”)再发送。
b) 线程模型
实际上,消费者线程只有1个线程,拉取器Fetcher
只负责将拉取请求发出,并通过异步回调到方式处理和保存拉取的消息,这个动作是立刻返回的。
之所以说是“轮询”,是因为客户端使用了多路复用,必须通过轮询(如select
, poll
, epoll
等)才能真正发出请求和接收响应。
与之非常类似的线程模型有Redis服务端aeEventLoop
。
c) 其他
首先,即时有流水线,消费者必须得到第一次拉取的响应后,才能发送第二个拉取请求,这在上面的poll
代码里可以得到,只有当获取的记录非空时,才能发送第二个拉取请求(且第二个请求轮询等待时间为0)。
其次,拉取记录大小有阈值,有下面:
- 消息大小阈值(
message.max.bytes
):服务器允许接收1条消息到最大字节数 - 分区拉取大小阈值(
max.partition.fetch.bytes
): 拉取消息时,每个分区的最大字节数 - 消息拉取大小阈值(
fetch.message.max.bytes
):消费者能够读取的最大消息大小
这里后两个必须比第一个大,否则某些分区永远无法消费(当消息大小介于两者之间时)。这些大小需要控制,过大或过小都会影响吞吐。
然后,轮询消息个数也有阈值(max.poll.records
),控制一次轮询最多获取多少条记录。
由于有这个限制,可能会有多余的消息被拉取下来:
多余的消息会在下面的轮询中返回给上层
下一次拉取时,不会对多余消息所在的分区拉取消息
1 2 3 4 5 6 7 8 9 10 11 12 // 获取可拉取消息的分区 private List<TopicPartition> fetchablePartitions() { Set<TopicPartition> exclude = new HashSet<>(); // 当多余拉取到的消息没被消费时,下一轮拉取时,对应分区不会被拉取 if (nextInLineFetch != null && !nextInLineFetch.isConsumed) { exclude.add(nextInLineFetch.partition); } for (CompletedFetch completedFetch : completedFetches) { exclude.add(completedFetch.partition); } return subscriptions.fetchablePartitions(tp -> !exclude.contains(tp)); // 过滤没消费完过多消息的分区 }
3. 心跳任务
KafkaConsumer
会执行心跳任务,这里的心跳是指“消费者”和“对应消费者组的协调者”的心跳。通过心跳,可以表明自己是存活的;若心跳丢失,协调者就认为消费者挂掉,从而进行失败处理,例如触发分区重平衡。
心跳任务定义在HeartbeatThread
中,这个任务会在一个单独的线程中执行。
3.1. 心跳状态
心跳状态记录了心跳任务的元数据,定义在HeartBeat
中:
1
2
3
4
5
6
7
8
9
10
11
12
public final class Heartbeat {
private final int maxPollIntervalMs; // 最大拉取信息时间间隔
private final GroupRebalanceConfig rebalanceConfig; // 重平衡配置
private final Time time; // 系统当前时间(实际上是个工具类)
private final Timer heartbeatTimer; // 心跳计时器
private final Timer sessionTimer; // 会话计时器
private final Timer pollTimer; // 拉取计时器
private volatile long lastHeartbeatSend = 0L; // 最近一次心跳请求发出的时间戳
private volatile boolean heartbeatInFlight = false; // 心跳是否正在传输
// ...
}
通过上面几个Timer
,可以很容易地得到现在是否需要心跳,以及下一次心跳还需要的时间。当心跳成功后,这些Timer
也会被更新:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private void update(long now) {
// 更新3个计时器,推进计时器的时间
heartbeatTimer.update(now);
sessionTimer.update(now);
pollTimer.update(now);
}
void receiveHeartbeat() {
update(time.milliseconds()); // 收到心跳则更新时间戳
heartbeatInFlight = false; // 取消正在传输的标记
sessionTimer.reset(rebalanceConfig.sessionTimeoutMs); // 重置会话计时器
}
boolean shouldHeartbeat(long now) {
update(now); // 更新时间
return heartbeatTimer.isExpired(); // 过期即需要心跳
}
long timeToNextHeartbeat(long now) {
update(now); // 更新时间
return heartbeatTimer.remainingMs(); // 剩余的时间即下次心跳还需等待的时间
}
// 启动一次心跳
void sentHeartbeat(long now) {
lastHeartbeatSend = now; // 设置这次心跳时间戳
heartbeatInFlight = true; // 标记心跳正在传输
update(now); // 更新计时器
heartbeatTimer.reset(rebalanceConfig.heartbeatIntervalMs);
}
3.2. 发送心跳
心跳发送在HeartbeatThread#run
中,具体的代码段如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
heartbeat.sentHeartbeat(now);
// 这里发送心跳请求(HEARTBEAT)
final RequestFuture<Void> heartbeatFuture = sendHeartbeatRequest();
// 注册回调
heartbeatFuture.addListener(new RequestFutureListener<Void>() {
@Override
public void onSuccess(Void value) {
// 心跳成功,则更新心跳状态
synchronized (AbstractCoordinator.this) {
heartbeat.receiveHeartbeat();
}
}
@Override
public void onFailure(RuntimeException e) {
// 心跳失败,则分类
// 1. 若正在重平衡,则认为心跳成功
// 2. FenceException: 停止心跳
// 3. 其他:标记心跳失败,唤醒协调者规划下一次心跳
synchronized (AbstractCoordinator.this) {
if (e instanceof RebalanceInProgressException) {
heartbeat.receiveHeartbeat();
} else if (e instanceof FencedInstanceIdException) {
log.error("Caught fenced group.instance.id {} error in heartbeat thread", rebalanceConfig.groupInstanceId);
heartbeatThread.failed.set(e);
heartbeatThread.disable();
} else {
heartbeat.failHeartbeat();
AbstractCoordinator.this.notify();
}
}
}
});
而心跳返回时有响应的,即HeartbeatResponse
。响应会先被HeartbeatResponseHandler
处理,将HeartbeatResponse
转换成Void
,然后传入上面的回调回调函数中。具体代码不贴了。
1
2
3
4
5
6
7
8
9
10
11
synchronized RequestFuture<Void> sendHeartbeatRequest() {
HeartbeatRequest.Builder requestBuilder =
new HeartbeatRequest.Builder(new HeartbeatRequestData()
.setGroupId(rebalanceConfig.groupId)
.setMemberId(this.generation.memberId)
.setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
.setGenerationId(this.generation.generationId));
return client.send(coordinator, requestBuilder)
.compose(new HeartbeatResponseHandler(generation)); // 先交给HeartbeatResponseHandler处理
// 处理时将结果传给Future<Void>,触发上面代码块的回调
}
3.3. 运行心跳任务
客户端加入消费组后,就需要启动心跳任务,具体如下代码所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
boolean ensureActiveGroup(final Timer timer) {
if (!ensureCoordinatorReady(timer)) {
return false;
}
startHeartbeatThreadIfNeeded(); // 入组前,创建心跳线程,但没被启动
return joinGroupIfNeeded(timer); // 入组后,心跳线程会被开启
}
private synchronized void startHeartbeatThreadIfNeeded() {
if (heartbeatThread == null) {
// 启动心跳线程
heartbeatThread = new HeartbeatThread();
heartbeatThread.start(); // 运行中,但没开启,会处于等待状态
// 入组后,线程会被开启进行心跳
}
}
而整个任务是通过一个死循环执行的,并通过调用AbstractCoordinator.this#notify/wait
来实现定时的心跳:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@Override
public void run() {
try {
while (true) {
synchronized (AbstractCoordinator.this) {
if (closed)
return;
if (!enabled) {
// 开启了才能执行,否则等待
// 入组前,没有开启
// 等待入组后,会调用enable开启,enable中会调用AbstractCoordinator.this.notify
// 入组后的回调,可参考initiateJoinGroup方法
AbstractCoordinator.this.wait();
continue;
}
// ...
// 轮询,以拉取和发送心跳响应和请求
client.pollNoWakeup();
long now = time.milliseconds();
if (coordinatorUnknown()) {
if (findCoordinatorFuture != null || lookupCoordinator().failed())
// 协调者找不到,等待一段时间再重试
AbstractCoordinator.this.wait(rebalanceConfig.retryBackoffMs);
} else if (heartbeat.sessionTimeoutExpired(now)) {
// 会话丢失,标记协调者未知
markCoordinatorUnknown();
} else if (heartbeat.pollTimeoutExpired(now)) {
// 拉取心跳超时,则可能离开消费组
String leaveReason = "...";
maybeLeaveGroup(leaveReason);
} else if (!heartbeat.shouldHeartbeat(now)) {
// 若当前不需要心跳,则等待一段时间
AbstractCoordinator.this.wait(rebalanceConfig.retryBackoffMs);
} else {
// 发送心跳
heartbeat.sentHeartbeat(now);
final RequestFuture<Void> heartbeatFuture = sendHeartbeatRequest();
heartbeatFuture.addListener(new RequestFutureListener<Void>() {
// ... 见上面分析
});
}
}
}
}
// ...
}
3.4. 心跳与协调者的关系
心跳是为了确保:
- 协调者已知,消费者必须连接上协调者
- 消费者处于活动状态,消费者必须分配到分区
从上面可知,心跳是在入组后进行的。心跳可能会出现错误,这时候需要处理,分下面几类:
- 协调者挂掉:标记原协调者死亡,需要重新连接新协调者
- 协调者没挂:标记“重新入组”,此时需要向协调者发送“入组”请求
处理的代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private class HeartbeatCompletionHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> {
@Override
public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
sensors.heartbeatLatency.record(response.requestLatencyMs());
Errors error = Errors.forCode(heartbeatResponse.errorCode());
if (error == Errors.NONE) {
// 没错误,直接完成即可
future.complete(null);
} else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
|| error == Errors.NOT_COORDINATOR_FOR_GROUP) {
// 协调者挂了,标记,需要重连新协调者
coordinatorDead();
future.raise(error);
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
// 正在重分配,节点没挂,则需要重新入组
AbstractCoordinator.this.rejoinNeeded = true;
future.raise(Errors.REBALANCE_IN_PROGRESS);
} else if (error == Errors.ILLEGAL_GENERATION) {
// generation不合法,节点没挂,则需要重新入组
AbstractCoordinator.this.rejoinNeeded = true;
future.raise(Errors.ILLEGAL_GENERATION);
} else if (error == Errors.UNKNOWN_MEMBER_ID) {
// member id不合法,节点没挂,则需要重新入组
memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
AbstractCoordinator.this.rejoinNeeded = true;
future.raise(Errors.UNKNOWN_MEMBER_ID);
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(new GroupAuthorizationException(groupId)); // 验证错误
} else {
// 其他错误
future.raise(new KafkaException("Unexpected error in heartbeat response: " + error.message()));
}
}
}
4. 偏移量提交
KafkaConsumer
有下面几种偏移量提交方式:
commitAsync
:异步提交偏移量到协调者commitSync
:同步提交偏移量到协调者- 自动提交:提交是异步的
4.1. 自动提交任务
当enable.auto.commit=true
时,启用自动提交。自动提交可以同步也可以异步,这里只看异步提交。
通过调用链,可知消费者在对协调者进行轮训时时,会触发自动提交:
1
2
3
4
5
6
7
8
// In ConsumerCoordinator
public boolean poll(Timer timer) {
// ...
invokeCompletedOffsetCommitCallbacks(); // 下文会说,它会读取completedOffsetCommits队列(里面保存提交偏移量完成的响应),然后执行响应里指定的回调函数
// ...
maybeAutoCommitOffsetsAsync(timer.currentTimeMs()); // 轮询最后可能会触发自动提交
return true;
}
自动提交触发的时间间隔由auto.commit.interval.ms
参数提供,默认5s。该值会被传入一个计时器nextAutoCommitTimer
(位于ConsumerCoordinator
中),当计时器到期时,就会触发提交任务,并重置计时器:
1
2
3
4
5
6
7
8
9
10
11
// In ConsumerCoordinator
public void maybeAutoCommitOffsetsAsync(long now) {
if (autoCommitEnabled) { // 启动自动提交时
nextAutoCommitTimer.update(now); // 更新计时器
if (nextAutoCommitTimer.isExpired()) {
// 若计时器到期,说明需要提交,则重置计时器,并执行异步提交任务
nextAutoCommitTimer.reset(autoCommitIntervalMs);
doAutoCommitOffsetsAsync(); // 异步提交
}
}
}
自动提交任务代码如下,它首先获取每个分区已消费的最新分区偏移量,然后以该偏移量异步提交(异步提交偏移量就放到下一节):
1
2
3
4
5
6
7
8
private void doAutoCommitOffsetsAsync() {
// 获取已消费的分区的最新偏移量
Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed();
// 执行异步提交,具体在下一节说明
commitOffsetsAsync(allConsumedOffsets, (offsets, exception) -> {
// 执行完成后到回调,这里主要是错误处理,略
});
}
4.2. 异步提交偏移量
异步提交偏移量外部接口时commitAsync
,内部最终会调用coordinator#commitOffsetAsync
方法:
该方法自动提交偏移量时也会调用,只不过参数中的偏移量由客户端自己获取
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
invokeCompletedOffsetCommitCallbacks();
if (!coordinatorUnknown()) {
// 若协调者已知,则执行异步提交
doCommitOffsetsAsync(offsets, callback);
} else {
// 否则先查找协调者,找到协调者后,回调立刻异步提交并进行轮询(将请求发出)
pendingAsyncCommits.incrementAndGet();
lookupCoordinator().addListener(new RequestFutureListener<Void>() {
@Override
public void onSuccess(Void value) {
// 找到后,立刻执行异步提交
pendingAsyncCommits.decrementAndGet();
doCommitOffsetsAsync(offsets, callback);
// 轮询,把请求发出去
client.pollNoWakeup();
}
@Override
public void onFailure(RuntimeException e) {
// 查找协调者失败,将提交错误结果追加到completedOffsetCommits队列,在轮询时统一执行回调(参数的callback)
pendingAsyncCommits.decrementAndGet();
completedOffsetCommits.add(new OffsetCommitCompletion(callback, offsets,
new RetriableCommitFailedException(e)));
}
});
}
// 轮询,将请求发出,这里的轮询不会造成进行阻塞
client.pollNoWakeup();
}
上面方法的核心在doCommitOffsetCommit
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 执行异步提交偏移量
private void doCommitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
// 发送OFFSET_COMMIT请求(需要poll才能将请求发出去)
RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
// 回调函数,若为空则为默认回调,默认什么都不处理
final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback;
// 注册响应回调
future.addListener(new RequestFutureListener<Void>() {
@Override
public void onSuccess(Void value) {
// 提交成功,将成功结果添加到completedOffsetCommits队列,在轮询时统一执行回调(参数的callback)
if (interceptors != null)
interceptors.onCommit(offsets);
completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, null));
}
@Override
public void onFailure(RuntimeException e) {
// 提交失败,将失败结果连同异常添加到completedOffsetCommits队列,在轮询时统一执行回调(参数的callback)
Exception commitException = e;
if (e instanceof RetriableException) {
commitException = new RetriableCommitFailedException(e);
}
completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, commitException));
if (commitException instanceof FencedInstanceIdException) {
asyncCommitFenced.set(true);
}
}
});
而处理提交偏移量响应的地方位于OffsetCommitResponseHandler
,处理的代码如下:
和其他请求发送类似,该回调会通过
send(req).compose(handler)
添加进去,当响应就绪时,会执行handler
的回调;handler
里面有参数future
,当回调触发后,最终会让future
就绪,从而触发future
里的回调。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Override
public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) {
sensors.commitSensor.record(response.requestLatencyMs());
Set<String> unauthorizedTopics = new HashSet<>();
// 遍历每个分区的提交响应
for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : commitResponse.data().topics()) {
for (OffsetCommitResponseData.OffsetCommitResponsePartition partition : topic.partitions()) {
TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp);
long offset = offsetAndMetadata.offset();
// 根据返回码生成异常
Errors error = Errors.forCode(partition.errorCode());
if (error == Errors.NONE) {
// 分区提交成功
log.debug("Committed offset {} for partition {}", offset, tp);
} else {
// 分区提交失败
// 根据不同的错误处理,最后都会调用future.raise(error)
// ...
}
}
}
if (!unauthorizedTopics.isEmpty()) {
log.error("Not authorized to commit to topics {}", unauthorizedTopics);
future.raise(new TopicAuthorizationException(unauthorizedTopics));
} else {
// 全部成功,让future complete
future.complete(null);
}
}
4.3. 同步提交偏移量
异步提交偏移量外部接口时commitSync
,内部最终会调用coordinator#commitOffsetsSync
方法。
这里相对比较简单,和异步提交类似,不过这里使用了带阻塞的poll
,调用完成后返回的Future
通常就会有结果,若没结果,说明必定超时,方法会返回false
。
同步提交,底层调用的是
select
,它会阻塞;异步提交,底层调用的是
selectNow
,它不会阻塞,立即返回。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, Timer timer) {
invokeCompletedOffsetCommitCallbacks();
if (offsets.isEmpty())
return true;
do {
// 若协调者没准备好
if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {
return false;
}
RequestFuture<Void> future = sendOffsetCommitRequest(offsets); // 发送OFFSET_COMMIT请求
client.poll(future, timer); // 这里产生阻塞,真正地将请求发出,并接收响应,完成后才会继续下去
// 这一步返回后,Future通常会有结果
invokeCompletedOffsetCommitCallbacks(); // 首先调用这一步,执行之前提交偏移量的响应回调
// 若在这段时间内,响应成功,则返回true
if (future.succeeded()) {
if (interceptors != null)
interceptors.onCommit(offsets);
return true;
}
// 若在这段时间内,响应失败且不可重试,则抛出异常
if (future.failed() && !future.isRetriable())
throw future.exception();
// 若这段时间内,响应失败但可重试,或没响应(不过一般都跳出循环直接超时了),则尝试重试
timer.sleep(rebalanceConfig.retryBackoffMs);
} while (timer.notExpired());
// 超时,则返回false
return false;
}
同步提交偏移量在内部的调用地方在2个地方:
- 准备入组时:消费者重新入组前,需要先提交一次偏移量,这里的偏移量是最新的拉取偏移量,这里必须同步(见
onJoinPrepare
) - 关闭消费者:关闭消费者前,也需要先提交一次偏移量,这里的偏移量也是最新的拉取偏移量,同样这里也必须同步(见
close
)
4.4. 消费者消息处理语义
语义通常有:至多一次、至少一次、有且仅有一次。
a) 至多一次
至多一次允许数据的丢失。
因此,消费者需要先提交偏移量,然后处理消息,因此:
- 打开自动提交
- 设置较短的自动提交时间间隔
b) 至少一次
至少一次不允许数据丢失,但可以重复。
因此,消费者需要先处理消息,再提交偏移量,因此:
- 关闭自动提交
- 处理完消息消息后,调用
commitSync
同步提交
c) 有且仅有一次
方案有2种:
- 引入两阶段提交协议
- 同时更新消息偏移量和处理结果(即提交放入一个事务里),例如将其写入数据库
以第2种方案为例:
- 关闭自动提交
- 实现一个偏移量管理器
OffsetManager
,将分区偏移量和处理结果的更新以事务的方式提交(例如提交给数据库) - 实现并注册一个分区重平衡监听器
ConsumerRebalanceListener
,触发重平衡时,从OffsetManager
中恢复出已提交的分区偏移量(通过调用KafkaConsumer#seek
实现)
而对于生产者,也有2种方案:
- 读取分区最后一条消息,决定是否重传
- 给消息附上唯一ID,下游进行去重
- 使用“事务”