1. 概览
上一篇比较详细地说明了消费者和协调者是如何处理“加入组”与“同步组”,从而完成消费者入组和分区重平衡。
对于协调者而言,处理这些请求需要依赖:
- 延迟操作(
DelayedJoin
) - 消费组的状态
明显,消费组的状态也是非常重要的。本文主要说明消费者组的状态机转换,并总结协调者如何处理:
- “加入组”、“同步组”、“离开组”请求
- 再平衡超时、会话超时
- 延迟的心跳
2. 消费组的状态转换
消费组的状态有:Empty
, Stable
, PreparingRebalance
, CompletingRebalance
和Dead
。初始状态为Empty
。
对于消费者入组重平衡,消费组的状态转换规律为:
- 原状态为
Empty | Stable
:- 收到
JOIN_GROUP
:创建DelayJoin
,状态变为PreparingRebalance
- 尝试完成
DelayJoin
:若成功,返回JOIN_GROUP
响应,状态变为CompletingRebalance
- 收到
SYNC_GROUP
:- 状态为
Empty
:返回错误,状态不变 - 状态为
Stable
:返回SYNC_GROUP
响应(即分区分配信息),状态不变
- 状态为
- 收到
- 原状态为
PreparingRebalance
:- 收到
JOIN_GROUP
:不会创建DelayJoin
,状态不变 - 尝试完成
DelayJoin
:若成功,返回JOIN_GROUP
响应,状态变为CompletingRebalance
- 收到
SYNC_GROUP
:返回错误,状态不变
- 收到
- 原状态为
CompletingRebalance
:- 收到
JOIN_GROUP
:创建DelayJoin
,状态变为PreparingRebalance
- 尝试完成
DelayJoin
:一般不会成功,状态不变(若成功,返回JOIN_GROUP
响应,状态变为CompletingRebalance
) - 收到
SYNC_GROUP
:- 对于非Leader:不返回
SYNC_GROUP
响应,状态不变 - 对于Leader:持久化分区分配信息,返回
SYNC_GROUP
响应给所有消费者,状态变为Stable
- 对于非Leader:不返回
- 收到
- 原状态为
Dead
:- 以上操作均返回错误,状态不变
此外:
-
处理
JOIN_GROUP
和SYNC_GROUP
时,消费组会上锁,所以对于某个消费组,协调者只能同时处理1个JOIN_GROUP
和SYNC_GROUP
请求。这里可看下图: -
不论是否创建
DelayedJoin
操作,收到JOIN_GROUP
最后,都会尝试完成它,因为该请求是触发这个延迟事件完成的外部条件。
3. 协调者处理“加入组”请求
从2中,很容易得出协调者如何处理“加入组”请求:
- 原始状态为
Empty | Stable
:状态变化为PreparingRebalance
- 创建/更新成员元数据
- 创建
DelayedJoin
延迟事件 - 尝试完成
DelayedJoin
事件:若成功,则状态变化为CompletingRebalance
,广播JOIN_GROUP
响应
- 原始状态为
PreparingRebalance
:状态不变- 创建/更新成员元数据
- 尝试完成之前创建的
DelayedJoin
事件:若成功,则状态变化为CompletingRebalance
,广播JOIN_GROUP
响应
- 原始状态为
CompletingRebalance
:状态变化为PreparingRebalance
- 创建/更新成员元数据
- 创建
DelayedJoin
延迟事件(此时已有消费者需要重发JOIN_GROUP
响应,因为它SYNC_GROUP
响应会返回错误) - 尝试完成
DelayedJoin
事件:通常不会成功
- 原始状态为
Dead
:返回错误
这部分代码在doUnknownJoinGroup
和doJoinGroup
中处理:
-
对于
Empty
状态,调用前者,直接调用addMemberAndRebalance
进行再平衡:1 2 3 4 5 6 7 8 9 10 11 12
private def doUnknownJoinGroup(...): Unit = { group.inLock { // ... } else { // ... // 直接增加成员,并开始重平衡,状态会变成PreparingRebalance addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, newMemberId, groupInstanceId, clientId, clientHost, protocolType, protocols, group, responseCallback) } } } }
-
对于其他状态,调用后者,会调用
addMemberAndRebalance
(未知消费者)或updateMemberAndRebalance
(已知消费者)进行再平衡这里有一个特殊情形:
当
CompletingRebalance
下,收到了元数据一致的JOIN_GROUP
,会直接返回JOIN_GROUP
响应。这是因为消费者可能没收到JOIN_GROUP
响应,导致请求重发。这时候不需要再次重平衡,只需返回JOIN_GROUP
响应即可。同样这也适用于
SYNC_GROUP
请求:当
Stable
下,收到了SYNC_GROUP
请求,会直接返回SYNC_GROUP
响应。当消费者没收到SYNC_GROUP
响应时导致了重发,这也不会造成错误。1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
private def doJoinGroup(...): Unit = { group.inLock { // ... } else { val member = group.get(memberId) group.currentState match { // PreparingRebalance下,开始重平衡,状态不需要变化 case PreparingRebalance => updateMemberAndRebalance(group, member, protocols, responseCallback) // CompletingRebalance下,开始重平衡,状态变为PreparingRebalance // 已有消费者需要重发入组请求 case CompletingRebalance => if (member.matches(protocols)) { // 当JOIN_GROUP请求的元数据是一致的,在CompletingRebalance下, // 可能是消费者没收到JOIN_GROUP的响应,导致了重发 // 这时候,直接返回JOIN_GROUP的响应即可 responseCallback(JoinGroupResult( members = if (group.isLeader(memberId)) { group.currentMemberMetadata } else { List.empty }, memberId = memberId, generationId = group.generationId, protocolType = group.protocolType, protocolName = group.protocolName, leaderId = group.leaderOrNull, error = Errors.NONE)) } } else { updateMemberAndRebalance(group, member, protocols, responseCallback) } // Stable下,开始重平衡,状态变为PreparingRebalance // 已有消费者需要重发入组请求 case Stable => val member = group.get(memberId) if (group.isLeader(memberId) || !member.matches(protocols)) { updateMemberAndRebalance(group, member, protocols, responseCallback) } else { // ... } // Empty|Dead下,对于已知消费者,直接返回错误 case Empty | Dead => // 返回错误 responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID)) } } } } }
4. 协调者处理“同步组”请求
从2中,也很容易得出协调者如何处理“同步组”请求:
- 原始状态为
Empty | Dead
:返回错误 - 原始状态为
Stable
:状态不变- 直接返回
SYNC_GROUP
响应
- 直接返回
- 原始状态为
PreparingRebalance
:返回错误,这会让消费者重发JOIN_GROUP
请求 - 原始状态为
CompletingRebalance
:若请求来自Leader消费者,状态变化为Stable
;否则什么都不做- 将分区分配信息持久化到内部topic中
- 广播
SYNC_GROUP
响应给消费者
这部分代码在doSyncGroup
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
private def doSyncGroup(group: GroupMetadata,
generationId: Int,
memberId: String,
protocolType: Option[String],
protocolName: Option[String],
groupInstanceId: Option[String],
groupAssignment: Map[String, Array[Byte]],
responseCallback: SyncCallback): Unit = {
group.inLock {
// ...
group.currentState match {
// Empty下,返回错误
case Empty =>
responseCallback(SyncGroupResult(Errors.UNKNOWN_MEMBER_ID))
// PreparingRebalance下,返回错误,会触发JOIN_GROUP重发
case PreparingRebalance =>
responseCallback(SyncGroupResult(Errors.REBALANCE_IN_PROGRESS))
// CompletingRebalance下,只对Leader响应
// 先持久化分区分配信息,然后广播SYNC_GROUP响应(不止是Leader),并转换状态为Stable
case CompletingRebalance =>
group.get(memberId).awaitingSyncCallback = responseCallback
if (group.isLeader(memberId)) {
// ...
val missing = group.allMembers -- groupAssignment.keySet
val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap
// ...
groupManager.storeGroup(group, assignment, (error: Errors) => {
group.inLock {
// ...
if (group.is(CompletingRebalance) && generationId == group.generationId) {
// ...
else {
setAndPropagateAssignment(group, assignment)
group.transitionTo(Stable)
}
}
}
})
groupCompletedRebalanceSensor.record()
}
// Stable下,直接返回SYNC_GROUP请求
case Stable =>
val memberMetadata = group.get(memberId)
responseCallback(SyncGroupResult(group.protocolType, group.protocolName, memberMetadata.assignment, Errors.NONE))
completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))
// ...
}
}
}
}
5. 协调者处理“离开组”请求
离开组有多种情况,如进程关闭、消费者不订阅topic等等。
正常情况下,对于消费者,它会:
-
关闭与协调者的心跳任务
-
向协调者发送“离开组”请求
-
重置相关信息
正常情况下,对于协调者,它会处理“离开组”请求,它会:
-
给消费组加锁(与“加入组”和“同步组”一样,因此它们互斥,不能同时执行)
-
关闭和消费者的心跳
- 移除消费者成员信息,更新消费组数据
- 根据消费组状态,影响分区重平衡
其中第4步相对重要,和消费组状态有关:
- 原始状态为
Dead | Empty
:什么都不做 - 原始状态为
Stable | CompletingRelance
:由于消费者离开,分区需要重新分配,因此需要触发一次分区再平衡,状态会变为PreparingRebalance
- 原始状态为
PreparingRebalance
:由于消费者离开,DelayedJoin
的延迟操作就有可能完成,因此需要尝试完成该延迟操作
代码位于removeMemberAndUpdateGroup
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
private def removeMemberAndUpdateGroup(group: GroupMetadata, member: MemberMetadata, reason: String): Unit = {
group.maybeInvokeJoinCallback(member, JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.UNKNOWN_MEMBER_ID))
// 更新组信息
group.remove(member.memberId)
group.removeStaticMember(member.groupInstanceId)
// 根据消费组状元,影响分区重平衡
group.currentState match {
case Dead | Empty => // Dead|Empty,什么都不做
// Stable|CompletingRebalance,需要触发一次分区重平衡,状态变为PreparingRebalance
case Stable | CompletingRebalance => maybePrepareRebalance(group, reason)
// PreparingRebalance,DelayedJoin操作可能可以完成,尝试完成它
case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId))
}
}
6. 再平衡超时与会话超时
消费者发送“加入组”请求后,协调者会创建一个延迟操作DelayedJoin
。这个操作往往不能立即完成,因此需要一个时间限制。
当DelayedJoin
超时,即规定时间内没有完全收到所有成员的”加入组”请求,会强制让其完成,GroupCoordinator#onCompleteJoin
还是会被调用:
-
对于超时的情形,找到所有没有收到“加入组”请求的消费者,关闭其心跳,并将其从组中移出
-
依旧给其他消费者返回“加入组”响应
-
完成本次的“延迟心跳”(
DelayedHeartbeat
),并调度下一次的心跳
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def onCompleteJoin(group: GroupMetadata): Unit = {
group.inLock {
// 对于没收到JOIN_GROUP的消费者,可认为心跳失败
// 直接关闭其心跳,并从组中移除
group.notYetRejoinedMembers.filterNot(_.isStaticMember) foreach { failedMember =>
removeHeartbeatForLeavingMember(group, failedMember)
group.remove(failedMember.memberId)
group.removeStaticMember(failedMember.groupInstanceId)
}
// ...
} else {
group.initNextGeneration() // Generation + 1,状态变成CompletingRebalance
// ...
} else {
// 返回JOIN_GROUP响应给所有成员
for (member <- group.allMemberMetadata) {
val joinResult = JoinGroupResult(...)
group.maybeInvokeJoinCallback(member, joinResult) // 触发回调以返回响应
completeAndScheduleNextHeartbeatExpiration(group, member) // 完成“延迟心跳”,调度下一次心跳
member.isNew = false
}
}
}
}
}
这里第1步是额外应对超时情形,而后2步和正常情况下一样。
注意这里的第3步。完成DelayedJoin
后,可认为它是一个心跳,因此可以认为本次的DelayedHeartbeat
已完成,可进行下一轮的调度:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private def completeAndScheduleNextHeartbeatExpiration(group: GroupMetadata, member: MemberMetadata): Unit = {
completeAndScheduleNextExpiration(group, member, member.sessionTimeoutMs)
}
private def completeAndScheduleNextExpiration(group: GroupMetadata, member: MemberMetadata, timeoutMs: Long): Unit = {
val memberKey = MemberKey(member.groupId, member.memberId)
// 设置成员收到心跳
member.heartbeatSatisfied = true
// 完成本次的心跳任务
heartbeatPurgatory.checkAndComplete(memberKey)
// 创建新一轮延迟心跳
// 设置本轮心跳状态为false(因为本轮还没发起心跳)
member.heartbeatSatisfied = false
// 创建新的延迟心跳操作
val delayedHeartbeat = new DelayedHeartbeat(this, group, member.memberId, isPending = false, timeoutMs)
// 尝试完成延迟心跳,这里尝试会失败,会直接放入到缓存进行监控
heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey))
}
当消费者收到JOIN_GROUP
响应后,要尽快发送SYNC_GROUP
请求,否则协调者会认为消费者挂掉。处理SYNC_GROUP
时,当组状态为Stable
或CompletingRebalance
,也会重新调度DelayedHeartbeat
操作,其中前者为单个消费者,后者为所有消费者。
关于延迟心跳(DelayedHearbeat
),更多的可看下节。
7. 延迟心跳
7.1. 延迟心跳事件
和其他延迟操作一样,DelayedHeartbeat
操作也需要外部事件触发来完成,这个外部事件即:服务端和客户端有通信。
和其他延迟操作一样,DelayedHeartbeat
也有3个重要方法:
tryComplete
:若消费者存活,则操作可以完成onExpiration
:超时后的回调onComplete
:完成操作后的回调,该回调为空
对于尝试完成DelayedHeartbeat
,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def tryCompleteHeartbeat(group: GroupMetadata,
memberId: String,
isPending: Boolean,
forceComplete: () => Boolean): Boolean = {
group.inLock {
if (group.is(Dead)) {
forceComplete() // 组已经Dead,完成即可
} else if (isPending) {
// isPending = true时,成员没有加入组中,说明成员正在加入组,肯定存活,直接完成即可
if (group.has(memberId)) {
forceComplete()
} else false
} else if (shouldCompleteNonPendingHeartbeat(group, memberId)) {
// isPending = false时,成员已经完成入组(重平衡)
// 则需要成员已经收到了心跳,或者它即将离开,则直接完成
forceComplete()
} else false
}
}
def shouldCompleteNonPendingHeartbeat(group: GroupMetadata, memberId: String): Boolean = {
if (group.has(memberId)) {
val member = group.get(memberId)
member.hasSatisfiedHeartbeat || member.isLeaving
} else {
info(s"Member id $memberId was not found in ${group.groupId} during heartbeat completion check")
true
}
}
而对于心跳超时,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def onExpireHeartbeat(group: GroupMetadata, memberId: String, isPending: Boolean): Unit = {
group.inLock {
if (group.is(Dead)) {
// ...
} else if (isPending) {
// ...
// 若成员没有入组,超时则直接移除该成员到组外
removePendingMemberAndUpdateGroup(group, memberId)
} else if (!group.has(memberId)) {
// ...
} else {
val member = group.get(memberId)
if (!member.hasSatisfiedHeartbeat) {
// 若成员入组,且没收到心跳,超时则将其移出组外
removeMemberAndUpdateGroup(group, member, s"removing member ${member.memberId} on heartbeat expiration")
}
}
}
}
7.2. 处理消费者的“心跳”请求
当处理、响应“加入组”,“同步组”,“离开组”的请求时,协调会触发延迟心跳的完成操作,调用第6节所述的completeAndScheduleNextHeartbeatExpiration
方法:
- 尝试完成
DelayedHeartbeat
- 若正常,则什么都不做
- 若超时,则会将成员移出组外,并可能触发再平衡
客户端也会向服务端发出“心跳”请求,协调者关于这部分处理和“加入组”、“同步组”类似,也会调用completeAndScheduleNextHeartbeatExpiration
方法完成延迟心跳任务,并调度下一轮的延迟心跳:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def handleHeartbeat(groupId: String,
memberId: String,
groupInstanceId: Option[String],
generationId: Int,
responseCallback: Errors => Unit): Unit = {
// ...
groupManager.getGroup(groupId) match {
case None =>
responseCallback(Errors.UNKNOWN_MEMBER_ID)
case Some(group) => group.inLock {
// ...
else {
group.currentState match {
// 若为Empty,返回错我
// 因为消费者是入组后才启动心跳线程的
case Empty =>
responseCallback(Errors.UNKNOWN_MEMBER_ID)
// 若为CompletingRebalance,返回错误,会触发JOIN_GROUP重发
// 因为这个状态下,消费者还是不能启动心跳的
// 或者是消费者以为消费组稳定,但是服务端不认为,需要让消费者感知到这个不稳定,从而触发JOIN_GROUP重发
case CompletingRebalance =>
responseCallback(Errors.REBALANCE_IN_PROGRESS)
// 若为PreparingRebalance,状态不变,完成延迟心跳任务并调度下一轮心跳
// 但依旧返回错误,因为这个状态下,消费者依旧不能启动心跳
// 或者是消费者以为消费组稳定,但是服务端不认为,需要让消费者感知到这个不稳定,从而触发JOIN_GROUP重发
case PreparingRebalance =>
val member = group.get(memberId)
completeAndScheduleNextHeartbeatExpiration(group, member)
responseCallback(Errors.REBALANCE_IN_PROGRESS)
// 若为Stable,则就是正常的心跳,完成延迟心跳任务并调度下一轮心跳即可
case Stable =>
val member = group.get(memberId)
completeAndScheduleNextHeartbeatExpiration(group, member)
responseCallback(Errors.NONE)
// 若为Dead,抛出异常
case Dead =>
throw new IllegalStateException(s"Reached unexpected condition for Dead group $groupId")
}
}
}
}
}
注意CompletingRebalance
和PreparingRebalance
的处理,会触发消费者的JOIN_GROUP
重发,这是因为:
- 该状态下,消费者是不能启动心跳的
- 或是消费者以为消费组稳定,但是服务端不认为,需要让消费者感知到这个不稳定,从而返回错误,触发
JOIN_GROUP
的重发
7.3. 心跳超时
注意DelayedHeartbeat
也是DelayedOperation
,它会被放入Timer
中,当超时的时候,被Timer
触发执行,强制完成任务并触发超时回调。(见前文4.2.b)
此时服务端就感知到了心跳超时,并将对应消费者移出到组外,并触发分区重平衡。
8. 总结
本文和前文主要讲了协调者是如何处理消费者入组重平衡的。
消费者入组需要完成:
- “加入组”
- “同步组”
而协调者的处理至关重要,这里关键的在于:
- 延迟操作
- 消费组状态机的转换
而协调者通过监控消费者的心跳来确认消费者是否存活:
- 若超时,则会将消费者从组中移除
- 在“加入组”和“同步组”时,也会额外视作心跳处理,以监控消费者的存活状态