1. 概览
之前提及过,消费者拉取消息,需要和协调者进行交互,包括:
-
发现并连接协调者
-
申请入组,从协调者中获取分配到的分区
-
和协调者心跳,协调者判断是否需要触发分区重平衡
-
向协调者提交消费的分区偏移量
本文主要分析:
- 协调者如何处理消费者入组
- 心跳监控和对应处理
2. 消费者入组
消费者入组的入口在ensureActiveGroup
方法,它保证消费者连接到协调者并分配到分区。
2.1. 发现协调者
消费者入组,必须先发现协调者,入口方法为ensureCoordinatorReady
,最后调用方法lookupCoordinator
。
消费者会:
- 发送
FIND_COORDINATOR
请求给某个服务端节点(选择的节点,正在传输的请求个数最少) - 收到请求后,设置协调者字段,并立刻尝试和协调者连接
这部分代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
protected synchronized RequestFuture<Void> lookupCoordinator() {
if (findCoordinatorFuture == null) {
// 选择一个Broker节点
Node node = this.client.leastLoadedNode();
if (node == null) {
return RequestFuture.noBrokersAvailable();
} else {
// 发起FIND_COORDINATOR请求
findCoordinatorFuture = sendFindCoordinatorRequest(node);
findCoordinatorFuture.addListener(new RequestFutureListener<Void>() {
@Override
public void onSuccess(Void value) {}
@Override
public void onFailure(RuntimeException e) {
findCoordinatorException = e;
}
});
}
}
return findCoordinatorFuture;
}
private RequestFuture<Void> sendFindCoordinatorRequest(Node node) {
// 向某个节点发送FIND_COORDINATOR请求
FindCoordinatorRequest.Builder requestBuilder =
new FindCoordinatorRequest.Builder(
new FindCoordinatorRequestData()
.setKeyType(CoordinatorType.GROUP.id())
.setKey(this.rebalanceConfig.groupId));
return client.send(node, requestBuilder)
.compose(new FindCoordinatorResponseHandler());
}
private class FindCoordinatorResponseHandler extends RequestFutureAdapter<ClientResponse, Void> {
@Override
public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
// 处理FIND_COORDINATOR响应
clearFindCoordinatorFuture();
FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) resp.responseBody();
Errors error = findCoordinatorResponse.error();
if (error == Errors.NONE) {
// 响应成功
synchronized (AbstractCoordinator.this) {
// 设置协调者字段
int coordinatorConnectionId = Integer.MAX_VALUE - findCoordinatorResponse.data().nodeId();
AbstractCoordinator.this.coordinator = new Node(
coordinatorConnectionId,
findCoordinatorResponse.data().host(),
findCoordinatorResponse.data().port());
// 立刻尝试连接协调者
client.tryConnect(coordinator);
// 重置心跳
heartbeat.resetSessionTimeout();
}
future.complete(null);
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));
} else {
future.raise(error);
}
}
@Override
public void onFailure(RuntimeException e, RequestFuture<Void> future) {
clearFindCoordinatorFuture();
super.onFailure(e, future);
}
}
2.2. 加入组和同步组
a) 入组大致步骤
入组大体步骤如下:
- 消费者发送“加入组”请求,捎带订阅信息给协调者
- 协调者收集所有消费者的请求
- 协调者选举从消费者中选举出“主消费者”,将订阅信息返回给它,其他消息也会收到响应,但并不是分区分配结果
- “主消费者”是第一个发送“加入组”请求的消费者
- 消费者收到响应后,会发送“同步组”请求给协调者
- “主消费者”先执行分区分配,将结果塞入“同步组”请求,返回给协调者
- 其他消费者会立刻发送“同步组”请求
- 协调者接收“主消费者”的分区分配结果,然后再处理“同步组”请求,将分配结果返回给消费者
- 消费者接收到分配结果后,设置分区分配状态,入组成功
b) 加入组
消费者发现协调者后,需要发起加入组请求,入口方法为joinGroupIfNeeded
,最后调用initiateJoinGroup
,里面关键的方法是sendJoinGroupRequest
。
实际上,
sendJoinGroupRequest
返回是“加入组”+“同步组”后的结果,内容是分区分配信息。它实际上使用了
chain
,保证了先“加入组”,后“同步组”。
消费者会:
-
向协调者发送
JOIN_GROUP
请求,请求包含了消费者、消费组以及订阅的信息 -
处理响应,根据消费者是否是Leader分情况处理(见a)小节)
这部分在前文中部分说明过,这里我们着重看之前没说明过的第2步。
“加入组”响应处理在JoinGroupResponseHandler
中,根据响应判断本消费者是否是Leader,然后分情况处理(见a)小节)。
代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
@Override
public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
// 处理“加入组”(JOIN_GROUP)响应
Errors error = joinResponse.error();
if (error == Errors.NONE) {
if (isProtocolTypeInconsistent(joinResponse.data().protocolType())) {
// 协议错误,触发异常
future.raise(Errors.INCONSISTENT_GROUP_PROTOCOL);
} else {
// ...
synchronized (AbstractCoordinator.this) {
if (state != MemberState.REBALANCING) {
// 实质上消费者已离开,触发异常
future.raise(new UnjoinedGroupException());
} else {
AbstractCoordinator.this.generation = new Generation(
joinResponse.data().generationId(),
joinResponse.data().memberId(), joinResponse.data().protocolName());
if (joinResponse.isLeader()) {
// 响应标记为Leader,说明本消费者是Leader
// 先执行分区分配,然后将结果塞入“同步组”请求发送给协调者
onJoinLeader(joinResponse).chain(future); // 这里用chain,将结果处理转到调用链下游
} else {
// 本消费者是Follower
// 直接发送“同步组”请求
onJoinFollower().chain(future);
}
}
}
}
} else if // ... 其他错误处理
}
}
c) 同步组
易得知,“同步组”发生在接受到“加入组”响应之后。
对于Follower,直接发送”同步组”(SYNC_GROUP
)请求:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private RequestFuture<ByteBuffer> onJoinFollower() {
// 直接发送SYNC_GROUP请求
SyncGroupRequest.Builder requestBuilder =
new SyncGroupRequest.Builder(
new SyncGroupRequestData()
.setGroupId(rebalanceConfig.groupId)
.setMemberId(generation.memberId)
.setProtocolType(protocolType())
.setProtocolName(generation.protocolName)
.setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
.setGenerationId(generation.generationId)
.setAssignments(Collections.emptyList()) // Follower请求总不捎带分区分配结果
);
return sendSyncGroupRequest(requestBuilder);
}
而对于Leader,先执行分区分配算法,计算出分配结果,然后再发送“同步组”请求,请求捎带分区分配结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
try {
// 1. 执行分区分配算法
Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.data().leader(), joinResponse.data().protocolName(),
joinResponse.data().members());
List<SyncGroupRequestData.SyncGroupRequestAssignment> groupAssignmentList = new ArrayList<>();
for (Map.Entry<String, ByteBuffer> assignment : groupAssignment.entrySet()) {
groupAssignmentList.add(new SyncGroupRequestData.SyncGroupRequestAssignment()
.setMemberId(assignment.getKey())
.setAssignment(Utils.toArray(assignment.getValue()))
);
}
// 2. 发送SYNC_GROUP请求,捎带分区分配结果
SyncGroupRequest.Builder requestBuilder =
new SyncGroupRequest.Builder(
new SyncGroupRequestData()
.setGroupId(rebalanceConfig.groupId)
.setMemberId(generation.memberId)
.setProtocolType(protocolType())
.setProtocolName(generation.protocolName)
.setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
.setGenerationId(generation.generationId)
.setAssignments(groupAssignmentList) // 捎带分区分配结果
);
return sendSyncGroupRequest(requestBuilder);
} catch (RuntimeException e) {
return RequestFuture.failure(e);
}
}
“同步组”请求的响应,包含了分区分配的信息。这部分主要由SyncGroupResponseHandler
处理,它将分区分配的数据传给了对应的Future
,外部从而可以获取分区分配的信息:
见
ensureActiveGroup
方法,以及前文2.2.节,外部是通过同步阻塞的方式得到这个Future
结果,即入组(“加入组”+“同步组”)结果,执行下面的步骤时,入组一定已经完成了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private class SyncGroupResponseHandler extends CoordinatorResponseHandler<SyncGroupResponse, ByteBuffer> {
@Override
public void handle(SyncGroupResponse syncResponse,
RequestFuture<ByteBuffer> future) {
// 处理“同步组”(SYNC_GROUP)请求
Errors error = syncResponse.error();
if (error == Errors.NONE) {
if (isProtocolTypeInconsistent(syncResponse.data.protocolType())) {
future.raise(Errors.INCONSISTENT_GROUP_PROTOCOL);
} else if (isProtocolNameInconsistent(syncResponse.data.protocolName())) {
future.raise(Errors.INCONSISTENT_GROUP_PROTOCOL);
} else {
// ...
// 读取到响应的分区分配信息,传给Future,外部可以调用
future.complete(ByteBuffer.wrap(syncResponse.data.assignment()));
}
} else {
// 错误,触发rejoin
requestRejoin();
// 传递错误到Future
}
}
}
d) 入组总结
入组流程总结为:
- 多个消费者向协调者发送
JOIN_GROUP
- 协调者选出第一个消费者,作为Leader
- 收集到所有消费者的请求后,返回
JOIN_GROUP
响应- 对于Leader:包含组成员消息
- 消费者返回
SYNC_GROUP
请求- 对于Leader:捎带分区分配结果,Leader执行分区分配运算
- 协调者收到Leader的
SYNC_GROUP
请求,返回分区分配信息(SYNC_GROUP
响应)给所有消费者
2.3. 分区分配
分区分配由主消费者执行,方法位于performAssignment
,而具体的分区算法接口是ConsumerPartitionAssignor
,它有下面几个实现,默认是RangeAssignor
:
这些算法具体就不说明了,可以参考: https://blog.csdn.net/bingshiwuyu/article/details/106763422?utm_medium=distribute.pc_relevant.none-task-blog-baidujs-2
2.4. 其他
a) 入组前的准备
入组前,除了需要询问知道协调者之外,还需要一些准备,包括:
- 同步提交偏移量,以保存消费进度
- 收回分配的分区,并触发分区收回的回调
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
protected void onJoinPrepare(int generation, String memberId) {
// 同步提交分区,保存消费进度,若开启了自动提交,则自动提交会先关闭
maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs));
Exception exception = null;
final Set<TopicPartition> revokedPartitions;
// 收回分区并触发回调
if (generation == Generation.NO_GENERATION.generationId &&
memberId.equals(Generation.NO_GENERATION.memberId)) {
revokedPartitions = new HashSet<>(subscriptions.assignedPartitions());
if (!revokedPartitions.isEmpty()) {
exception = invokePartitionsLost(revokedPartitions); // 触发回调
subscriptions.assignFromSubscribed(Collections.emptySet()); // 收回分区
}
} else {
switch (protocol) {
case EAGER: // EAGER: 先收回所有分区
revokedPartitions = new HashSet<>(subscriptions.assignedPartitions());
exception = invokePartitionsRevoked(revokedPartitions); // 触发回调
subscriptions.assignFromSubscribed(Collections.emptySet()); // 收回分区
break;
case COOPERATIVE: // COOPERATIVE: 先收回不再可能分配到的分区
Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
revokedPartitions = ownedPartitions.stream()
.filter(tp -> !subscriptions.subscription().contains(tp.topic()))
.collect(Collectors.toSet());
if (!revokedPartitions.isEmpty()) {
exception = invokePartitionsRevoked(revokedPartitions); //触发回调
ownedPartitions.removeAll(revokedPartitions);
subscriptions.assignFromSubscribed(ownedPartitions); // 收回分区
}
break;
}
}
isLeader = false; // 重置Leader状态
subscriptions.resetGroupSubscription(); // 重置消费者订阅状态
if (exception != null) {
throw new KafkaException("User rebalance callback throws an error", exception);
}
}
b) 入组后处理
入组后,消费者获得到分配给它的分区,会做下面一些事情:
- 反序列化分区分配结果
- 更新订阅信息和分区分配信息
- 更新Assignor内部状态
- 打开自动提交(若配置开启)
- 为分配的分区触发回调
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
protected void onJoinComplete(int generation,
String memberId,
String assignmentStrategy,
ByteBuffer assignmentBuffer) {
// ...
ConsumerPartitionAssignor assignor = lookupAssignor(assignmentStrategy);
// ...
// 反序列化分区分配结果
Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);
Set<TopicPartition> assignedPartitions = new HashSet<>(assignment.partitions());
if (!subscriptions.checkAssignmentMatchedSubscription(assignedPartitions)) {
// 若分配结果和本地订阅不匹配,则需要重新入组
requestRejoin();
return;
}
final AtomicReference<Exception> firstException = new AtomicReference<>(null);
Set<TopicPartition> addedPartitions = new HashSet<>(assignedPartitions);
addedPartitions.removeAll(ownedPartitions);
if (protocol == RebalanceProtocol.COOPERATIVE) {
// 若协议是COOPERATIVE,还需要收回之前没收回的分区,触发收回的回调,并重新入组
Set<TopicPartition> revokedPartitions = new HashSet<>(ownedPartitions);
revokedPartitions.removeAll(assignedPartitions);
if (!revokedPartitions.isEmpty()) {
firstException.compareAndSet(null, invokePartitionsRevoked(revokedPartitions));
requestRejoin();
}
}
// 更新订阅信息
maybeUpdateJoinedSubscription(assignedPartitions);
try {
// 更新Assignor内部状态,默认下是空实现
assignor.onAssignment(assignment, groupMetadata);
} catch (Exception e) {
firstException.compareAndSet(null, e);
}
// 重新打开自动提交
if (autoCommitEnabled)
this.nextAutoCommitTimer.updateAndReset(autoCommitIntervalMs);
// 更新分配信息
subscriptions.assignFromSubscribed(assignedPartitions);
// 调用分配分区的回调函数
firstException.compareAndSet(null, invokePartitionsAssigned(addedPartitions));
if (firstException.get() != null)
throw new KafkaException("User rebalance callback throws an error", firstException.get());
}
c) 分区收回/分配的回调
从上面可以看到,分区被收回/分配时,都会触发回调,这个回调定义在接口ConsumerRebalanceListener
中:
1
2
3
4
5
6
public interface ConsumerRebalanceListener {
// 收回分区后触发
void onPartitionsRevoked(Collection<TopicPartition> partitions);
// 分配分区后触发
void onPartitionsAssigned(Collection<TopicPartition> partitions);
}
上面b)的invokePartitionsRevoked
和invokePartitionsAssigned
都调用了ConsumerRebalanceListener
的方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private Exception invokePartitionsAssigned(final Set<TopicPartition> assignedPartitions) {
// ...
ConsumerRebalanceListener listener = subscriptions.rebalanceListener();
try {
// ...
listener.onPartitionsAssigned(assignedPartitions); // 分配分区的回调
// ...
} catch (WakeupException | InterruptException e) {
// ...
} catch (Exception e) {
// ...
}
return null;
}
private Exception invokePartitionsRevoked(final Set<TopicPartition> revokedPartitions) {
// ...
ConsumerRebalanceListener listener = subscriptions.rebalanceListener();
try {
// ...
listener.onPartitionsRevoked(revokedPartitions); // 收回分区的回调
// ...
} catch (WakeupException | InterruptException e) {
// ...
} catch (Exception e) {
// ...
}
return null;
}
3. 协调者处理请求
第2节说明了消费者端的入组,本节说明服务端协调者的处理。
3.1. REVIEW: 服务端返回响应
由于“加入组”需要协调者等待所有的消费者的请求,所以在客户端看来是“阻塞”的。但是服务端为了提高性能,不能采用阻塞的方式,因此采用的做法是:
- 服务端轮询到请求后,会交给后台线程
KafkaRequestHandler
处理(见本文2.3.节) - 处理每个请求时,定义“发送响应的回调”(
sendResponseCallback
) - 将请求连同回调传给具体负责请求的协调者(如
GroupCoordinator
),让协调者处理请求- 协调者有很多:如消费组
GroupCoordinator
,副本管理ReplicaManager
,控制器相关KafkaController
等
- 协调者有很多:如消费组
- 当协调者认为请求处理完成后,调用回调方法,
- 回调会将响应压入响应队列,处理器轮询会将响应返回给客户端(依旧见本文2.3.节)
3.2. 消费者及消费者组的元数据
a) 消费者元数据
服务端使用MemberMetadata
存储每个消费者的元数据:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private[group] class MemberMetadata(var memberId: String, // 成员ID
val groupId: String, // 消费组ID
val groupInstanceId: Option[String],
val clientId: String,
val clientHost: String,
val rebalanceTimeoutMs: Int,
val sessionTimeoutMs: Int,
val protocolType: String,
var supportedProtocols: List[(String, Array[Byte])]) {
var assignment: Array[Byte] = Array.empty[Byte] // 消费者分区结果
var awaitingJoinCallback: JoinGroupResult => Unit = null // JOIN_GROUP回调
var awaitingSyncCallback: SyncGroupResult => Unit = null // SYNC_GROUP回调
var isLeaving: Boolean = false
var isNew: Boolean = false
val isStaticMember: Boolean = groupInstanceId.isDefined
var heartbeatSatisfied: Boolean = false
}
b) 成员组元数据管理
这部分在GroupMetadata
保存,维护了某个成员组的全部成员以及组元数据,关键字段在注释上标注了:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private[group] class GroupMetadata(val groupId: String, // 组ID
initialState: GroupState,
time: Time) extends Logging {
type JoinCallback = JoinGroupResult => Unit
private[group] val lock = new ReentrantLock
private var state: GroupState = initialState // 组状态: Empty(初始)/Dead/Stable/PreparingRebalance/CompletingRebalance
var currentStateTimestamp: Option[Long] = Some(time.milliseconds())
var protocolType: Option[String] = None // 协议类型
var protocolName: Option[String] = None // 协议名称
var generationId = 0 // Generation号
private var leaderId: Option[String] = None // 组Leader ID
private val members = new mutable.HashMap[String, MemberMetadata] // 组成员
private val staticMembers = new mutable.HashMap[String, String]
private val pendingMembers = new mutable.HashSet[String]
private var numMembersAwaitingJoin = 0
private val supportedProtocols = new mutable.HashMap[String, Integer]().withDefaultValue(0)
private val offsets = new mutable.HashMap[TopicPartition, CommitRecordMetadataAndOffset] // 分区提交进度
private val pendingOffsetCommits = new mutable.HashMap[TopicPartition, OffsetAndMetadata]
private val pendingTransactionalOffsetCommits = new mutable.HashMap[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]()
private var receivedTransactionalOffsetCommits = false
private var receivedConsumerOffsetCommits = false
private var subscribedTopics: Option[Set[String]] = None // 已订阅的topic集合
var newMemberAdded: Boolean = false
}
3.3. 请求前检查
对于“加入组”和“同步组”请求,需要做下面检查:
- 协调者不可用
- 消费者组编号无效
- 连错协调者
- 协调者正在加载(如正在迁移数据)
- 消费者会话超时
- 协调者没有消费组,但消费者的成员编号已知
- 协调者有消费组,消费者的成员编号已知,但不在消费组中
这些都会返回特定的错误码给消费者,消费者会根据错误码进行操作(如重新选择协调者并重试入组)。
这些检查通过后:
- “加入组”请求最终会进入
doJoinGroup
/doUnknownJoinGroup
,进行进一步检查,只有在下面条件(之一)满足时才允许加入- 消费组为空,成员编号未知(第一个消费者第一次加入组时)
- 消费组不为空,成员编号未知(消费组第一次加入组)
- 消费组不为空,成员编号已知,且成员编号在组内
- “同步组”请求最终会进入
doSyncGroup
,进行进一步检查,只有在下面条件满足时才允许同步- 请求中保证消费组不为空
通过检查后,就可以执行对应的操作了,这里涉及到复杂的状态机转换操作,因此放到后面讲。
3.4. 通过回调返回响应
a) “加入组”响应
返回响应的回调函数定义如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 返回响应时的回调
def sendResponseCallback(joinResult: JoinGroupResult): Unit = {
// 响应创建函数
def createResponse(requestThrottleMs: Int): AbstractResponse = {
val protocolName = if (request.context.apiVersion() >= 7)
joinResult.protocolName.orNull
else
joinResult.protocolName.getOrElse(GroupCoordinator.NoProtocol)
val responseBody = new JoinGroupResponse(
new JoinGroupResponseData()
.setThrottleTimeMs(requestThrottleMs)
.setErrorCode(joinResult.error.code) // 错误码
.setGenerationId(joinResult.generationId) // generation号
.setProtocolType(joinResult.protocolType.orNull) // 协议类型
.setProtocolName(protocolName) // 协议名称
.setLeader(joinResult.leaderId) // Leader ID(主消费者)
.setMemberId(joinResult.memberId) // 当前消费者ID
.setMembers(joinResult.members.asJava) // 已知消费组的成员
)
responseBody
}
// 返回响应
sendResponseMaybeThrottle(request, createResponse)
}
当完成请求处理并返回响应时,会调用这个回调,并设置该回调字段为null
。这部分代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def onCompleteJoin(group: GroupMetadata): Unit = {
group.inLock {
// ...
if /* ... */ {
// ...
} else {
group.initNextGeneration() // 增加generation
if (group.is(Empty)) {
// ...
} else {
// 一次性向每个消费者返回响应
for (member <- group.allMemberMetadata) {
val joinResult = JoinGroupResult(
members = if (group.isLeader(member.memberId)) {
group.currentMemberMetadata
} else {
List.empty
},
memberId = member.memberId,
generationId = group.generationId,
protocolType = group.protocolType,
protocolName = group.protocolName,
leaderId = group.leaderOrNull, // 响应中包含Leader ID
error = Errors.NONE) // 默认错误吗为NONE
group.maybeInvokeJoinCallback(member, joinResult) // 调用回调以返回响应
completeAndScheduleNextHeartbeatExpiration(group, member)
member.isNew = false
}
}
}
}
}
注意,返回响应是一次性返回给所有消费者的,这里涉及到“延迟操作”,它和消费组的状态机有关系,这部分放到下面说明。
b) “同步组”响应
“同步组”响应的回调定义如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
// 返回响应时的回调
def sendResponseCallback(syncGroupResult: SyncGroupResult): Unit = {
// 直接将SYNC_GROUP结果返回给客户端
sendResponseMaybeThrottle(request, requestThrottleMs =>
new SyncGroupResponse(
new SyncGroupResponseData()
.setErrorCode(syncGroupResult.error.code)
.setProtocolType(syncGroupResult.protocolType.orNull)
.setProtocolName(syncGroupResult.protocolName.orNull)
.setAssignment(syncGroupResult.memberAssignment) // 分区分配信息
.setThrottleTimeMs(requestThrottleMs)
))
}
返回响应的函数在doSyncGroup
中,这里截取最关键的一段:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
private def doSyncGroup(group: GroupMetadata,
generationId: Int,
memberId: String,
protocolType: Option[String],
protocolName: Option[String],
groupInstanceId: Option[String],
groupAssignment: Map[String, Array[Byte]],
responseCallback: SyncCallback): Unit = {
group.inLock {
// ...
group.currentState match {
case Empty =>
responseCallback(SyncGroupResult(Errors.UNKNOWN_MEMBER_ID))
case PreparingRebalance =>
responseCallback(SyncGroupResult(Errors.REBALANCE_IN_PROGRESS))
case CompletingRebalance =>
group.get(memberId).awaitingSyncCallback = responseCallback
if (group.isLeader(memberId)) {
// 对于主消费者而言,此时分配结果协调者已经知道
val missing = group.allMembers -- groupAssignment.keySet
val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap
// ...
groupManager.storeGroup(group, assignment, (error: Errors) => {
group.inLock {
if (group.is(CompletingRebalance) && generationId == group.generationId) {
if (error != Errors.NONE) {
// ...
} else {
// 传播消费组的分配结果,并持久化到内部topic中
setAndPropagateAssignment(group, assignment)
// 将状态转成Stable
group.transitionTo(Stable)
}
}
}
})
// ...
}
// 而对于其他消费者,不会立刻返回响应,而是:
// 1. 由上面的Leader消费者的请求触发,传播给其他消费者,此时消费组状态为Stable
// 2. 由下面Stable状态的流程触发,直接返回分配信息
case Stable =>
// 若稳定,直接返回分配结果
val memberMetadata = group.get(memberId)
responseCallback(SyncGroupResult(group.protocolType, group.protocolName, memberMetadata.assignment, Errors.NONE))
completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))
case Dead =>
throw new IllegalStateException(s"Reached unexpected condition for Dead group ${group.groupId}")
}
}
}
}
上面关键的在setAndPropagateAssignment
函数,它设置并传播每个成员的分区分配结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private def setAndPropagateAssignment(group: GroupMetadata, assignment: Map[String, Array[Byte]]): Unit = {
assert(group.is(CompletingRebalance))
// 设置每个成员的分配结果
group.allMemberMetadata.foreach(member => member.assignment = assignment(member.memberId))
// 返回分配结果给每个成员
propagateAssignment(group, Errors.NONE)
}
private def propagateAssignment(group: GroupMetadata, error: Errors): Unit = {
val (protocolType, protocolName) = if (error == Errors.NONE)
(group.protocolType, group.protocolName)
else
(None, None)
for (member <- group.allMemberMetadata) {
// 对于每个成员,调用回调函数,将结果返回给每个成员
if (group.maybeInvokeSyncCallback(member, SyncGroupResult(protocolType, protocolName, member.assignment, error))) {
completeAndScheduleNextHeartbeatExpiration(group, member)
}
}
}
c) 协调者保存消费组任务
在返回“同步组”响应前,协调者会把分区分配结果持久化到topic __consumer_offsets
中。当协调节点故障时,新的协调者可从该topic读取数据并恢复。
保存状态到内部topic,调用了GroupMetadataManager#storeGroup
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def storeGroup(group: GroupMetadata,
groupAssignment: Map[String, Array[Byte]],
responseCallback: Errors => Unit): Unit = {
getMagic(partitionFor(group.groupId)) match {
case Some(magicValue) =>
// ...
val groupMetadataRecords = // ... 需要保存到记录
// 将记录保存到内部topic后,调用该回调将其存入缓存中
def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
// ...
val status = responseStatus(groupMetadataPartition)
val responseError = if (status.error == Errors.NONE) {
Errors.NONE
} else {
// ...
}
}
// 调用回调responseCallback
responseCallback(responseError)
}
// 将分配信息记录追加到内部topic中,追加完后,执行回调putCacheCallback
appendForGroup(group, groupMetadataRecords, putCacheCallback)
case None =>
responseCallback(Errors.NOT_COORDINATOR)
None
}
}
private def appendForGroup(group: GroupMetadata,
records: Map[TopicPartition, MemoryRecords],
callback: Map[TopicPartition, PartitionResponse] => Unit): Unit = {
// 将分配信息记录追加到内部topic中
replicaManager.appendRecords(
timeout = config.offsetCommitTimeoutMs.toLong,
requiredAcks = config.offsetCommitRequiredAcks,
internalTopicsAllowed = true,
origin = AppendOrigin.Coordinator,
entriesPerPartition = records,
delayedProduceLock = Some(group.lock),
responseCallback = callback) // 执行完成后,运行回调,即上面的putCacheCallback
}
当然分区分配结果也会缓存起来(不然一直读盘会很慢),保存在
groupMetadataCache
中(位于GroupMetadataManager
)。它保存了它管理的所有消费组的元数据(注意,一个Broker/协调者可管理多个消费组):
1 2 3 4 5 6 7 8 9 class GroupMetadataManager(brokerId: Int, interBrokerProtocolVersion: ApiVersion, config: OffsetConfig, replicaManager: ReplicaManager, zkClient: KafkaZkClient, time: Time, metrics: Metrics) extends Logging with KafkaMetricsGroup { private val groupMetadataCache = new Pool[String, GroupMetadata] // 消费组元数据 }
4. 延迟“加入组”
上面入组的操作省略了一些细节,例如:
- 如何判断收集完所有消费者的“加入组”请求
- 如何延迟响应“加入组”和“同步组”的响应:
- “加入组”:等到所有消费者请求完后,才响应
- “同步组”:等到主消费者返回分区分配结果后,才响应
回答这些问题,需要引入这些概念:
- 延迟操作:通过它实现延迟的加入组(对应的是
DelayJoin
) - 消费组状态转换:5种状态(
Empty
,Dead
,Stable
,PreparingRebalance
,CompletingRebalance
)
本节先看延迟操作。
延迟操作的意思是:协调者不能立即执行该操作。在服务端,使用延迟操作DelayJoin
进行延迟“加入组”,表示协调者会延迟返回“加入组”响应给消费者。
4.1. 准备“再平衡”
协调者收到消费者的”加入组“请求后,除了校验,还会做其他事情。
首先协调者会准备创建/更新成员元数据:
- 当消费者成员编号未知,则分配一个编号,创建该成员元数据,并保存,编号会返回给消费者(见
addMemberAndRebalance
方法) - 当消费者成员编号已知,则更新该成员元数据(见
updateMemberAndRebalance
)
然后,开始准备“再平衡”,进入方法maybePrepareRebalance
。这里触发条件是:成员组状态必须为Stable/Empty
。它会:
- 创建
DelayedJoin
操作(若状态为Empty
,则创建其子类InitialDelayedJoin
- 将状态转换成
PreparingRebalance
- 尝试操作能否立即完成,若不能,则加入到延迟缓存中(
DelayedOperationPurgatory
,后面会讲)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private def maybePrepareRebalance(group: GroupMetadata, reason: String): Unit = {
group.inLock { // 上锁
if (group.canRebalance) // 这里状态必须是Stable, CompletingRebalance, Empty
prepareRebalance(group, reason) // 开始准备“再平衡”
}
}
private def prepareRebalance(group: GroupMetadata, reason: String): Unit = {
if (group.is(CompletingRebalance)) // 但这里,若状态为CompletingRebalance,还是返回错误
// 所以,成员状态必须时Stable或Empty,才能出发“再平衡”准备
resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS)
// 创建DelayJoin
// 若目前状态为Empty,则创建InitialDelayedJoin;若为Stable,则创建DelayedJoin
val delayedRebalance = if (group.is(Empty))
new InitialDelayedJoin(this,
joinPurgatory,
group,
groupConfig.groupInitialRebalanceDelayMs,
groupConfig.groupInitialRebalanceDelayMs,
max(group.rebalanceTimeoutMs - groupConfig.groupInitialRebalanceDelayMs, 0))
else
new DelayedJoin(this, group, group.rebalanceTimeoutMs)
// 转换组状态为PreparingRebalance
group.transitionTo(PreparingRebalance)
val groupKey = GroupKey(group.groupId)
// 创建完DelayedJoin操作后,立即尝试,看看是否能立即完成
// 若不能立即完成,则会将操作加入到延迟缓存中(指定键为组ID)
joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
}
易知,由于上面使用了锁,所以一次“再平衡”只会由一个消费者发起,DelayedJoin
操作只会创建1个。
4.2. 延迟操作与延迟缓存
a) 延迟操作
延迟操作(DelayedOperation
)有下面的特点:
- 需要指定超时时间,超时后会强制完成
- 需要将其加入到缓存(
DelayedOperationPurgatory
)中,并指定一个键(例如消费组编号) - 创建后,需要先尝试该操作是否能立即完成
- 判断延迟操作是否能完成,需要自定义判断和实现
这里延迟操作有3个主要接口:
tryComplete
:尝试完成onComplete
:操作完成后的回调onExpiration
:操作超时后的回调
b) 延迟缓存
延迟缓存(DelayedOperationPurgatory
)保存了延迟操作对象,并将操作和键关联。
延迟缓存有下面2个主要方法:
-
tryCompleteElseWatch
:尝试完成延迟操作,若不能完成,就以指定的键将该操作缓存起来,以进行监控监控时,任务会被加入到
Timer
中,它是一个延迟执行的线程池。当超时时,任务会被该线程池执行,执行的方法即Runnable#run
:1 2 3 4 5 6 7
// DelayedTask被加入到DelayedOperationPurgatory#timer中延迟执行 // 当超时发生时,timer会后台执行这个操作 override def run(): Unit = { // 超时时执行 if (forceComplete()) // 1. 强制结束任务 onExpiration() // 2. 触发超时回调 }
-
checkAndComplete
:检查并尝试完成指定键的延迟操作
c) DelayedJoin
DelayedJoin
是延迟操作的一个实现。它有自己的状态,即消费组元数据,保存在GroupCoordinator
中。
而GroupCoordinator
可根据消费组元数据,判断是否能够完成延迟加入的操作,依据是:消费组的每个消费者都发送了“加入组”请求。
1
2
3
4
5
6
7
8
9
10
private[group] class DelayedJoin(coordinator: GroupCoordinator,
group: GroupMetadata,
rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout, Some(group.lock)) {
// 利用GroupCoordinator判断是否可以完成延迟入组,依据是所有消费者都发送了“加入组”请求
override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete _)
// 任务过期的回调,定义在GroupCoordinator#onExpirationJoin中
override def onExpiration() = coordinator.onExpireJoin()
// 任务完成的回调,定义在GroupCoordinator#onCompleteJoin中
override def onComplete() = coordinator.onCompleteJoin(group)
}
这里具体看tryComplete
实现,具体放到下节说明。
4.3. 尝试完成延迟“加入组”操作
进入GroupCoordinator#tryCompleteJoin
,发现很简单:
- 判断每个消费者是否都发送了”加入组“请求
- 调用
forceComplete
,触发DelayedJoin#onComplete
回调
1
2
3
4
5
6
7
8
9
10
11
12
// In GroupCoordinator
def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean) = {
group.inLock {
if (group.hasAllMembersJoined) // 判断是否都发了“加入组”
forceComplete() // 若都发了,则完成任务,最后会触发onComplete回调
else false
}
}
// In GroupMetadata
// 这里条件即:等待的成员个数等于组成员个数,且处于pending状态的为空
def hasAllMembersJoined = members.size == numMembersAwaitingJoin && pendingMembers.isEmpty
而完成这个延迟“加入组”的方法是onCompleteJoin
,主要做:
- 增加组
generation
值,并更改组状态为CompletingRebalance
- 触发
awaitingJoinCallback
,返回“加入组”响应
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def onCompleteJoin(group: GroupMetadata): Unit = {
group.inLock { // 上锁
// ...
} else {
group.initNextGeneration() // 增加generation值,并将状态转化为CompletingRebalance
// ...
} else {
// 对组内每个成员,发送JOIN_GROUP响应
for (member <- group.allMemberMetadata) {
val joinResult = JoinGroupResult(
members = if (group.isLeader(member.memberId)) {
group.currentMemberMetadata // 给Leader返回组成员信息
} else {
List.empty
},
memberId = member.memberId,
generationId = group.generationId,
protocolType = group.protocolType,
protocolName = group.protocolName,
leaderId = group.leaderOrNull,
error = Errors.NONE)
// 触发回调,即返回响应
group.maybeInvokeJoinCallback(member, joinResult)
completeAndScheduleNextHeartbeatExpiration(group, member)
member.isNew = false
}
}
}
}
}
def maybeInvokeJoinCallback(member: MemberMetadata,
joinGroupResult: JoinGroupResult): Unit = {
if (member.isAwaitingJoin) {
member.awaitingJoinCallback(joinGroupResult) // 触发回调
member.awaitingJoinCallback = null // 设置成员状态,成员不再awaitingJoin
numMembersAwaitingJoin -= 1 // numMembersAwaitingJoin计数-1
}
}
之前提及“只有所有消费者都发送‘加入组’请求后,才会响应”,但是有例外,即第一个消费者发送“入组”请求:
- 协调者处理第一个消费者的“加入组”请求,创建元数据并保存
- 消费组状态变成
PreparingRebalance
- 创建
DelayedJoin
操作,并立刻尝试完成它,由于从上面的hasAllMembersJoined
条件成立(members.size
为1,而numMembersAwaitingJoin
也是1,pendingMembers
为空),所以可以完成该操作 - 完成延迟操作,调用
GroupCoordinator#onCompleteJoin
,将组信息返回给消费者(这里消费者只有1个,且主消费者也是它)
因此,最后的结果就是:
- 整个组只有1个成员,组状态为
CompletingRebalance
- 该成员即主消费者,可以执行分区分配,并返回
SYNC_GROUP
请求 - 最后整个消费组(仅1个成员)状态变为
Stable
,入组结束
注意:JOIN_GROUP
和SYNC_GROUP
处理都会对消费组上锁,所以对于某个消费组,只能同时执行1个
但是还有一些问题:
-
尝试完成
DelayedJoin
前,其他消费者发送了JOIN_GROUP
:由于处理
JOIN_GROUP
上锁,第一个消费者的延迟操作会成功,会返回响应,状态会变为CompletingRebalance
,此时消费组状态不稳定。之后,才会处理其他消费者的JOIN_GROUP
。这和下面2的第2个子问题等价。 -
完成
DelayedJoin
后,其他消费者发送了JOIN_GROUP
-
若其他消费者的
JOIN_GROUP
在后,第一个消费者SYNC_GROUP
在前:对应消费组稳定(Stable
)后,其他消费者入组 -
若其他消费者的
JOIN_GROUP
在前,第一个消费者SYNC_GROUP
在后:对应消费组不稳定(CompletingRebalance
)时,其他消费者入组
-
第2个问题需要解决,分为2个字问题,放在下节说明。
4.4. 消费组稳定后,加入新消费者
这是4.3.中的第1个子问题。
当消费组稳定后,新消费者发送JOIN_GROUP
,会触发整个组的重新加入,原有存在的消费者也需要重新入组。
那么新消费者和原有消费者都会执行4.1~4.3提及的事情:
-
2个消费者都向协调者发送
JOIN_GROUP
-
消费组状态变成
PreparingRebalancing
,并创建DelayedJoin
操作(只会有1个),最先收到JOIN_GROUP
对应的消费者是Leader -
由于有2个消费者,因此需要
members.size == numMembersAwaitingJoin == 2
,等待收到2个消费者的JOIN_GROUP
后,才能完成DelayedJoin
操作,触发完成该操作的条件有:-
外部事件触发:收到最后一个消费者的
JOIN_GROUP
后,最后会尝试完成这个延迟操作,将响应返回给2个消费者代码解释如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
def handleJoinGroup(...): Unit = { // 对于最后一个消费者 // ... } else { val isUnknownMember = memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID groupManager.getGroup(groupId) match { case None => // ... case Some(group) => group.inLock { // ... // 加入新消费者/更新消费者,使其入组 // 这里不会创建DelayedJoin操作,因为之前创建过,且状态为PreparingRebalance,也不能创建 } else if (isUnknownMember) { doUnknownJoinGroup(group, groupInstanceId, requireKnownMemberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback) } else { doJoinGroup(group, memberId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback) } // 对于最后一个消费者,会进入该分支 if (group.is(PreparingRebalance)) { // 这一步尝试完成DelayedJoin事件 // 这里会触发DelayedJoin事件的完成回调,因为它满足4.3.中提及的hasAllMembersJoined条件 joinPurgatory.checkAndComplete(GroupKey(group.groupId)) } } } } }
-
超时触发:最后一个消费者的
JOIN_GROUP
没有收到,事件也会强制完成,响应只会返回给前面收到JOIN_GROUP
请求的消费者
-
下面举例,比较直观地说明了新消费者入组的流程:
- (第一轮):第一个消费者
JOIN_GROUP
并SYNC_GROUP
,完成重分配 - (第二轮):
- 第二个消费者
JOIN_GROUP
,触发第一个消费者重新JOIN_GROUP
(通过心跳可感知) - 第二个
JOIN_GROUP
请求先来,消费组状态变为PreparingRebalance
,并创建了DelayedJoin
操作,但不能完成
- 第二个消费者
- (第二轮)
- 第一个消费者
JOIN_GROUP
后来,不能创建DelayedJoin
,但最后尝试并且能完成DelayedJoin
操作 - 消费组状态变成
CompletingRebalance
,返回响应给两个消费者 - 最后进行
SYNC_GROUP
,Leader上传分配信息,并由协调者传播给其他消费者,消费组状态变成Stable
,具体参考3.4.b
- 第一个消费者
4.5. 消费组稳定前,加入新消费者
这是4.3.中的第2个子问题。
由于新消费者的JOIN_GROUP
先到,之前消费者收到了响应后,再进行SYNC_GROUP
就没有用了。
此时,消费者组的状态是CompletingRebalance
,新消费者的JOIN_GROUP
请求做的和4.1.~4.3.一模一样
- 将状态改回
PreparingRebalance
- 创建延迟操作
DelayedJoin
- 尝试完成延迟操作,但是不能完成,因为
hasAllMembersJoined
没满足(members.size == 2
但numMembersAwaitingJoin == 1
)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private def doJoinGroup(...): Unit = {
group.inLock { // 上锁
// ...
} else {
val groupInstanceIdNotFound = groupInstanceId.isDefined && !group.hasStaticMember(groupInstanceId)
// ...
} else {
val member = group.get(memberId)
group.currentState match {
// ...
// 对应CompletingRebalance状态
case CompletingRebalance =>
if (member.matches(protocols)) {
// 若元数据没改变,则不需要重分配,直接返回响应
responseCallback(JoinGroupResult(...)
} else {
// 一般进入这里,执行4.1.~4.3.一样的操作
updateMemberAndRebalance(group, member, protocols, responseCallback)
}
// ...
}
}
}
}
}
之前的消费者会响应SYNC_GROUP
,它在新消费者处理完JOIN_GROUP
后处理,因为上锁的缘故,消费者组的状态变为了PreparingRebalance
(正常是CompletingRebalance
),所以:
- 返回之前的消费者
REBALANCE_IN_PROGRESS
错误码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private def doSyncGroup(...): Unit = {
group.inLock {
// ...
} else {
group.currentState match {
// ...
// 此时之前的消费者SYNC_GROUP请求处理会来到这
// 返回REBALANCE_IN_PROGRESS错误码
// 消费者收到后会重新发送JOIN_GROUP请求
case PreparingRebalance =>
responseCallback(SyncGroupResult(Errors.REBALANCE_IN_PROGRESS))
// ...
}
}
}
}
消费者收到REBALANCE_IN_PROGRESS
后,会:
- 抛出
RebalanceInProgressException
异常 - 然后会重新发起“加入组”请求
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
boolean joinGroupIfNeeded(final Timer timer) {
while (rejoinNeededOrPending()) {
// ...
// 发起JOIN_GROUP,然后发起SYNC_GROUP
// SYNC_GROUP会返回REBALANCE_IN_PROGRESS错误
final RequestFuture<ByteBuffer> future = initiateJoinGroup();
client.poll(future, timer); // 轮询以获取入组结果
if (!future.isDone()) {
return false; // 超时
}
if (future.succeeded()) {
// ...
} else {
// 收到REBALANCE_IN_PROGRESS,Future会抛出RebanalceInProgress异常
final RuntimeException exception = future.exception();
// ...
resetJoinGroupFuture(); // 重置(将joinFuture设为null,此时while条件返回true)
if (exception instanceof UnknownMemberIdException ||
exception instanceof RebalanceInProgressException || // 遇到RebanalceInProgress
exception instanceof IllegalGenerationException ||
exception instanceof MemberIdRequiredException)
continue; // 进入下一轮循环,即重试,发起JOIN_GROUP请求
else if (!future.isRetriable())
throw exception;
timer.sleep(rebalanceConfig.retryBackoffMs);
}
}
return true;
}
protected synchronized boolean rejoinNeededOrPending() {
return rejoinNeeded || joinFuture != null;
}
private synchronized void resetJoinGroupFuture() {
this.joinFuture = null;
}
在此之后,消费者入组就和4.4.节的入组流程一致,这里不再赘述。
5. 消费组状态机与入组
从上面4章可以看出,协调者处理消费者的“加入组”和“同步组”,不仅需要“延迟操作”,而且更加依赖消费组的状态。
下一篇文章会专门讲消费组状态及其转换,并总结协调者“加入组”、“同步组”、“离开组”、超时和心跳的处理。