Kafka技术内幕-协调者(1)

Posted by keys961 on June 26, 2020

1. 概览

之前提及过,消费者拉取消息,需要和协调者进行交互,包括:

  1. 发现并连接协调者

  2. 申请入组,从协调者中获取分配到的分区

  3. 和协调者心跳,协调者判断是否需要触发分区重平衡

  4. 向协调者提交消费的分区偏移量

本文主要分析:

  • 协调者如何处理消费者入组
  • 心跳监控和对应处理

2. 消费者入组

消费者入组的入口在ensureActiveGroup方法,它保证消费者连接到协调者并分配到分区。

2.1. 发现协调者

消费者入组,必须先发现协调者,入口方法为ensureCoordinatorReady,最后调用方法lookupCoordinator

消费者会:

  • 发送FIND_COORDINATOR请求给某个服务端节点(选择的节点,正在传输的请求个数最少)
  • 收到请求后,设置协调者字段,并立刻尝试和协调者连接

这部分代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
protected synchronized RequestFuture<Void> lookupCoordinator() {
    if (findCoordinatorFuture == null) {
        // 选择一个Broker节点
        Node node = this.client.leastLoadedNode();
        if (node == null) {
            return RequestFuture.noBrokersAvailable();
        } else {
            // 发起FIND_COORDINATOR请求
            findCoordinatorFuture = sendFindCoordinatorRequest(node);           
            findCoordinatorFuture.addListener(new RequestFutureListener<Void>() {
                @Override
                public void onSuccess(Void value) {}

                @Override
                public void onFailure(RuntimeException e) {
                    findCoordinatorException = e;
                }
            });
        }
    }
    return findCoordinatorFuture;
}

private RequestFuture<Void> sendFindCoordinatorRequest(Node node) {
    // 向某个节点发送FIND_COORDINATOR请求
    FindCoordinatorRequest.Builder requestBuilder =
            new FindCoordinatorRequest.Builder(
                    new FindCoordinatorRequestData()
                        .setKeyType(CoordinatorType.GROUP.id())
                        .setKey(this.rebalanceConfig.groupId));
    return client.send(node, requestBuilder)
            .compose(new FindCoordinatorResponseHandler());
}

private class FindCoordinatorResponseHandler extends RequestFutureAdapter<ClientResponse, Void> {
    @Override
    public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
        // 处理FIND_COORDINATOR响应
        clearFindCoordinatorFuture();
        FindCoordinatorResponse findCoordinatorResponse = (FindCoordinatorResponse) resp.responseBody();
        Errors error = findCoordinatorResponse.error();
        if (error == Errors.NONE) {
            // 响应成功
            synchronized (AbstractCoordinator.this) {
                // 设置协调者字段
                int coordinatorConnectionId = Integer.MAX_VALUE - findCoordinatorResponse.data().nodeId();
                AbstractCoordinator.this.coordinator = new Node(
                        coordinatorConnectionId,
                        findCoordinatorResponse.data().host(),
                        findCoordinatorResponse.data().port());
                // 立刻尝试连接协调者
                client.tryConnect(coordinator);
                // 重置心跳
                heartbeat.resetSessionTimeout();
            }
            future.complete(null);
        } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
            future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));
        } else {
            future.raise(error);
        }
    }

    @Override
    public void onFailure(RuntimeException e, RequestFuture<Void> future) {
        clearFindCoordinatorFuture();
        super.onFailure(e, future);
    }
}

2.2. 加入组和同步组

a) 入组大致步骤

入组大体步骤如下:

  • 消费者发送“加入组”请求,捎带订阅信息给协调者
  • 协调者收集所有消费者的请求
  • 协调者选举从消费者中选举出“主消费者”,将订阅信息返回给它,其他消息也会收到响应,但并不是分区分配结果
    • “主消费者”是第一个发送“加入组”请求的消费者
  • 消费者收到响应后,会发送“同步组”请求给协调者
    • “主消费者”先执行分区分配,将结果塞入“同步组”请求,返回给协调者
    • 其他消费者会立刻发送“同步组”请求
  • 协调者接收“主消费者”的分区分配结果,然后再处理“同步组”请求,将分配结果返回给消费者
  • 消费者接收到分配结果后,设置分区分配状态,入组成功

b) 加入组

消费者发现协调者后,需要发起加入组请求,入口方法为joinGroupIfNeeded,最后调用initiateJoinGroup,里面关键的方法是sendJoinGroupRequest

实际上,sendJoinGroupRequest返回是“加入组”+“同步组”后的结果,内容是分区分配信息。

它实际上使用了chain,保证了先“加入组”,后“同步组”。

消费者会:

  1. 向协调者发送JOIN_GROUP请求,请求包含了消费者、消费组以及订阅的信息

  2. 处理响应,根据消费者是否是Leader分情况处理(见a)小节)

这部分在前文中部分说明过,这里我们着重看之前没说明过的第2步。

“加入组”响应处理在JoinGroupResponseHandler中,根据响应判断本消费者是否是Leader,然后分情况处理(见a)小节)。

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
    @Override
    public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
        // 处理“加入组”(JOIN_GROUP)响应
        Errors error = joinResponse.error();
        if (error == Errors.NONE) {
            if (isProtocolTypeInconsistent(joinResponse.data().protocolType())) {
                // 协议错误,触发异常
                future.raise(Errors.INCONSISTENT_GROUP_PROTOCOL);
            } else {
                // ... 
                synchronized (AbstractCoordinator.this) {
                    if (state != MemberState.REBALANCING) {
                        // 实质上消费者已离开,触发异常
                        future.raise(new UnjoinedGroupException());
                    } else {
                        AbstractCoordinator.this.generation = new Generation(
                            joinResponse.data().generationId(),
                            joinResponse.data().memberId(), joinResponse.data().protocolName());
                        if (joinResponse.isLeader()) {
                            // 响应标记为Leader,说明本消费者是Leader
                            // 先执行分区分配,然后将结果塞入“同步组”请求发送给协调者
                            onJoinLeader(joinResponse).chain(future); // 这里用chain,将结果处理转到调用链下游
                        } else {
                            // 本消费者是Follower
                            // 直接发送“同步组”请求
                            onJoinFollower().chain(future);
                        }
                    }
                }
            }
        } else if // ... 其他错误处理
    }
}

c) 同步组

易得知,“同步组”发生在接受到“加入组”响应之后。

对于Follower,直接发送”同步组”(SYNC_GROUP)请求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private RequestFuture<ByteBuffer> onJoinFollower() {
    // 直接发送SYNC_GROUP请求
    SyncGroupRequest.Builder requestBuilder =
            new SyncGroupRequest.Builder(
                    new SyncGroupRequestData()
                            .setGroupId(rebalanceConfig.groupId)
                            .setMemberId(generation.memberId)
                            .setProtocolType(protocolType())
                            .setProtocolName(generation.protocolName)
                            .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
                            .setGenerationId(generation.generationId)
                            .setAssignments(Collections.emptyList()) // Follower请求总不捎带分区分配结果
            );
    return sendSyncGroupRequest(requestBuilder);
}

而对于Leader,先执行分区分配算法,计算出分配结果,然后再发送“同步组”请求,请求捎带分区分配结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
    try {
        // 1. 执行分区分配算法
        Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.data().leader(), joinResponse.data().protocolName(),
                joinResponse.data().members());
        List<SyncGroupRequestData.SyncGroupRequestAssignment> groupAssignmentList = new ArrayList<>();
        for (Map.Entry<String, ByteBuffer> assignment : groupAssignment.entrySet()) {
            groupAssignmentList.add(new SyncGroupRequestData.SyncGroupRequestAssignment()
                    .setMemberId(assignment.getKey())
                    .setAssignment(Utils.toArray(assignment.getValue()))
            );
        }
        // 2. 发送SYNC_GROUP请求,捎带分区分配结果
        SyncGroupRequest.Builder requestBuilder =
                new SyncGroupRequest.Builder(
                        new SyncGroupRequestData()
                                .setGroupId(rebalanceConfig.groupId)
                                .setMemberId(generation.memberId)
                                .setProtocolType(protocolType())
                                .setProtocolName(generation.protocolName)
                                .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
                                .setGenerationId(generation.generationId)
                                .setAssignments(groupAssignmentList) // 捎带分区分配结果
                );
        return sendSyncGroupRequest(requestBuilder);
    } catch (RuntimeException e) {
        return RequestFuture.failure(e);
    }
}

“同步组”请求的响应,包含了分区分配的信息。这部分主要由SyncGroupResponseHandler处理,它将分区分配的数据传给了对应的Future,外部从而可以获取分区分配的信息:

ensureActiveGroup方法,以及前文2.2.节,外部是通过同步阻塞的方式得到这个Future结果,即入组(“加入组”+“同步组”)结果,执行下面的步骤时,入组一定已经完成了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private class SyncGroupResponseHandler extends CoordinatorResponseHandler<SyncGroupResponse, ByteBuffer> {
    @Override
    public void handle(SyncGroupResponse syncResponse,
                       RequestFuture<ByteBuffer> future) {
        // 处理“同步组”(SYNC_GROUP)请求
        Errors error = syncResponse.error();
        if (error == Errors.NONE) {
            if (isProtocolTypeInconsistent(syncResponse.data.protocolType())) {
                future.raise(Errors.INCONSISTENT_GROUP_PROTOCOL);
            } else if (isProtocolNameInconsistent(syncResponse.data.protocolName())) {
                future.raise(Errors.INCONSISTENT_GROUP_PROTOCOL);
            } else {
                // ...
                // 读取到响应的分区分配信息,传给Future,外部可以调用
                future.complete(ByteBuffer.wrap(syncResponse.data.assignment()));
            }
        } else {
            // 错误,触发rejoin
            requestRejoin();
            // 传递错误到Future
        }
    }
}

d) 入组总结

入组流程总结为:

  • 多个消费者向协调者发送JOIN_GROUP
  • 协调者选出第一个消费者,作为Leader
  • 收集到所有消费者的请求后,返回JOIN_GROUP响应
    • 对于Leader:包含组成员消息
  • 消费者返回SYNC_GROUP请求
    • 对于Leader:捎带分区分配结果,Leader执行分区分配运算
  • 协调者收到Leader的SYNC_GROUP请求,返回分区分配信息(SYNC_GROUP响应)给所有消费者

2.3. 分区分配

分区分配由主消费者执行,方法位于performAssignment,而具体的分区算法接口是ConsumerPartitionAssignor,它有下面几个实现,默认是RangeAssignor

image.png

这些算法具体就不说明了,可以参考: https://blog.csdn.net/bingshiwuyu/article/details/106763422?utm_medium=distribute.pc_relevant.none-task-blog-baidujs-2

2.4. 其他

a) 入组前的准备

入组前,除了需要询问知道协调者之外,还需要一些准备,包括:

  • 同步提交偏移量,以保存消费进度
  • 收回分配的分区,并触发分区收回的回调
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
protected void onJoinPrepare(int generation, String memberId) {
    // 同步提交分区,保存消费进度,若开启了自动提交,则自动提交会先关闭
    maybeAutoCommitOffsetsSync(time.timer(rebalanceConfig.rebalanceTimeoutMs));
    Exception exception = null;
    final Set<TopicPartition> revokedPartitions;
    // 收回分区并触发回调
    if (generation == Generation.NO_GENERATION.generationId &&
        memberId.equals(Generation.NO_GENERATION.memberId)) {
        revokedPartitions = new HashSet<>(subscriptions.assignedPartitions());
        if (!revokedPartitions.isEmpty()) {
            exception = invokePartitionsLost(revokedPartitions); // 触发回调
            subscriptions.assignFromSubscribed(Collections.emptySet()); // 收回分区
        }
    } else {
        switch (protocol) {
          case EAGER: // EAGER: 先收回所有分区
                revokedPartitions = new HashSet<>(subscriptions.assignedPartitions());
                exception = invokePartitionsRevoked(revokedPartitions); // 触发回调
                subscriptions.assignFromSubscribed(Collections.emptySet()); // 收回分区
                break;
          case COOPERATIVE: // COOPERATIVE: 先收回不再可能分配到的分区
                Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
                revokedPartitions = ownedPartitions.stream()
                    .filter(tp -> !subscriptions.subscription().contains(tp.topic()))
                    .collect(Collectors.toSet());
                if (!revokedPartitions.isEmpty()) {
                    exception = invokePartitionsRevoked(revokedPartitions); //触发回调
                    ownedPartitions.removeAll(revokedPartitions); 
                    subscriptions.assignFromSubscribed(ownedPartitions); // 收回分区
                }
                break;
        }
    }
    isLeader = false; // 重置Leader状态
    subscriptions.resetGroupSubscription(); // 重置消费者订阅状态
    if (exception != null) {
        throw new KafkaException("User rebalance callback throws an error", exception);
    }
}

b) 入组后处理

入组后,消费者获得到分配给它的分区,会做下面一些事情:

  • 反序列化分区分配结果
  • 更新订阅信息和分区分配信息
  • 更新Assignor内部状态
  • 打开自动提交(若配置开启)
  • 为分配的分区触发回调
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
protected void onJoinComplete(int generation,
                              String memberId,
                              String assignmentStrategy,
                              ByteBuffer assignmentBuffer) {
    // ...
    ConsumerPartitionAssignor assignor = lookupAssignor(assignmentStrategy);
    // ...
    // 反序列化分区分配结果
    Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);
    Set<TopicPartition> assignedPartitions = new HashSet<>(assignment.partitions());

    if (!subscriptions.checkAssignmentMatchedSubscription(assignedPartitions)) {
        // 若分配结果和本地订阅不匹配,则需要重新入组
        requestRejoin();
        return;
    }

    final AtomicReference<Exception> firstException = new AtomicReference<>(null);
    Set<TopicPartition> addedPartitions = new HashSet<>(assignedPartitions);
    addedPartitions.removeAll(ownedPartitions);

    if (protocol == RebalanceProtocol.COOPERATIVE) {
        // 若协议是COOPERATIVE,还需要收回之前没收回的分区,触发收回的回调,并重新入组
        Set<TopicPartition> revokedPartitions = new HashSet<>(ownedPartitions);
        revokedPartitions.removeAll(assignedPartitions);
        if (!revokedPartitions.isEmpty()) {
            firstException.compareAndSet(null, invokePartitionsRevoked(revokedPartitions));
            requestRejoin(); 
        }
    }
    // 更新订阅信息
    maybeUpdateJoinedSubscription(assignedPartitions);
    try {
        // 更新Assignor内部状态,默认下是空实现
        assignor.onAssignment(assignment, groupMetadata);
    } catch (Exception e) {
        firstException.compareAndSet(null, e);
    }
    // 重新打开自动提交
    if (autoCommitEnabled)
        this.nextAutoCommitTimer.updateAndReset(autoCommitIntervalMs);
    // 更新分配信息
    subscriptions.assignFromSubscribed(assignedPartitions);
    // 调用分配分区的回调函数
    firstException.compareAndSet(null, invokePartitionsAssigned(addedPartitions));
    if (firstException.get() != null)
        throw new KafkaException("User rebalance callback throws an error", firstException.get());
}

c) 分区收回/分配的回调

从上面可以看到,分区被收回/分配时,都会触发回调,这个回调定义在接口ConsumerRebalanceListener中:

1
2
3
4
5
6
public interface ConsumerRebalanceListener {
    // 收回分区后触发
    void onPartitionsRevoked(Collection<TopicPartition> partitions);
    // 分配分区后触发
    void onPartitionsAssigned(Collection<TopicPartition> partitions);
}

上面b)的invokePartitionsRevokedinvokePartitionsAssigned都调用了ConsumerRebalanceListener的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private Exception invokePartitionsAssigned(final Set<TopicPartition> assignedPartitions) {
    // ...
    ConsumerRebalanceListener listener = subscriptions.rebalanceListener();
    try {
        // ...
        listener.onPartitionsAssigned(assignedPartitions); // 分配分区的回调
        // ...
    } catch (WakeupException | InterruptException e) {
        // ...
    } catch (Exception e) {
        // ...
    }
    return null;
}

private Exception invokePartitionsRevoked(final Set<TopicPartition> revokedPartitions) {
    // ...
    ConsumerRebalanceListener listener = subscriptions.rebalanceListener();
    try {
        // ...
        listener.onPartitionsRevoked(revokedPartitions); // 收回分区的回调
        // ...
    } catch (WakeupException | InterruptException e) {
        // ...
    } catch (Exception e) {
        // ...
    }
    return null;
}

3. 协调者处理请求

第2节说明了消费者端的入组,本节说明服务端协调者的处理。

3.1. REVIEW: 服务端返回响应

由于“加入组”需要协调者等待所有的消费者的请求,所以在客户端看来是“阻塞”的。但是服务端为了提高性能,不能采用阻塞的方式,因此采用的做法是:

  • 服务端轮询到请求后,会交给后台线程KafkaRequestHandler处理(见本文2.3.节
  • 处理每个请求时,定义“发送响应的回调”(sendResponseCallback
  • 将请求连同回调传给具体负责请求的协调者(如GroupCoordinator),让协调者处理请求
    • 协调者有很多:如消费组GroupCoordinator,副本管理ReplicaManager,控制器相关KafkaController
  • 当协调者认为请求处理完成后,调用回调方法,
    • 回调会将响应压入响应队列,处理器轮询会将响应返回给客户端(依旧见本文2.3.节

3.2. 消费者及消费者组的元数据

a) 消费者元数据

服务端使用MemberMetadata存储每个消费者的元数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private[group] class MemberMetadata(var memberId: String, // 成员ID
                                    val groupId: String, // 消费组ID
                                    val groupInstanceId: Option[String],
                                    val clientId: String,
                                    val clientHost: String,
                                    val rebalanceTimeoutMs: Int,
                                    val sessionTimeoutMs: Int,
                                    val protocolType: String,
                                    var supportedProtocols: List[(String, Array[Byte])]) {

  var assignment: Array[Byte] = Array.empty[Byte] // 消费者分区结果
  var awaitingJoinCallback: JoinGroupResult => Unit = null // JOIN_GROUP回调
  var awaitingSyncCallback: SyncGroupResult => Unit = null // SYNC_GROUP回调
  var isLeaving: Boolean = false
  var isNew: Boolean = false
  val isStaticMember: Boolean = groupInstanceId.isDefined
  var heartbeatSatisfied: Boolean = false
}

b) 成员组元数据管理

这部分在GroupMetadata保存,维护了某个成员组的全部成员以及组元数据,关键字段在注释上标注了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private[group] class GroupMetadata(val groupId: String, // 组ID
                                   initialState: GroupState, 
                                   time: Time) extends Logging {
  type JoinCallback = JoinGroupResult => Unit
  private[group] val lock = new ReentrantLock
  private var state: GroupState = initialState // 组状态: Empty(初始)/Dead/Stable/PreparingRebalance/CompletingRebalance
  var currentStateTimestamp: Option[Long] = Some(time.milliseconds())
  var protocolType: Option[String] = None // 协议类型
  var protocolName: Option[String] = None // 协议名称
  var generationId = 0 // Generation号
  private var leaderId: Option[String] = None // 组Leader ID

  private val members = new mutable.HashMap[String, MemberMetadata] // 组成员
  private val staticMembers = new mutable.HashMap[String, String] 
  private val pendingMembers = new mutable.HashSet[String] 
  private var numMembersAwaitingJoin = 0
  private val supportedProtocols = new mutable.HashMap[String, Integer]().withDefaultValue(0)
  private val offsets = new mutable.HashMap[TopicPartition, CommitRecordMetadataAndOffset] // 分区提交进度
  private val pendingOffsetCommits = new mutable.HashMap[TopicPartition, OffsetAndMetadata]
  private val pendingTransactionalOffsetCommits = new mutable.HashMap[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]()
  private var receivedTransactionalOffsetCommits = false
  private var receivedConsumerOffsetCommits = false
  private var subscribedTopics: Option[Set[String]] = None // 已订阅的topic集合
  var newMemberAdded: Boolean = false
}

3.3. 请求前检查

对于“加入组”和“同步组”请求,需要做下面检查:

  • 协调者不可用
  • 消费者组编号无效
  • 连错协调者
  • 协调者正在加载(如正在迁移数据)
  • 消费者会话超时
  • 协调者没有消费组,但消费者的成员编号已知
  • 协调者有消费组,消费者的成员编号已知,但不在消费组中

这些都会返回特定的错误码给消费者,消费者会根据错误码进行操作(如重新选择协调者并重试入组)。

这些检查通过后:

  • “加入组”请求最终会进入doJoinGroup/doUnknownJoinGroup,进行进一步检查,只有在下面条件(之一)满足时才允许加入
    • 消费组为空,成员编号未知(第一个消费者第一次加入组时)
    • 消费组不为空,成员编号未知(消费组第一次加入组)
    • 消费组不为空,成员编号已知,且成员编号在组内
  • “同步组”请求最终会进入doSyncGroup,进行进一步检查,只有在下面条件满足时才允许同步
    • 请求中保证消费组不为空

通过检查后,就可以执行对应的操作了,这里涉及到复杂的状态机转换操作,因此放到后面讲。

3.4. 通过回调返回响应

a) “加入组”响应

返回响应的回调函数定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 返回响应时的回调
def sendResponseCallback(joinResult: JoinGroupResult): Unit = {
  // 响应创建函数
  def createResponse(requestThrottleMs: Int): AbstractResponse = {
    val protocolName = if (request.context.apiVersion() >= 7)
      joinResult.protocolName.orNull
    else
      joinResult.protocolName.getOrElse(GroupCoordinator.NoProtocol)

    val responseBody = new JoinGroupResponse(
      new JoinGroupResponseData()
        .setThrottleTimeMs(requestThrottleMs)
        .setErrorCode(joinResult.error.code) // 错误码
        .setGenerationId(joinResult.generationId) // generation号
        .setProtocolType(joinResult.protocolType.orNull) // 协议类型
        .setProtocolName(protocolName) // 协议名称
        .setLeader(joinResult.leaderId) // Leader ID(主消费者)
        .setMemberId(joinResult.memberId) // 当前消费者ID
        .setMembers(joinResult.members.asJava) // 已知消费组的成员
    )
    responseBody
  }
  // 返回响应
  sendResponseMaybeThrottle(request, createResponse)
}

当完成请求处理并返回响应时,会调用这个回调,并设置该回调字段为null。这部分代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def onCompleteJoin(group: GroupMetadata): Unit = {
  group.inLock {
    // ... 

    if /* ... */ {
      // ...
    } else {
      group.initNextGeneration() // 增加generation
      if (group.is(Empty)) {
        // ...
      } else {
        // 一次性向每个消费者返回响应
        for (member <- group.allMemberMetadata) {
          val joinResult = JoinGroupResult(
            members = if (group.isLeader(member.memberId)) {
              group.currentMemberMetadata
            } else {
              List.empty
            },
            memberId = member.memberId,
            generationId = group.generationId,
            protocolType = group.protocolType,
            protocolName = group.protocolName,
            leaderId = group.leaderOrNull, // 响应中包含Leader ID
            error = Errors.NONE) // 默认错误吗为NONE
          group.maybeInvokeJoinCallback(member, joinResult) // 调用回调以返回响应
          completeAndScheduleNextHeartbeatExpiration(group, member)
          member.isNew = false
        }
      }
    }
  }
}

注意,返回响应是一次性返回给所有消费者的,这里涉及到“延迟操作”,它和消费组的状态机有关系,这部分放到下面说明。

b) “同步组”响应

“同步组”响应的回调定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 返回响应时的回调
def sendResponseCallback(syncGroupResult: SyncGroupResult): Unit = {
  // 直接将SYNC_GROUP结果返回给客户端
  sendResponseMaybeThrottle(request, requestThrottleMs =>
    new SyncGroupResponse(
      new SyncGroupResponseData()
        .setErrorCode(syncGroupResult.error.code)
        .setProtocolType(syncGroupResult.protocolType.orNull)
        .setProtocolName(syncGroupResult.protocolName.orNull)
        .setAssignment(syncGroupResult.memberAssignment) // 分区分配信息
        .setThrottleTimeMs(requestThrottleMs)
    ))
}

返回响应的函数在doSyncGroup中,这里截取最关键的一段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
private def doSyncGroup(group: GroupMetadata,
                        generationId: Int,
                        memberId: String,
                        protocolType: Option[String],
                        protocolName: Option[String],
                        groupInstanceId: Option[String],
                        groupAssignment: Map[String, Array[Byte]],
                        responseCallback: SyncCallback): Unit = {
  group.inLock {
      // ...
      group.currentState match {
        case Empty =>
          responseCallback(SyncGroupResult(Errors.UNKNOWN_MEMBER_ID))
        case PreparingRebalance =>
          responseCallback(SyncGroupResult(Errors.REBALANCE_IN_PROGRESS))
        case CompletingRebalance =>
          group.get(memberId).awaitingSyncCallback = responseCallback
          if (group.isLeader(memberId)) {
            // 对于主消费者而言,此时分配结果协调者已经知道
            val missing = group.allMembers -- groupAssignment.keySet
            val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap
            // ...
            groupManager.storeGroup(group, assignment, (error: Errors) => {
              group.inLock {
                if (group.is(CompletingRebalance) && generationId == group.generationId) {
                  if (error != Errors.NONE) {
                    // ... 
                  } else {
                    // 传播消费组的分配结果,并持久化到内部topic中
                    setAndPropagateAssignment(group, assignment)
                    // 将状态转成Stable
                    group.transitionTo(Stable)
                  }
                }
              }
            })
            // ...
          }
          // 而对于其他消费者,不会立刻返回响应,而是:
          // 1. 由上面的Leader消费者的请求触发,传播给其他消费者,此时消费组状态为Stable
          // 2. 由下面Stable状态的流程触发,直接返回分配信息

        case Stable =>
          // 若稳定,直接返回分配结果
          val memberMetadata = group.get(memberId)
          responseCallback(SyncGroupResult(group.protocolType, group.protocolName, memberMetadata.assignment, Errors.NONE))
          completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))
        case Dead =>
          throw new IllegalStateException(s"Reached unexpected condition for Dead group ${group.groupId}")
      }
    }
  }
}

上面关键的在setAndPropagateAssignment函数,它设置并传播每个成员的分区分配结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private def setAndPropagateAssignment(group: GroupMetadata, assignment: Map[String, Array[Byte]]): Unit = {
  assert(group.is(CompletingRebalance))
  // 设置每个成员的分配结果
  group.allMemberMetadata.foreach(member => member.assignment = assignment(member.memberId))
  // 返回分配结果给每个成员
  propagateAssignment(group, Errors.NONE) 
}
private def propagateAssignment(group: GroupMetadata, error: Errors): Unit = {
  val (protocolType, protocolName) = if (error == Errors.NONE)
    (group.protocolType, group.protocolName)
  else
    (None, None)
  for (member <- group.allMemberMetadata) {
    // 对于每个成员,调用回调函数,将结果返回给每个成员
    if (group.maybeInvokeSyncCallback(member, SyncGroupResult(protocolType, protocolName, member.assignment, error))) {
      completeAndScheduleNextHeartbeatExpiration(group, member)
    }
  }
}

c) 协调者保存消费组任务

在返回“同步组”响应前,协调者会把分区分配结果持久化到topic __consumer_offsets中。当协调节点故障时,新的协调者可从该topic读取数据并恢复。

保存状态到内部topic,调用了GroupMetadataManager#storeGroup

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def storeGroup(group: GroupMetadata,
               groupAssignment: Map[String, Array[Byte]],
               responseCallback: Errors => Unit): Unit = {
  getMagic(partitionFor(group.groupId)) match {
    case Some(magicValue) =>
      // ... 
      val groupMetadataRecords = // ... 需要保存到记录
      // 将记录保存到内部topic后,调用该回调将其存入缓存中
      def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
        // ...
        val status = responseStatus(groupMetadataPartition)
        val responseError = if (status.error == Errors.NONE) {
          Errors.NONE
        } else {
          // ...
          }
        }
        // 调用回调responseCallback
        responseCallback(responseError)
      }
      // 将分配信息记录追加到内部topic中,追加完后,执行回调putCacheCallback
      appendForGroup(group, groupMetadataRecords, putCacheCallback)
    case None =>
      responseCallback(Errors.NOT_COORDINATOR)
      None
  }
}

private def appendForGroup(group: GroupMetadata,
                           records: Map[TopicPartition, MemoryRecords],
                           callback: Map[TopicPartition, PartitionResponse] => Unit): Unit = {
  // 将分配信息记录追加到内部topic中
  replicaManager.appendRecords(
    timeout = config.offsetCommitTimeoutMs.toLong,
    requiredAcks = config.offsetCommitRequiredAcks,
    internalTopicsAllowed = true,
    origin = AppendOrigin.Coordinator,
    entriesPerPartition = records,
    delayedProduceLock = Some(group.lock),
    responseCallback = callback)  // 执行完成后,运行回调,即上面的putCacheCallback
}

当然分区分配结果也会缓存起来(不然一直读盘会很慢),保存在groupMetadataCache中(位于GroupMetadataManager)。它保存了它管理的所有消费组的元数据(注意,一个Broker/协调者可管理多个消费组):

1
2
3
4
5
6
7
8
9
class GroupMetadataManager(brokerId: Int,
                           interBrokerProtocolVersion: ApiVersion,
                           config: OffsetConfig,
                           replicaManager: ReplicaManager,
                           zkClient: KafkaZkClient,
                           time: Time,
                           metrics: Metrics) extends Logging with KafkaMetricsGroup {
  private val groupMetadataCache = new Pool[String, GroupMetadata] // 消费组元数据
}

4. 延迟“加入组”

上面入组的操作省略了一些细节,例如:

  • 如何判断收集完所有消费者的“加入组”请求
  • 如何延迟响应“加入组”和“同步组”的响应:
    • “加入组”:等到所有消费者请求完后,才响应
    • “同步组”:等到主消费者返回分区分配结果后,才响应

回答这些问题,需要引入这些概念:

  • 延迟操作:通过它实现延迟的加入组(对应的是DelayJoin
  • 消费组状态转换:5种状态(Empty,Dead,Stable,PreparingRebalance,CompletingRebalance

本节先看延迟操作。

延迟操作的意思是:协调者不能立即执行该操作。在服务端,使用延迟操作DelayJoin进行延迟“加入组”,表示协调者会延迟返回“加入组”响应给消费者

4.1. 准备“再平衡”

协调者收到消费者的”加入组“请求后,除了校验,还会做其他事情。

首先协调者会准备创建/更新成员元数据:

  • 当消费者成员编号未知,则分配一个编号,创建该成员元数据,并保存,编号会返回给消费者(见addMemberAndRebalance方法)
  • 当消费者成员编号已知,则更新该成员元数据(见updateMemberAndRebalance

然后,开始准备“再平衡”,进入方法maybePrepareRebalance。这里触发条件是:成员组状态必须为Stable/Empty。它会:

  • 创建DelayedJoin操作(若状态为Empty,则创建其子类InitialDelayedJoin
  • 将状态转换成PreparingRebalance
  • 尝试操作能否立即完成,若不能,则加入到延迟缓存中(DelayedOperationPurgatory,后面会讲)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private def maybePrepareRebalance(group: GroupMetadata, reason: String): Unit = {
  group.inLock { // 上锁
    if (group.canRebalance) // 这里状态必须是Stable, CompletingRebalance, Empty
      prepareRebalance(group, reason) // 开始准备“再平衡”
  }
}

private def prepareRebalance(group: GroupMetadata, reason: String): Unit = {
  if (group.is(CompletingRebalance)) // 但这里,若状态为CompletingRebalance,还是返回错误
                                     // 所以,成员状态必须时Stable或Empty,才能出发“再平衡”准备
    resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS)
  // 创建DelayJoin
  // 若目前状态为Empty,则创建InitialDelayedJoin;若为Stable,则创建DelayedJoin
  val delayedRebalance = if (group.is(Empty))
    new InitialDelayedJoin(this,
      joinPurgatory,
      group,
      groupConfig.groupInitialRebalanceDelayMs,
      groupConfig.groupInitialRebalanceDelayMs,
      max(group.rebalanceTimeoutMs - groupConfig.groupInitialRebalanceDelayMs, 0))
  else
    new DelayedJoin(this, group, group.rebalanceTimeoutMs)
  // 转换组状态为PreparingRebalance
  group.transitionTo(PreparingRebalance)
  val groupKey = GroupKey(group.groupId)
  // 创建完DelayedJoin操作后,立即尝试,看看是否能立即完成
  // 若不能立即完成,则会将操作加入到延迟缓存中(指定键为组ID)
  joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
}

易知,由于上面使用了锁,所以一次“再平衡”只会由一个消费者发起,DelayedJoin操作只会创建1个。

4.2. 延迟操作与延迟缓存

a) 延迟操作

延迟操作(DelayedOperation)有下面的特点:

  • 需要指定超时时间,超时后会强制完成
  • 需要将其加入到缓存(DelayedOperationPurgatory)中,并指定一个键(例如消费组编号)
  • 创建后,需要先尝试该操作是否能立即完成
  • 判断延迟操作是否能完成,需要自定义判断和实现

这里延迟操作有3个主要接口:

  • tryComplete:尝试完成
  • onComplete:操作完成后的回调
  • onExpiration:操作超时后的回调

b) 延迟缓存

延迟缓存(DelayedOperationPurgatory)保存了延迟操作对象,并将操作和键关联。

延迟缓存有下面2个主要方法:

  • tryCompleteElseWatch:尝试完成延迟操作,若不能完成,就以指定的键将该操作缓存起来,以进行监控

    监控时,任务会被加入到Timer中,它是一个延迟执行的线程池。当超时时,任务会被该线程池执行,执行的方法即Runnable#run

    1
    2
    3
    4
    5
    6
    7
    
    // DelayedTask被加入到DelayedOperationPurgatory#timer中延迟执行
    // 当超时发生时,timer会后台执行这个操作
    override def run(): Unit = {
      // 超时时执行
      if (forceComplete()) // 1. 强制结束任务
        onExpiration() // 2. 触发超时回调
    }
    
  • checkAndComplete:检查并尝试完成指定键的延迟操作

c) DelayedJoin

DelayedJoin是延迟操作的一个实现。它有自己的状态,即消费组元数据,保存在GroupCoordinator中。

GroupCoordinator可根据消费组元数据,判断是否能够完成延迟加入的操作,依据是:消费组的每个消费者都发送了“加入组”请求。

1
2
3
4
5
6
7
8
9
10
private[group] class DelayedJoin(coordinator: GroupCoordinator,
                                 group: GroupMetadata,
                                 rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout, Some(group.lock)) {
  // 利用GroupCoordinator判断是否可以完成延迟入组,依据是所有消费者都发送了“加入组”请求
  override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete _)
  // 任务过期的回调,定义在GroupCoordinator#onExpirationJoin中
  override def onExpiration() = coordinator.onExpireJoin()
  // 任务完成的回调,定义在GroupCoordinator#onCompleteJoin中
  override def onComplete() = coordinator.onCompleteJoin(group)
}

这里具体看tryComplete实现,具体放到下节说明。

4.3. 尝试完成延迟“加入组”操作

进入GroupCoordinator#tryCompleteJoin,发现很简单:

  • 判断每个消费者是否都发送了”加入组“请求
  • 调用forceComplete,触发DelayedJoin#onComplete回调
1
2
3
4
5
6
7
8
9
10
11
12
// In GroupCoordinator
def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean) = {
  group.inLock {
    if (group.hasAllMembersJoined) // 判断是否都发了“加入组”
      forceComplete() // 若都发了,则完成任务,最后会触发onComplete回调
    else false
  }
}

// In GroupMetadata
// 这里条件即:等待的成员个数等于组成员个数,且处于pending状态的为空
def hasAllMembersJoined = members.size == numMembersAwaitingJoin && pendingMembers.isEmpty

而完成这个延迟“加入组”的方法是onCompleteJoin,主要做:

  • 增加组generation值,并更改组状态为CompletingRebalance
  • 触发awaitingJoinCallback,返回“加入组”响应
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def onCompleteJoin(group: GroupMetadata): Unit = {
  group.inLock { // 上锁
    // ... 
    } else {
      group.initNextGeneration() // 增加generation值,并将状态转化为CompletingRebalance
      // ...
      } else {
        // 对组内每个成员,发送JOIN_GROUP响应
        for (member <- group.allMemberMetadata) {
          val joinResult = JoinGroupResult(
            members = if (group.isLeader(member.memberId)) {
              group.currentMemberMetadata // 给Leader返回组成员信息
            } else {
              List.empty
            },
            memberId = member.memberId,
            generationId = group.generationId,
            protocolType = group.protocolType,
            protocolName = group.protocolName,
            leaderId = group.leaderOrNull,
            error = Errors.NONE)
					// 触发回调,即返回响应
          group.maybeInvokeJoinCallback(member, joinResult)
          completeAndScheduleNextHeartbeatExpiration(group, member)
          member.isNew = false
        }
      }
    }
  }
}

def maybeInvokeJoinCallback(member: MemberMetadata,
                            joinGroupResult: JoinGroupResult): Unit = {
  if (member.isAwaitingJoin) {
    member.awaitingJoinCallback(joinGroupResult) // 触发回调
    member.awaitingJoinCallback = null // 设置成员状态,成员不再awaitingJoin
    numMembersAwaitingJoin -= 1 // numMembersAwaitingJoin计数-1
  }
}

之前提及“只有所有消费者都发送‘加入组’请求后,才会响应”,但是有例外,即第一个消费者发送“入组”请求:

  • 协调者处理第一个消费者的“加入组”请求,创建元数据并保存
  • 消费组状态变成PreparingRebalance
  • 创建DelayedJoin操作,并立刻尝试完成它,由于从上面的hasAllMembersJoined条件成立(members.size为1,而numMembersAwaitingJoin也是1,pendingMembers为空),所以可以完成该操作
  • 完成延迟操作,调用GroupCoordinator#onCompleteJoin,将组信息返回给消费者(这里消费者只有1个,且主消费者也是它)

因此,最后的结果就是:

  • 整个组只有1个成员,组状态为CompletingRebalance
  • 该成员即主消费者,可以执行分区分配,并返回SYNC_GROUP请求
  • 最后整个消费组(仅1个成员)状态变为Stable,入组结束

注意:JOIN_GROUPSYNC_GROUP处理都会对消费组上锁,所以对于某个消费组,只能同时执行1个

但是还有一些问题:

  1. 尝试完成DelayedJoin前,其他消费者发送了JOIN_GROUP

    由于处理JOIN_GROUP上锁,第一个消费者的延迟操作会成功,会返回响应,状态会变为CompletingRebalance,此时消费组状态不稳定。之后,才会处理其他消费者的JOIN_GROUP。这和下面2的第2个子问题等价。

  2. 完成DelayedJoin后,其他消费者发送了JOIN_GROUP

    • 若其他消费者的JOIN_GROUP在后,第一个消费者SYNC_GROUP在前:对应消费组稳定(Stable)后,其他消费者入组

    • 若其他消费者的JOIN_GROUP在前,第一个消费者SYNC_GROUP在后:对应消费组不稳定(CompletingRebalance)时,其他消费者入组

第2个问题需要解决,分为2个字问题,放在下节说明。

4.4. 消费组稳定后,加入新消费者

这是4.3.中的第1个子问题。

当消费组稳定后,新消费者发送JOIN_GROUP,会触发整个组的重新加入,原有存在的消费者也需要重新入组

那么新消费者和原有消费者都会执行4.1~4.3提及的事情:

  • 2个消费者都向协调者发送JOIN_GROUP

  • 消费组状态变成PreparingRebalancing,并创建DelayedJoin操作(只会有1个),最先收到JOIN_GROUP对应的消费者是Leader

  • 由于有2个消费者,因此需要members.size == numMembersAwaitingJoin == 2,等待收到2个消费者的JOIN_GROUP后,才能完成DelayedJoin操作,触发完成该操作的条件有:

    • 外部事件触发:收到最后一个消费者的JOIN_GROUP后,最后会尝试完成这个延迟操作,将响应返回给2个消费者

      代码解释如下:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      
        def handleJoinGroup(...): Unit = {
          // 对于最后一个消费者
          // ...
          } else {
            val isUnknownMember = memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID
            groupManager.getGroup(groupId) match {
              case None =>
                // ...
              case Some(group) =>
                group.inLock {
                  // ...
                  // 加入新消费者/更新消费者,使其入组
                  // 这里不会创建DelayedJoin操作,因为之前创建过,且状态为PreparingRebalance,也不能创建
                  } else if (isUnknownMember) {
                    doUnknownJoinGroup(group, groupInstanceId, requireKnownMemberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
                  } else {
                    doJoinGroup(group, memberId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
                  }
                  // 对于最后一个消费者,会进入该分支
                  if (group.is(PreparingRebalance)) {
                    // 这一步尝试完成DelayedJoin事件
                    // 这里会触发DelayedJoin事件的完成回调,因为它满足4.3.中提及的hasAllMembersJoined条件
                    joinPurgatory.checkAndComplete(GroupKey(group.groupId))
                  }
                }
              }
            }
          }
      
    • 超时触发:最后一个消费者的JOIN_GROUP没有收到,事件也会强制完成,响应只会返回给前面收到JOIN_GROUP请求的消费者

下面举例,比较直观地说明了新消费者入组的流程:

  • (第一轮):第一个消费者JOIN_GROUPSYNC_GROUP,完成重分配
  • (第二轮):
    • 第二个消费者JOIN_GROUP,触发第一个消费者重新JOIN_GROUP(通过心跳可感知)
    • 第二个JOIN_GROUP请求先来,消费组状态变为PreparingRebalance,并创建了DelayedJoin操作,但不能完成
  • (第二轮)
    • 第一个消费者JOIN_GROUP后来,不能创建DelayedJoin,但最后尝试并且能完成DelayedJoin操作
    • 消费组状态变成CompletingRebalance,返回响应给两个消费者
    • 最后进行SYNC_GROUP,Leader上传分配信息,并由协调者传播给其他消费者,消费组状态变成Stable,具体参考3.4.b

4.5. 消费组稳定前,加入新消费者

这是4.3.中的第2个子问题。

由于新消费者的JOIN_GROUP先到,之前消费者收到了响应后,再进行SYNC_GROUP就没有用了。

此时,消费者组的状态是CompletingRebalance,新消费者的JOIN_GROUP请求做的和4.1.~4.3.一模一样

  • 将状态改回PreparingRebalance
  • 创建延迟操作DelayedJoin
  • 尝试完成延迟操作,但是不能完成,因为hasAllMembersJoined没满足(members.size == 2numMembersAwaitingJoin == 1
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private def doJoinGroup(...): Unit = {
  group.inLock { // 上锁
    // ...
    } else {
      val groupInstanceIdNotFound = groupInstanceId.isDefined && !group.hasStaticMember(groupInstanceId)
      // ...
      } else {
        val member = group.get(memberId)
        group.currentState match {
          // ...
          // 对应CompletingRebalance状态
          case CompletingRebalance =>
            if (member.matches(protocols)) {
              // 若元数据没改变,则不需要重分配,直接返回响应
              responseCallback(JoinGroupResult(...)
            } else {
              // 一般进入这里,执行4.1.~4.3.一样的操作
              updateMemberAndRebalance(group, member, protocols, responseCallback)
            }
					// ...
        }
      }
    }
  }
}

之前的消费者会响应SYNC_GROUP,它在新消费者处理完JOIN_GROUP后处理,因为上锁的缘故,消费者组的状态变为了PreparingRebalance(正常是CompletingRebalance),所以:

  • 返回之前的消费者REBALANCE_IN_PROGRESS错误码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private def doSyncGroup(...): Unit = {
  group.inLock {
    // ...
    } else {
      group.currentState match {
        // ...
        // 此时之前的消费者SYNC_GROUP请求处理会来到这
        // 返回REBALANCE_IN_PROGRESS错误码
        // 消费者收到后会重新发送JOIN_GROUP请求
        case PreparingRebalance =>
          responseCallback(SyncGroupResult(Errors.REBALANCE_IN_PROGRESS))
        // ...
      }
    }
  }
}

消费者收到REBALANCE_IN_PROGRESS后,会:

  • 抛出RebalanceInProgressException异常
  • 然后会重新发起“加入组”请求
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
boolean joinGroupIfNeeded(final Timer timer) {
    while (rejoinNeededOrPending()) {
        // ...
        // 发起JOIN_GROUP,然后发起SYNC_GROUP
        // SYNC_GROUP会返回REBALANCE_IN_PROGRESS错误
        final RequestFuture<ByteBuffer> future = initiateJoinGroup();
        client.poll(future, timer); // 轮询以获取入组结果
        if (!future.isDone()) {
            return false; // 超时
        }
        if (future.succeeded()) {
            // ...
        } else {
            // 收到REBALANCE_IN_PROGRESS,Future会抛出RebanalceInProgress异常
            final RuntimeException exception = future.exception();
            // ...
            resetJoinGroupFuture(); // 重置(将joinFuture设为null,此时while条件返回true)
            if (exception instanceof UnknownMemberIdException ||
                exception instanceof RebalanceInProgressException || // 遇到RebanalceInProgress
                exception instanceof IllegalGenerationException ||
                exception instanceof MemberIdRequiredException)
                continue; // 进入下一轮循环,即重试,发起JOIN_GROUP请求
            else if (!future.isRetriable())
                throw exception;

            timer.sleep(rebalanceConfig.retryBackoffMs);
        }
    }
    return true;
}

protected synchronized boolean rejoinNeededOrPending() {
    return rejoinNeeded || joinFuture != null;
}

private synchronized void resetJoinGroupFuture() {
    this.joinFuture = null;
}

在此之后,消费者入组就和4.4.节的入组流程一致,这里不再赘述。

5. 消费组状态机与入组

从上面4章可以看出,协调者处理消费者的“加入组”和“同步组”,不仅需要“延迟操作”,而且更加依赖消费组的状态。

下一篇文章会专门讲消费组状态及其转换,并总结协调者“加入组”、“同步组”、“离开组”、超时和心跳的处理。