Kafka技术内幕-协调者(2)

Posted by keys961 on June 28, 2020

1. 概览

上一篇比较详细地说明了消费者和协调者是如何处理“加入组”与“同步组”,从而完成消费者入组和分区重平衡。

对于协调者而言,处理这些请求需要依赖:

  • 延迟操作(DelayedJoin
  • 消费组的状态

明显,消费组的状态也是非常重要的。本文主要说明消费者组的状态机转换,并总结协调者如何处理:

  • “加入组”、“同步组”、“离开组”请求
  • 再平衡超时、会话超时
  • 延迟的心跳

2. 消费组的状态转换

消费组的状态有:Empty, Stable, PreparingRebalance, CompletingRebalanceDead。初始状态为Empty

对于消费者入组重平衡,消费组的状态转换规律为:

  • 原状态为Empty | Stable
    • 收到JOIN_GROUP:创建DelayJoin,状态变为PreparingRebalance
    • 尝试完成DelayJoin:若成功,返回JOIN_GROUP响应,状态变为CompletingRebalance
    • 收到SYNC_GROUP
      • 状态为Empty:返回错误,状态不变
      • 状态为Stable:返回SYNC_GROUP响应(即分区分配信息),状态不变
  • 原状态为PreparingRebalance
    • 收到JOIN_GROUP:不会创建DelayJoin,状态不变
    • 尝试完成DelayJoin:若成功,返回JOIN_GROUP响应,状态变为CompletingRebalance
    • 收到SYNC_GROUP:返回错误,状态不变
  • 原状态为CompletingRebalance
    • 收到JOIN_GROUP:创建DelayJoin,状态变为PreparingRebalance
    • 尝试完成DelayJoin:一般不会成功,状态不变(若成功,返回JOIN_GROUP响应,状态变为CompletingRebalance
    • 收到SYNC_GROUP
      • 对于非Leader:不返回SYNC_GROUP响应,状态不变
      • 对于Leader:持久化分区分配信息,返回SYNC_GROUP响应给所有消费者,状态变为Stable
  • 原状态为Dead
    • 以上操作均返回错误,状态不变

此外:

  1. 处理JOIN_GROUPSYNC_GROUP时,消费组会上锁,所以对于某个消费组,协调者只能同时处理1个JOIN_GROUPSYNC_GROUP请求。这里可看下图:

  2. 不论是否创建DelayedJoin操作,收到JOIN_GROUP最后,都会尝试完成它,因为该请求是触发这个延迟事件完成的外部条件。

3. 协调者处理“加入组”请求

从2中,很容易得出协调者如何处理“加入组”请求:

  • 原始状态为Empty | Stable:状态变化为PreparingRebalance
    • 创建/更新成员元数据
    • 创建DelayedJoin延迟事件
    • 尝试完成DelayedJoin事件:若成功,则状态变化为CompletingRebalance,广播JOIN_GROUP响应
  • 原始状态为PreparingRebalance:状态不变
    • 创建/更新成员元数据
    • 尝试完成之前创建的DelayedJoin事件:若成功,则状态变化为CompletingRebalance,广播JOIN_GROUP响应
  • 原始状态为CompletingRebalance:状态变化为PreparingRebalance
    • 创建/更新成员元数据
    • 创建DelayedJoin延迟事件(此时已有消费者需要重发JOIN_GROUP响应,因为它SYNC_GROUP响应会返回错误)
    • 尝试完成DelayedJoin事件:通常不会成功
  • 原始状态为Dead:返回错误

这部分代码在doUnknownJoinGroupdoJoinGroup中处理:

  • 对于Empty状态,调用前者,直接调用addMemberAndRebalance进行再平衡:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    
      private def doUnknownJoinGroup(...): Unit = {
        group.inLock {
            // ...
            } else {
              // ...
              // 直接增加成员,并开始重平衡,状态会变成PreparingRebalance
              addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, newMemberId, groupInstanceId,
                clientId, clientHost, protocolType, protocols, group, responseCallback)
            }
          }
        }
      }
    
  • 对于其他状态,调用后者,会调用addMemberAndRebalance(未知消费者)或updateMemberAndRebalance(已知消费者)进行再平衡

    这里有一个特殊情形:

    CompletingRebalance下,收到了元数据一致的JOIN_GROUP,会直接返回JOIN_GROUP响应。这是因为消费者可能没收到JOIN_GROUP响应,导致请求重发。这时候不需要再次重平衡,只需返回JOIN_GROUP响应即可。

    同样这也适用于SYNC_GROUP请求:

    Stable下,收到了SYNC_GROUP请求,会直接返回SYNC_GROUP响应。当消费者没收到SYNC_GROUP响应时导致了重发,这也不会造成错误。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    
      private def doJoinGroup(...): Unit = {
        group.inLock {
            // ...
            } else {
              val member = group.get(memberId)
              group.currentState match {
                // PreparingRebalance下,开始重平衡,状态不需要变化
                case PreparingRebalance =>
                  updateMemberAndRebalance(group, member, protocols, responseCallback)
                  
                // CompletingRebalance下,开始重平衡,状态变为PreparingRebalance
                // 已有消费者需要重发入组请求
                case CompletingRebalance =>
                  if (member.matches(protocols)) {
                      // 当JOIN_GROUP请求的元数据是一致的,在CompletingRebalance下,
                      // 可能是消费者没收到JOIN_GROUP的响应,导致了重发
                      // 这时候,直接返回JOIN_GROUP的响应即可
                      responseCallback(JoinGroupResult(
                        members = if (group.isLeader(memberId)) {
                          group.currentMemberMetadata
                        } else {
                          List.empty
                        },
                        memberId = memberId,
                        generationId = group.generationId,
                        protocolType = group.protocolType,
                        protocolName = group.protocolName,
                        leaderId = group.leaderOrNull,
                        error = Errors.NONE))
                    }
                  } else {
                    updateMemberAndRebalance(group, member, protocols, responseCallback)
                  }
            
                // Stable下,开始重平衡,状态变为PreparingRebalance
                // 已有消费者需要重发入组请求
                case Stable =>
                  val member = group.get(memberId)
                  if (group.isLeader(memberId) || !member.matches(protocols)) {
                    updateMemberAndRebalance(group, member, protocols, responseCallback)
                  } else {
                    // ...
                  }
            
                // Empty|Dead下,对于已知消费者,直接返回错误
                case Empty | Dead =>
                  // 返回错误
                  responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID))
              }
            }
          }
        }
      }
    

4. 协调者处理“同步组”请求

从2中,也很容易得出协调者如何处理“同步组”请求:

  • 原始状态为Empty | Dead:返回错误
  • 原始状态为Stable:状态不变
    • 直接返回SYNC_GROUP响应
  • 原始状态为PreparingRebalance:返回错误,这会让消费者重发JOIN_GROUP请求
  • 原始状态为CompletingRebalance:若请求来自Leader消费者,状态变化为Stable;否则什么都不做
    • 将分区分配信息持久化到内部topic中
    • 广播SYNC_GROUP响应给消费者

这部分代码在doSyncGroup

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
private def doSyncGroup(group: GroupMetadata,
                        generationId: Int,
                        memberId: String,
                        protocolType: Option[String],
                        protocolName: Option[String],
                        groupInstanceId: Option[String],
                        groupAssignment: Map[String, Array[Byte]],
                        responseCallback: SyncCallback): Unit = {
  group.inLock {
      // ... 
      group.currentState match {
        // Empty下,返回错误
        case Empty =>
          responseCallback(SyncGroupResult(Errors.UNKNOWN_MEMBER_ID))
        
        // PreparingRebalance下,返回错误,会触发JOIN_GROUP重发
        case PreparingRebalance =>
          responseCallback(SyncGroupResult(Errors.REBALANCE_IN_PROGRESS))

        // CompletingRebalance下,只对Leader响应
        // 先持久化分区分配信息,然后广播SYNC_GROUP响应(不止是Leader),并转换状态为Stable
        case CompletingRebalance =>
          group.get(memberId).awaitingSyncCallback = responseCallback
          if (group.isLeader(memberId)) {
            // ... 
            val missing = group.allMembers -- groupAssignment.keySet
            val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap
            // ...
            groupManager.storeGroup(group, assignment, (error: Errors) => {
              group.inLock {
                // ...
                if (group.is(CompletingRebalance) && generationId == group.generationId) {
                  // ... 
                  else {
                    setAndPropagateAssignment(group, assignment)
                    group.transitionTo(Stable)
                  }
                }
              }
            })
            groupCompletedRebalanceSensor.record()
          }
        
        // Stable下,直接返回SYNC_GROUP请求
        case Stable =>
          val memberMetadata = group.get(memberId)
          responseCallback(SyncGroupResult(group.protocolType, group.protocolName, memberMetadata.assignment, Errors.NONE))
          completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))

        // ...
      }
    }
  }
}

5. 协调者处理“离开组”请求

离开组有多种情况,如进程关闭、消费者不订阅topic等等。

正常情况下,对于消费者,它会:

  1. 关闭与协调者的心跳任务

  2. 向协调者发送“离开组”请求

  3. 重置相关信息

正常情况下,对于协调者,它会处理“离开组”请求,它会:

  1. 给消费组加锁(与“加入组”和“同步组”一样,因此它们互斥,不能同时执行)

  2. 关闭和消费者的心跳

  3. 移除消费者成员信息,更新消费组数据
  4. 根据消费组状态,影响分区重平衡

其中第4步相对重要,和消费组状态有关:

  • 原始状态为Dead | Empty:什么都不做
  • 原始状态为Stable | CompletingRelance:由于消费者离开,分区需要重新分配,因此需要触发一次分区再平衡,状态会变为PreparingRebalance
  • 原始状态为PreparingRebalance:由于消费者离开,DelayedJoin的延迟操作就有可能完成,因此需要尝试完成该延迟操作

代码位于removeMemberAndUpdateGroup

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private def removeMemberAndUpdateGroup(group: GroupMetadata, member: MemberMetadata, reason: String): Unit = {
  group.maybeInvokeJoinCallback(member, JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.UNKNOWN_MEMBER_ID))
  // 更新组信息
  group.remove(member.memberId)
  group.removeStaticMember(member.groupInstanceId)
  // 根据消费组状元,影响分区重平衡
  group.currentState match {
    case Dead | Empty => // Dead|Empty,什么都不做
    // Stable|CompletingRebalance,需要触发一次分区重平衡,状态变为PreparingRebalance
    case Stable | CompletingRebalance => maybePrepareRebalance(group, reason) 
    // PreparingRebalance,DelayedJoin操作可能可以完成,尝试完成它
    case PreparingRebalance => joinPurgatory.checkAndComplete(GroupKey(group.groupId))
  }
}

6. 再平衡超时与会话超时

消费者发送“加入组”请求后,协调者会创建一个延迟操作DelayedJoin。这个操作往往不能立即完成,因此需要一个时间限制。

DelayedJoin超时,即规定时间内没有完全收到所有成员的”加入组”请求,会强制让其完成,GroupCoordinator#onCompleteJoin还是会被调用:

  1. 对于超时的情形,找到所有没有收到“加入组”请求的消费者,关闭其心跳,并将其从组中移出

  2. 依旧给其他消费者返回“加入组”响应

  3. 完成本次的“延迟心跳”(DelayedHeartbeat),并调度下一次的心跳

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def onCompleteJoin(group: GroupMetadata): Unit = {
  group.inLock {
    // 对于没收到JOIN_GROUP的消费者,可认为心跳失败
    // 直接关闭其心跳,并从组中移除
    group.notYetRejoinedMembers.filterNot(_.isStaticMember) foreach { failedMember =>
      removeHeartbeatForLeavingMember(group, failedMember)
      group.remove(failedMember.memberId)
      group.removeStaticMember(failedMember.groupInstanceId)
    }

    // ... 
    } else {
      group.initNextGeneration() // Generation + 1,状态变成CompletingRebalance
      // ... 
      } else {
        // 返回JOIN_GROUP响应给所有成员
        for (member <- group.allMemberMetadata) {
          val joinResult = JoinGroupResult(...)
          group.maybeInvokeJoinCallback(member, joinResult) // 触发回调以返回响应
          completeAndScheduleNextHeartbeatExpiration(group, member) // 完成“延迟心跳”,调度下一次心跳
          member.isNew = false
        }
      }
    }
  }
}

这里第1步是额外应对超时情形,而后2步和正常情况下一样。

注意这里的第3步。完成DelayedJoin后,可认为它是一个心跳,因此可以认为本次的DelayedHeartbeat已完成,可进行下一轮的调度:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private def completeAndScheduleNextHeartbeatExpiration(group: GroupMetadata, member: MemberMetadata): Unit = {
  completeAndScheduleNextExpiration(group, member, member.sessionTimeoutMs)
}

private def completeAndScheduleNextExpiration(group: GroupMetadata, member: MemberMetadata, timeoutMs: Long): Unit = {
  val memberKey = MemberKey(member.groupId, member.memberId)
  // 设置成员收到心跳
  member.heartbeatSatisfied = true
  // 完成本次的心跳任务
  heartbeatPurgatory.checkAndComplete(memberKey)
  // 创建新一轮延迟心跳
  // 设置本轮心跳状态为false(因为本轮还没发起心跳)
  member.heartbeatSatisfied = false
  // 创建新的延迟心跳操作
  val delayedHeartbeat = new DelayedHeartbeat(this, group, member.memberId, isPending = false, timeoutMs)
  // 尝试完成延迟心跳,这里尝试会失败,会直接放入到缓存进行监控
  heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(memberKey))
}

当消费者收到JOIN_GROUP响应后,要尽快发送SYNC_GROUP请求,否则协调者会认为消费者挂掉。处理SYNC_GROUP时,当组状态为StableCompletingRebalance,也会重新调度DelayedHeartbeat操作,其中前者为单个消费者,后者为所有消费者。

关于延迟心跳(DelayedHearbeat),更多的可看下节。

7. 延迟心跳

7.1. 延迟心跳事件

和其他延迟操作一样,DelayedHeartbeat操作也需要外部事件触发来完成,这个外部事件即:服务端和客户端有通信。

和其他延迟操作一样,DelayedHeartbeat也有3个重要方法:

  • tryComplete:若消费者存活,则操作可以完成
  • onExpiration:超时后的回调
  • onComplete:完成操作后的回调,该回调为空

对于尝试完成DelayedHeartbeat,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def tryCompleteHeartbeat(group: GroupMetadata,
                         memberId: String,
                         isPending: Boolean,
                         forceComplete: () => Boolean): Boolean = {
  group.inLock {
    if (group.is(Dead)) {
      forceComplete() // 组已经Dead,完成即可
    } else if (isPending) {
      // isPending = true时,成员没有加入组中,说明成员正在加入组,肯定存活,直接完成即可
      if (group.has(memberId)) {
        forceComplete()
      } else false
    } else if (shouldCompleteNonPendingHeartbeat(group, memberId)) {
      // isPending = false时,成员已经完成入组(重平衡)
      // 则需要成员已经收到了心跳,或者它即将离开,则直接完成
      forceComplete()
    } else false
  }
}

def shouldCompleteNonPendingHeartbeat(group: GroupMetadata, memberId: String): Boolean = {
  if (group.has(memberId)) {
    val member = group.get(memberId)
    member.hasSatisfiedHeartbeat || member.isLeaving
  } else {
    info(s"Member id $memberId was not found in ${group.groupId} during heartbeat completion check")
    true
  }
}

而对于心跳超时,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def onExpireHeartbeat(group: GroupMetadata, memberId: String, isPending: Boolean): Unit = {
  group.inLock {
    if (group.is(Dead)) {
      // ... 
    } else if (isPending) {
      // ...
      // 若成员没有入组,超时则直接移除该成员到组外
      removePendingMemberAndUpdateGroup(group, memberId)
    } else if (!group.has(memberId)) {
      // ...
    } else {
      val member = group.get(memberId)
      if (!member.hasSatisfiedHeartbeat) {
        // 若成员入组,且没收到心跳,超时则将其移出组外
        removeMemberAndUpdateGroup(group, member, s"removing member ${member.memberId} on heartbeat expiration")
      }
    }
  }
}

7.2. 处理消费者的“心跳”请求

当处理、响应“加入组”,“同步组”,“离开组”的请求时,协调会触发延迟心跳的完成操作,调用第6节所述的completeAndScheduleNextHeartbeatExpiration方法:

  • 尝试完成DelayedHeartbeat
  • 若正常,则什么都不做
  • 若超时,则会将成员移出组外,并可能触发再平衡

客户端也会向服务端发出“心跳”请求,协调者关于这部分处理和“加入组”、“同步组”类似,也会调用completeAndScheduleNextHeartbeatExpiration方法完成延迟心跳任务,并调度下一轮的延迟心跳:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def handleHeartbeat(groupId: String,
                    memberId: String,
                    groupInstanceId: Option[String],
                    generationId: Int,
                    responseCallback: Errors => Unit): Unit = {
  // ...
  groupManager.getGroup(groupId) match {
    case None =>
      responseCallback(Errors.UNKNOWN_MEMBER_ID)

    case Some(group) => group.inLock {
      // ...
      else {
        group.currentState match {
          // 若为Empty,返回错我
          // 因为消费者是入组后才启动心跳线程的
          case Empty =>
            responseCallback(Errors.UNKNOWN_MEMBER_ID)
          
          // 若为CompletingRebalance,返回错误,会触发JOIN_GROUP重发
          // 因为这个状态下,消费者还是不能启动心跳的
          // 或者是消费者以为消费组稳定,但是服务端不认为,需要让消费者感知到这个不稳定,从而触发JOIN_GROUP重发
          case CompletingRebalance =>
              responseCallback(Errors.REBALANCE_IN_PROGRESS)
          
          // 若为PreparingRebalance,状态不变,完成延迟心跳任务并调度下一轮心跳
          // 但依旧返回错误,因为这个状态下,消费者依旧不能启动心跳
          // 或者是消费者以为消费组稳定,但是服务端不认为,需要让消费者感知到这个不稳定,从而触发JOIN_GROUP重发
          case PreparingRebalance =>
              val member = group.get(memberId)
              completeAndScheduleNextHeartbeatExpiration(group, member)
              responseCallback(Errors.REBALANCE_IN_PROGRESS)

          // 若为Stable,则就是正常的心跳,完成延迟心跳任务并调度下一轮心跳即可
          case Stable =>
              val member = group.get(memberId)
              completeAndScheduleNextHeartbeatExpiration(group, member)
              responseCallback(Errors.NONE)
          
          // 若为Dead,抛出异常
          case Dead =>
            throw new IllegalStateException(s"Reached unexpected condition for Dead group $groupId")
        }
      }
    }
  }
}

注意CompletingRebalancePreparingRebalance的处理,会触发消费者的JOIN_GROUP重发,这是因为:

  • 该状态下,消费者是不能启动心跳的
  • 或是消费者以为消费组稳定,但是服务端不认为,需要让消费者感知到这个不稳定,从而返回错误,触发JOIN_GROUP的重发

7.3. 心跳超时

注意DelayedHeartbeat也是DelayedOperation,它会被放入Timer中,当超时的时候,被Timer触发执行,强制完成任务并触发超时回调。(见前文4.2.b)

此时服务端就感知到了心跳超时,并将对应消费者移出到组外,并触发分区重平衡。

8. 总结

本文和前文主要讲了协调者是如何处理消费者入组重平衡的。

消费者入组需要完成:

  • “加入组”
  • “同步组”

而协调者的处理至关重要,这里关键的在于:

  • 延迟操作
  • 消费组状态机的转换

而协调者通过监控消费者的心跳来确认消费者是否存活:

  • 若超时,则会将消费者从组中移除
  • 在“加入组”和“同步组”时,也会额外视作心跳处理,以监控消费者的存活状态