Kafka技术内幕-消费者

Posted by keys961 on June 24, 2020

1. 概览

Kafka消费者主要涉及到的就是KafkaConsumer类,其涉及到的主要是2个操作:

  • subscribe:订阅topic,动态分配分区给消费者
  • poll:轮询,返回消息集

此外还有一些操作:

  • assign:静态分配分区给消费者
  • commitAsync/commitSync:异步/同步提交偏移量到协调者

2. 客户端消费消息

2.1. 消费者订阅状态

KafkaConsumer提供了2种消费模式:

  • 订阅(subscribe):只指定订阅的topic,分区由协调者动态分配
  • 分配(assign):除了topic外,还指定消费的分区,即分区静态分配

订阅的状态保存在SubscriptionState中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class SubscriptionState {
    private enum SubscriptionType {
        NONE, AUTO_TOPICS, AUTO_PATTERN, USER_ASSIGNED
    }
    private SubscriptionType subscriptionType; // 订阅类型
    private Pattern subscribedPattern; // 订阅模式
    private Set<String> subscription; // 本消费者消费的topic
    private Set<String> groupSubscription; // 本消费者所在组消费的topic
    private final PartitionStates<TopicPartitionState> assignment; // 当前分配的分区及其状态
    private final OffsetResetStrategy defaultResetStrategy; // 偏移量重置策略
    private ConsumerRebalanceListener rebalanceListener; // 触发重平衡的回调
    private int assignmentId = 0; // 当前分配分区方案的ID
		// ...
}

客户端采用订阅模式时,并不会先分配好分区:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// In KafkaConsumer
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
     // ...
     try {
        // ...
          if (topics == null)
              throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
          if (topics.isEmpty()) {
              this.unsubscribe();
          } else {
              // ...
              // 清除没有订阅的主题的数据
              fetcher.clearBufferedDataForUnassignedTopics(topics);
              // 添加订阅信息 
              if (this.subscriptions.subscribe(new HashSet<>(topics), listener))
                  metadata.requestUpdateForNewTopics(); // 设置成功则标记以启动元数据更新
          }
      } finally {
          release();
      }
}

// In SubscriptionState
public synchronized boolean subscribe(Set<String> topics, ConsumerRebalanceListener listener) {
    registerRebalanceListener(listener); // 注册回调
    setSubscriptionType(SubscriptionType.AUTO_TOPICS); // 设置订阅类型AUTO_TOPICS
    return changeSubscription(topics); // 变更订阅
}

private boolean changeSubscription(Set<String> topicsToSubscribe) {
    if (subscription.equals(topicsToSubscribe))
        return false; // 若没更改则返回false,元数据不需要更新
    subscription = topicsToSubscribe; // 直接设置订阅的topic
    return true;
}
    

而分配到分区后(重平衡后),会调用下面的方法,添加分区和状态到assignment

1
2
3
4
5
6
7
8
9
10
11
12
13
public synchronized void assignFromSubscribed(Collection<TopicPartition> assignments) {
    // ...
    // assignment保存了分区TopicPartition和它的状态TopicPartitionState
    Map<TopicPartition, TopicPartitionState> assignedPartitionStates = new HashMap<>(assignments.size());
    for (TopicPartition tp : assignments) {
        TopicPartitionState state = this.assignment.stateValue(tp);
        if (state == null)
            state = new TopicPartitionState();
        assignedPartitionStates.put(tp, state);
    }
    assignmentId++;
    this.assignment.set(assignedPartitionStates); // 直接设置新的assignment
}

而分配模式下,assignment会被预先分配好。

a) 分区状态

分区状态即TopicPartitionState,主要保存下面的信息:

1
2
3
4
5
6
7
8
9
10
private static class TopicPartitionState {

    private FetchState fetchState; // 拉取状态
    private FetchPosition position; // 拉取偏移量,包含偏移量值、epoch和Leader
    private Long highWatermark; // 上一次拉取的HW(低于HW的消息Kafka Broker都已提交/备份)
    // ...
    private boolean paused;  // 拉取是否暂停
    private OffsetResetStrategy resetStrategy;  // 偏移量重置策略
		// ...
}

其中position代表拉取的进度,它不能为空,否则消费者不能拉取这个分区。

b) 初始化分区状态

消费者初始化后,分区状态时没有设置的,轮询前需要设置,这最终会调用updateFetchPositions方法,下面说明一下调用逻辑,代码不贴了:

  1. 首先判断分配到的分区是否有合法position,若有则返回,开始拉取消息
  2. 进入refreshCommittedOffsetsIfNeededConsumerCoordinator)方法,获取需要拉取偏移量的分区
  3. 进入fetchCommittedOffsetsConsumerCoordinator)方法,向协调者发送请求,并轮询,获得偏移量,并设置
  4. 全部设置完成后,开始拉取消息

c) 重置和更新拉取偏移量

偏移量的更新入口在KafkaConsumer#updateFetchPositions中:

1
2
3
4
5
6
7
8
9
10
11
12
13
private boolean updateFetchPositions(final Timer timer) {
    // 验证偏移量
    fetcher.validateOffsetsIfNeeded();
    // 判断是否所有已订阅分区都获取了拉取偏移量,若都有则直接返回
    cachedSubscriptionHashAllFetchPositions = subscriptions.hasAllFetchPositions();
    if (cachedSubscriptionHashAllFetchPositions) return true;
    // 关键:向协调者发送请求,刷新拉取偏移量,从已提交的偏移量开始
    if (coordinator != null && !coordinator.refreshCommittedOffsetsIfNeeded(timer)) return false;
    // 重置偏移量(若剩余分区还需要一个偏移量)
    subscriptions.resetMissingPositions();
    fetcher.resetOffsetsIfNeeded();
    return true;
}

这里关键在refreshCommittedOffsetsIfNeeded方法,它:

  1. 会向本消费者组的协调者发送OFFSET_FETCH请求(具体在sendOffsetFetchRequest方法),获取消费者在该分区已提交的偏移量
  2. 使用该偏移量设置分区状态

而重置偏移量在Fetcher处,入口为resetOffsetsIfNeeded

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void resetOffsetsIfNeeded() {
    // ...
    // 获取需要重置偏移量的分区
    Set<TopicPartition> partitions = subscriptions.partitionsNeedingReset(time.milliseconds());
    if (partitions.isEmpty())
        return;

    final Map<TopicPartition, Long> offsetResetTimestamps = new HashMap<>();
    for (final TopicPartition partition : partitions) {
        // 根据重置策略,填入时间戳标记(最早、最新)
        // 客户端会拉取Kafka Broker最早/最新的记录偏移量进行重置
        Long timestamp = offsetResetStrategyTimestamp(partition);
        if (timestamp != null)
            offsetResetTimestamps.put(partition, timestamp);
    }
    // 异步重置偏移量
    resetOffsetsAsync(offsetResetTimestamps);
}

这里关键在resetOffsetsAsync方法,它:

  1. 每个分区的主节点发送LIST_OFFSET请求(具体在sendListOffsetRequest方法),获取最新/最早的记录偏移量

  2. 触发回调函数,调用resetOffsetIfNeeded方法,里面具体调用的是SubscriptionState#maybeSeekUnvalidated方法,重置分区状态的偏移量

在启动/重分配时,会触发偏移量的拉取和重置,消费者从该偏移量开始进行消费:

  1. 首先向协调者发送OFFSET_FETCH请求,获取分区已提交的偏移量,设置为当前偏移量
  2. 若协调者没有存储提交的偏移量,则向分区主节点发送LIST_OFFSETS请求,根据策略获取主节点指定时间的(最新/最早)记录项的偏移量,重置为当前偏移量

2.2. 轮询前准备

准备工作有:

  • 连接消费组的协调者节点
  • 向协调者发送入组请求
  • 从协调者中获得分配的分区

具体而言,消费者会调用ConsumerCoordinator#poll以轮询协调者的事件(如保证协调者可用,入组,提交偏移量等等)。

关于入组:

  • 入组在轮询中,客户端向协调者异步发送入组请求,响应通过轮询获取,入组关键的方法是ensureActiveGroup,保证消费者入组
  • 入组操作说一个死循环,直到入组成功后才能跳出
  • 入组响应中会包含协调者返回的分区分配信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
// In AbstractCoordinator
public void ensureActiveGroup() {
    // 入组,不停重试,直到成功
    while (!ensureActiveGroup(time.timer(Long.MAX_VALUE))) {
        log.warn("still waiting to ensure active group");
    }
}

boolean ensureActiveGroup(final Timer timer) {
    // 保证协调者就绪
    if (!ensureCoordinatorReady(timer)) {
        return false;
    }
    // 启动心跳线程
    startHeartbeatThreadIfNeeded();
    // 必要时入组,返回消费者是否在组内
    return joinGroupIfNeeded(timer);
}

boolean joinGroupIfNeeded(final Timer timer) {
    while (rejoinNeededOrPending()) {
        if (!ensureCoordinatorReady(timer)) {
            return false;
        }

        if (needsJoinPrepare) {
            // 准备加入(标记以防止重复执行)
            needsJoinPrepare = false;
            onJoinPrepare(generation.generationId, generation.memberId);
        }
				// 向协调者异步发送入组请求,返回的结果是分区分配信息
        final RequestFuture<ByteBuffer> future = initiateJoinGroup();
        client.poll(future, timer); // 注意,这里轮询会阻塞,等待入组结果到来(后面会说包括“加入组”+“同步组”),或者超时
        if (!future.isDone()) {
            // 超时,返回失败
            return false;
        }

        if (future.succeeded()) {
            // 响应成功
            Generation generationSnapshot;
            synchronized (AbstractCoordinator.this) {
                generationSnapshot = this.generation;
            }

            if (generationSnapshot != Generation.NO_GENERATION) {
                // 返回响应的不是NO_GENERATION
                ByteBuffer memberAssignment = future.value().duplicate();
                // 完成入组,响应中带有分配的分区
                // 这步设置分区的分配信息
                onJoinComplete(generationSnapshot.generationId, generationSnapshot.memberId, generationSnapshot.protocolName, memberAssignment);
                resetJoinGroupFuture();
                needsJoinPrepare = true;
            } else {
                // 若消息说NO_GENERATION,则响应被心跳线程接受了,入组失败,所以还需要重新入组
                log.info("Generation data was cleared by heartbeat thread. Initiating rejoin.");
                resetStateAndRejoin();
                resetJoinGroupFuture();
                return false;
            }
        } else {
            // ...
            // 出现错误,若能重复,则睡眠一段时间重试
            timer.sleep(rebalanceConfig.retryBackoffMs);
        }
    }
    return true;
}

关于协调者的查找和连接:

  1. 客户端会询问已知的某个节点,异步发送FIND_COORDINATOR请求,以得到本组的协调者,这通过ensureCoordinatorReady方法里调用的lookupCoordinator方法实现,具体代码不贴了
  2. 响应的回调里,具体在FindCoordinatorResponseHandler#onSuccess,会:
    • 从响应中抽出并设置协调者节点信息
    • 调用client#tryConnect尝试连接

入组具体准备流程为(之后还会提到):

  1. 消费者订阅topic
  2. 消费者准备轮询
    1. 检查是否需要分配到分区
    2. 若需要,则发送JOIN_GROUP+SYNC_GROUP,确保分区的分配
  3. 消费者开始拉取消息

2.3. 轮询流程

消费者轮询消息流程如下:

  • 若初始化/重分区,执行准备工作,包括获取分区起始的拉取偏移量
  • 轮询消息以拉取消息
    • 异步发出拉取请求
    • 轮询,当就绪时将请求异步发出去,并通过异步回调接收响应
    • 获取拉取的消息,同时更新拉取偏移量,消息通过响应回调进行保存

对于轮询流程,上层核心代码在pollForFetches,大致如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {
    long pollTimeout = coordinator == null ? timer.remainingMs() :
            Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());

    // 若已有接收到的消息,直接将其返回
    // 有这一步是因为拉取的消息是通过异步回调保存的
    final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
    if (!records.isEmpty()) {
        return records;
    }

    // 向服务端发起FETCH请求,此时请求并没有发出
    fetcher.sendFetches();
    // ...
    Timer pollTimer = time.timer(pollTimeout);
    // 轮询,此时请求在写就绪后发出,同时响应通过Future的回调处理,得到的消息会被保存起来
    client.poll(pollTimer, () -> {
        return !fetcher.hasAvailableFetches();
    });
    timer.update(pollTimer.currentTimeMs());
		// 返回已拉取的消息(拉取的消息通过响应回调保存),并更新拉取偏移量
    return fetcher.fetchedRecords();
}

消费者上层的poll(Duration)内部是用一个循环,不停调用pollForFetches,条件是“是否超过时限且没拉到消息”

消费者可能会一直拉取不到消息,此时消费者会一直重试,直到超时,因此内部会用循环。因此外部调用poll(Duration)也必须使用循环,才能持续收到拉取的消息

响应回调:函数定义在sendFetches的一个匿名类中,回调函数大致如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 发起拉取请
RequestFuture<ClientResponse> future = client.send(fetchTarget, request); 
// 注册响应回调,将拉取的消息保存到队列
future.addListener(new RequestFutureListener<ClientResponse>() {
 @Override
 public void onSuccess(ClientResponse resp) {
     synchronized (Fetcher.this) {
         try {
             // 获取响应
             FetchResponse<Records> response = (FetchResponse<Records>) resp.responseBody();
             // ...
             Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
             // ...
             // 遍历收到的消息
             for (Map.Entry<TopicPartition, FetchResponse.PartitionData<Records>> entry : response.responseData().entrySet()) {
                 TopicPartition partition = entry.getKey();
                 FetchRequest.PartitionData requestData = data.sessionPartitions().get(partition);
                 // ...
                 long fetchOffset = requestData.fetchOffset;
                 FetchResponse.PartitionData<Records> partitionData = entry.getValue();
                 Iterator<? extends RecordBatch> batches = partitionData.records.batches().iterator();
                 short responseVersion = resp.requestHeader().apiVersion();
									  // 保存收到的消息到队列
                 // completedFetches是一个队列,保存已经拉取完成的消息
                 completedFetches.add(new CompletedFetch(partition, partitionData,
                          metricAggregator, batches, fetchOffset, responseVersion));
         }
         // ...
     }
 }
 
 // ... onFail 
});

回调函数在客户端轮询(poll)时,完成响应接收后调用,因此回调函数调用的线程和外部消费者线程是同一个线程(具体到内部还是多路复用,这里简略说明,实现在Selector#attempReadSelector#attempWrite中)

a) 流水线优化

此外,poll(Duration)操作还使用了“流水线”的思想:在返回拉取的消息前,再发送一次拉取请求,从而在处理拉取的消息时,获取下一轮拉取的消息。这部分关键代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
    acquireAndEnsureOpen();
    try {
        // ...
        do {
            // ....
					  // 轮询,获得已拉取的消息 
            final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
            if (!records.isEmpty()) {
                // 流水线:返回本轮拉取的消息前,把下一轮拉取的请求发出去
                // 
                // 这里可能会把请求真正地发出去,而不仅仅是将请求入队
                // 因为这里调研了客户端的poll,只是它超时时间是0
                // 当写事件就绪时,请求会发出去,否则直接返回
                // 因此它不会阻塞客户端线程
                if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
                    client.transmitSends();
                }

                return this.interceptors.onConsume(new ConsumerRecords<>(records));
            }
        } while (timer.notExpired());
        return ConsumerRecords.empty();
    } finally {
        // ...
    }
}

流水线的加入提高了消费者的性能,使得第二次拉取的请求,不需要等到第一次拉取的消息处理完之后(注意是“处理完”)再发送

b) 线程模型

实际上,消费者线程只有1个线程,拉取器Fetcher只负责将拉取请求发出,并通过异步回调到方式处理和保存拉取的消息,这个动作是立刻返回的。

之所以说是“轮询”,是因为客户端使用了多路复用,必须通过轮询(如select, poll, epoll等)才能真正发出请求和接收响应。

与之非常类似的线程模型有Redis服务端aeEventLoop

c) 其他

首先,即时有流水线,消费者必须得到第一次拉取的响应后,才能发送第二个拉取请求,这在上面的poll代码里可以得到,只有当获取的记录非空时,才能发送第二个拉取请求(且第二个请求轮询等待时间为0)。

其次,拉取记录大小有阈值,有下面:

  • 消息大小阈值(message.max.bytes):服务器允许接收1条消息到最大字节数
  • 分区拉取大小阈值(max.partition.fetch.bytes): 拉取消息时,每个分区的最大字节数
  • 消息拉取大小阈值(fetch.message.max.bytes):消费者能够读取的最大消息大小

这里后两个必须比第一个大,否则某些分区永远无法消费(当消息大小介于两者之间时)。这些大小需要控制,过大或过小都会影响吞吐。

然后,轮询消息个数也有阈值(max.poll.records),控制一次轮询最多获取多少条记录。

由于有这个限制,可能会有多余的消息被拉取下来:

  • 多余的消息会在下面的轮询中返回给上层

  • 下一次拉取时,不会对多余消息所在的分区拉取消息

1
2
3
4
5
6
7
8
9
10
11
12
// 获取可拉取消息的分区
private List<TopicPartition> fetchablePartitions() {
    Set<TopicPartition> exclude = new HashSet<>();
    // 当多余拉取到的消息没被消费时,下一轮拉取时,对应分区不会被拉取
    if (nextInLineFetch != null && !nextInLineFetch.isConsumed) {
        exclude.add(nextInLineFetch.partition);
    }
    for (CompletedFetch completedFetch : completedFetches) {
        exclude.add(completedFetch.partition);
    }
    return subscriptions.fetchablePartitions(tp -> !exclude.contains(tp)); // 过滤没消费完过多消息的分区
}

3. 心跳任务

KafkaConsumer会执行心跳任务,这里的心跳是指“消费者”和“对应消费者组的协调者”的心跳。通过心跳,可以表明自己是存活的;若心跳丢失,协调者就认为消费者挂掉,从而进行失败处理,例如触发分区重平衡。

心跳任务定义在HeartbeatThread中,这个任务会在一个单独的线程中执行。

3.1. 心跳状态

心跳状态记录了心跳任务的元数据,定义在HeartBeat中:

1
2
3
4
5
6
7
8
9
10
11
12
public final class Heartbeat {
    private final int maxPollIntervalMs; // 最大拉取信息时间间隔
    private final GroupRebalanceConfig rebalanceConfig; // 重平衡配置 
    private final Time time; // 系统当前时间(实际上是个工具类)
    private final Timer heartbeatTimer; // 心跳计时器
    private final Timer sessionTimer; // 会话计时器
    private final Timer pollTimer; // 拉取计时器

    private volatile long lastHeartbeatSend = 0L; // 最近一次心跳请求发出的时间戳
    private volatile boolean heartbeatInFlight = false; // 心跳是否正在传输
    // ...
}

通过上面几个Timer,可以很容易地得到现在是否需要心跳,以及下一次心跳还需要的时间。当心跳成功后,这些Timer也会被更新:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private void update(long now) {
    // 更新3个计时器,推进计时器的时间
    heartbeatTimer.update(now);
    sessionTimer.update(now);
    pollTimer.update(now);
}

void receiveHeartbeat() {
    update(time.milliseconds()); // 收到心跳则更新时间戳
    heartbeatInFlight = false; // 取消正在传输的标记
    sessionTimer.reset(rebalanceConfig.sessionTimeoutMs); // 重置会话计时器
}

boolean shouldHeartbeat(long now) {
    update(now); // 更新时间
    return heartbeatTimer.isExpired(); // 过期即需要心跳
}

long timeToNextHeartbeat(long now) {
    update(now); // 更新时间
    return heartbeatTimer.remainingMs(); // 剩余的时间即下次心跳还需等待的时间
}

// 启动一次心跳
void sentHeartbeat(long now) {
    lastHeartbeatSend = now; // 设置这次心跳时间戳
    heartbeatInFlight = true; // 标记心跳正在传输
    update(now); // 更新计时器
    heartbeatTimer.reset(rebalanceConfig.heartbeatIntervalMs);
}

3.2. 发送心跳

心跳发送在HeartbeatThread#run中,具体的代码段如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
heartbeat.sentHeartbeat(now);
// 这里发送心跳请求(HEARTBEAT)
final RequestFuture<Void> heartbeatFuture = sendHeartbeatRequest();
// 注册回调
heartbeatFuture.addListener(new RequestFutureListener<Void>() {
    @Override
    public void onSuccess(Void value) {
        // 心跳成功,则更新心跳状态
        synchronized (AbstractCoordinator.this) {
            heartbeat.receiveHeartbeat();
        }
    }

    @Override
    public void onFailure(RuntimeException e) {
        // 心跳失败,则分类
        // 1. 若正在重平衡,则认为心跳成功
        // 2. FenceException: 停止心跳
        // 3. 其他:标记心跳失败,唤醒协调者规划下一次心跳
        synchronized (AbstractCoordinator.this) {
            if (e instanceof RebalanceInProgressException) {
                heartbeat.receiveHeartbeat();
            } else if (e instanceof FencedInstanceIdException) {
                log.error("Caught fenced group.instance.id {} error in heartbeat thread", rebalanceConfig.groupInstanceId);
                heartbeatThread.failed.set(e);
                heartbeatThread.disable();
            } else {
                heartbeat.failHeartbeat();
                AbstractCoordinator.this.notify();
            }
        }
    }
});

而心跳返回时有响应的,即HeartbeatResponse。响应会先被HeartbeatResponseHandler处理,将HeartbeatResponse转换成Void,然后传入上面的回调回调函数中。具体代码不贴了。

1
2
3
4
5
6
7
8
9
10
11
synchronized RequestFuture<Void> sendHeartbeatRequest() {
    HeartbeatRequest.Builder requestBuilder =
            new HeartbeatRequest.Builder(new HeartbeatRequestData()
                    .setGroupId(rebalanceConfig.groupId)
                    .setMemberId(this.generation.memberId)
                    .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
                    .setGenerationId(this.generation.generationId));
    return client.send(coordinator, requestBuilder)
            .compose(new HeartbeatResponseHandler(generation)); // 先交给HeartbeatResponseHandler处理
                                                                // 处理时将结果传给Future<Void>,触发上面代码块的回调
}

3.3. 运行心跳任务

客户端加入消费组,就需要启动心跳任务,具体如下代码所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
boolean ensureActiveGroup(final Timer timer) {
    if (!ensureCoordinatorReady(timer)) {
        return false;
    }

    startHeartbeatThreadIfNeeded(); // 入组前,创建心跳线程,但没被启动
    return joinGroupIfNeeded(timer); // 入组后,心跳线程会被开启
}

private synchronized void startHeartbeatThreadIfNeeded() {
    if (heartbeatThread == null) {
        // 启动心跳线程
        heartbeatThread = new HeartbeatThread();
        heartbeatThread.start(); // 运行中,但没开启,会处于等待状态
                                 // 入组后,线程会被开启进行心跳
    }
}

而整个任务是通过一个死循环执行的,并通过调用AbstractCoordinator.this#notify/wait来实现定时的心跳:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@Override
public void run() {
    try {
        while (true) {
            synchronized (AbstractCoordinator.this) {
                if (closed)
                   return;

                if (!enabled) {
                    // 开启了才能执行,否则等待
                    // 入组前,没有开启
                    // 等待入组后,会调用enable开启,enable中会调用AbstractCoordinator.this.notify
                    // 入组后的回调,可参考initiateJoinGroup方法
                    AbstractCoordinator.this.wait();
                    continue;
                }
                // ...
                // 轮询,以拉取和发送心跳响应和请求
                client.pollNoWakeup();
                long now = time.milliseconds();
                if (coordinatorUnknown()) {
                    if (findCoordinatorFuture != null || lookupCoordinator().failed())
                        // 协调者找不到,等待一段时间再重试
                        AbstractCoordinator.this.wait(rebalanceConfig.retryBackoffMs);
                } else if (heartbeat.sessionTimeoutExpired(now)) {
                    // 会话丢失,标记协调者未知
                    markCoordinatorUnknown();
                } else if (heartbeat.pollTimeoutExpired(now)) {
                    // 拉取心跳超时,则可能离开消费组
                    String leaveReason = "...";
                    maybeLeaveGroup(leaveReason);
                } else if (!heartbeat.shouldHeartbeat(now)) {
                    // 若当前不需要心跳,则等待一段时间
                    AbstractCoordinator.this.wait(rebalanceConfig.retryBackoffMs);
                } else {
                    // 发送心跳
                    heartbeat.sentHeartbeat(now);
                    final RequestFuture<Void> heartbeatFuture = sendHeartbeatRequest();
                    heartbeatFuture.addListener(new RequestFutureListener<Void>() {
                        // ... 见上面分析
                    });
                }
            }
        }
    } 
    // ...
}

3.4. 心跳与协调者的关系

心跳是为了确保:

  • 协调者已知,消费者必须连接上协调者
  • 消费者处于活动状态,消费者必须分配到分区

从上面可知,心跳是在入组后进行的。心跳可能会出现错误,这时候需要处理,分下面几类:

  • 协调者挂掉:标记原协调者死亡,需要重新连接新协调者
  • 协调者没挂:标记“重新入组”,此时需要向协调者发送“入组”请求

处理的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private class HeartbeatCompletionHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> {
    @Override
    public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
        sensors.heartbeatLatency.record(response.requestLatencyMs());
        Errors error = Errors.forCode(heartbeatResponse.errorCode());
        if (error == Errors.NONE) {
            // 没错误,直接完成即可
            future.complete(null);
        } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
                || error == Errors.NOT_COORDINATOR_FOR_GROUP) {
            // 协调者挂了,标记,需要重连新协调者
            coordinatorDead();
            future.raise(error);
        } else if (error == Errors.REBALANCE_IN_PROGRESS) {
            // 正在重分配,节点没挂,则需要重新入组
            AbstractCoordinator.this.rejoinNeeded = true;
            future.raise(Errors.REBALANCE_IN_PROGRESS);
        } else if (error == Errors.ILLEGAL_GENERATION) {
            // generation不合法,节点没挂,则需要重新入组
            AbstractCoordinator.this.rejoinNeeded = true;
            future.raise(Errors.ILLEGAL_GENERATION);
        } else if (error == Errors.UNKNOWN_MEMBER_ID) {
            // member id不合法,节点没挂,则需要重新入组
            memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
            AbstractCoordinator.this.rejoinNeeded = true;
            future.raise(Errors.UNKNOWN_MEMBER_ID);
        } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
            future.raise(new GroupAuthorizationException(groupId)); // 验证错误
        } else {
            // 其他错误
            future.raise(new KafkaException("Unexpected error in heartbeat response: " + error.message()));
        }
    }
}

4. 偏移量提交

KafkaConsumer有下面几种偏移量提交方式:

  • commitAsync:异步提交偏移量到协调者
  • commitSync:同步提交偏移量到协调者
  • 自动提交:提交是异步的

4.1. 自动提交任务

enable.auto.commit=true时,启用自动提交。自动提交可以同步也可以异步,这里只看异步提交。

通过调用链,可知消费者在对协调者进行轮训时时,会触发自动提交:

1
2
3
4
5
6
7
8
// In ConsumerCoordinator
public boolean poll(Timer timer) {
    // ...
    invokeCompletedOffsetCommitCallbacks(); // 下文会说,它会读取completedOffsetCommits队列(里面保存提交偏移量完成的响应),然后执行响应里指定的回调函数
    // ...
    maybeAutoCommitOffsetsAsync(timer.currentTimeMs()); // 轮询最后可能会触发自动提交
    return true;
}

自动提交触发的时间间隔由auto.commit.interval.ms参数提供,默认5s。该值会被传入一个计时器nextAutoCommitTimer(位于ConsumerCoordinator中),当计时器到期时,就会触发提交任务,并重置计时器:

1
2
3
4
5
6
7
8
9
10
11
// In ConsumerCoordinator
public void maybeAutoCommitOffsetsAsync(long now) {
    if (autoCommitEnabled) { // 启动自动提交时
        nextAutoCommitTimer.update(now); // 更新计时器
        if (nextAutoCommitTimer.isExpired()) { 
            // 若计时器到期,说明需要提交,则重置计时器,并执行异步提交任务
            nextAutoCommitTimer.reset(autoCommitIntervalMs);
            doAutoCommitOffsetsAsync(); // 异步提交
        }
    }
}

自动提交任务代码如下,它首先获取每个分区已消费的最新分区偏移量,然后以该偏移量异步提交(异步提交偏移量就放到下一节):

1
2
3
4
5
6
7
8
private void doAutoCommitOffsetsAsync() {
    // 获取已消费的分区的最新偏移量
    Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed();
    // 执行异步提交,具体在下一节说明
    commitOffsetsAsync(allConsumedOffsets, (offsets, exception) -> {
        // 执行完成后到回调,这里主要是错误处理,略
    });
}

4.2. 异步提交偏移量

异步提交偏移量外部接口时commitAsync,内部最终会调用coordinator#commitOffsetAsync方法:

该方法自动提交偏移量时也会调用,只不过参数中的偏移量由客户端自己获取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
    invokeCompletedOffsetCommitCallbacks();
    if (!coordinatorUnknown()) {
        // 若协调者已知,则执行异步提交
        doCommitOffsetsAsync(offsets, callback);
    } else {
        // 否则先查找协调者,找到协调者后,回调立刻异步提交并进行轮询(将请求发出)
        pendingAsyncCommits.incrementAndGet();
        lookupCoordinator().addListener(new RequestFutureListener<Void>() {
            @Override
            public void onSuccess(Void value) {
                // 找到后,立刻执行异步提交
                pendingAsyncCommits.decrementAndGet();
                doCommitOffsetsAsync(offsets, callback);
                // 轮询,把请求发出去
                client.pollNoWakeup();
            }

            @Override
            public void onFailure(RuntimeException e) {
                // 查找协调者失败,将提交错误结果追加到completedOffsetCommits队列,在轮询时统一执行回调(参数的callback)
                pendingAsyncCommits.decrementAndGet();
                completedOffsetCommits.add(new OffsetCommitCompletion(callback, offsets,
                        new RetriableCommitFailedException(e)));
            }
        });
    }
    // 轮询,将请求发出,这里的轮询不会造成进行阻塞
    client.pollNoWakeup();
}

上面方法的核心在doCommitOffsetCommit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 执行异步提交偏移量
private void doCommitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
    // 发送OFFSET_COMMIT请求(需要poll才能将请求发出去)
    RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
    // 回调函数,若为空则为默认回调,默认什么都不处理
    final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback;
    // 注册响应回调
    future.addListener(new RequestFutureListener<Void>() {
        @Override
        public void onSuccess(Void value) {
            // 提交成功,将成功结果添加到completedOffsetCommits队列,在轮询时统一执行回调(参数的callback)
            if (interceptors != null)
                interceptors.onCommit(offsets);
            completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, null));
        }

        @Override
        public void onFailure(RuntimeException e) {
            // 提交失败,将失败结果连同异常添加到completedOffsetCommits队列,在轮询时统一执行回调(参数的callback)
            Exception commitException = e;
            if (e instanceof RetriableException) {
                commitException = new RetriableCommitFailedException(e);
            }
            completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, commitException));
            if (commitException instanceof FencedInstanceIdException) {
                asyncCommitFenced.set(true);
            }
        }
});

而处理提交偏移量响应的地方位于OffsetCommitResponseHandler,处理的代码如下:

和其他请求发送类似,该回调会通过send(req).compose(handler)添加进去,当响应就绪时,会执行handler的回调;handler里面有参数future,当回调触发后,最终会让future就绪,从而触发future里的回调。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
 @Override
public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) {
    sensors.commitSensor.record(response.requestLatencyMs());
    Set<String> unauthorizedTopics = new HashSet<>();
    // 遍历每个分区的提交响应
    for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : commitResponse.data().topics()) {
        for (OffsetCommitResponseData.OffsetCommitResponsePartition partition : topic.partitions()) {
            TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
            OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp);
            long offset = offsetAndMetadata.offset();
						// 根据返回码生成异常
            Errors error = Errors.forCode(partition.errorCode());
            if (error == Errors.NONE) {
                // 分区提交成功
                log.debug("Committed offset {} for partition {}", offset, tp);
            } else {
                // 分区提交失败
                // 根据不同的错误处理,最后都会调用future.raise(error)
                // ...
            }
        }
    }
    if (!unauthorizedTopics.isEmpty()) {
        log.error("Not authorized to commit to topics {}", unauthorizedTopics);
        future.raise(new TopicAuthorizationException(unauthorizedTopics));
    } else {
        // 全部成功,让future complete
        future.complete(null);
    }
}

4.3. 同步提交偏移量

异步提交偏移量外部接口时commitSync,内部最终会调用coordinator#commitOffsetsSync方法。

这里相对比较简单,和异步提交类似,不过这里使用了带阻塞的poll,调用完成后返回的Future通常就会有结果,若没结果,说明必定超时,方法会返回false

同步提交,底层调用的是select,它会阻塞;

异步提交,底层调用的是selectNow,它不会阻塞,立即返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public boolean commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets, Timer timer) {
    invokeCompletedOffsetCommitCallbacks();

    if (offsets.isEmpty())
        return true;
    do {
        // 若协调者没准备好
        if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {
            return false;
        }
				
        RequestFuture<Void> future = sendOffsetCommitRequest(offsets); // 发送OFFSET_COMMIT请求
        client.poll(future, timer); // 这里产生阻塞,真正地将请求发出,并接收响应,完成后才会继续下去
                                    // 这一步返回后,Future通常会有结果
        invokeCompletedOffsetCommitCallbacks(); // 首先调用这一步,执行之前提交偏移量的响应回调
      
				// 若在这段时间内,响应成功,则返回true
        if (future.succeeded()) {
            if (interceptors != null)
                interceptors.onCommit(offsets);
            return true;
        }
        // 若在这段时间内,响应失败且不可重试,则抛出异常
        if (future.failed() && !future.isRetriable())
            throw future.exception();
        // 若这段时间内,响应失败但可重试,或没响应(不过一般都跳出循环直接超时了),则尝试重试
        timer.sleep(rebalanceConfig.retryBackoffMs);
    } while (timer.notExpired());
    // 超时,则返回false
    return false;
}

同步提交偏移量在内部的调用地方在2个地方:

  • 准备入组时:消费者重新入组前,需要先提交一次偏移量,这里的偏移量是最新的拉取偏移量,这里必须同步(见onJoinPrepare
  • 关闭消费者:关闭消费者前,也需要先提交一次偏移量,这里的偏移量也是最新的拉取偏移量,同样这里也必须同步(见close

4.4. 消费者消息处理语义

语义通常有:至多一次、至少一次、有且仅有一次。

a) 至多一次

至多一次允许数据的丢失。

因此,消费者需要先提交偏移量,然后处理消息,因此:

  • 打开自动提交
  • 设置较短的自动提交时间间隔

b) 至少一次

至少一次不允许数据丢失,但可以重复。

因此,消费者需要先处理消息,再提交偏移量,因此:

  • 关闭自动提交
  • 处理完消息消息后,调用commitSync同步提交

c) 有且仅有一次

方案有2种:

  1. 引入两阶段提交协议
  2. 同时更新消息偏移量和处理结果(即提交放入一个事务里),例如将其写入数据库

以第2种方案为例:

  • 关闭自动提交
  • 实现一个偏移量管理器OffsetManager,将分区偏移量和处理结果的更新以事务的方式提交(例如提交给数据库)
  • 实现并注册一个分区重平衡监听器ConsumerRebalanceListener,触发重平衡时,从OffsetManager中恢复出已提交的分区偏移量(通过调用KafkaConsumer#seek实现)

而对于生产者,也有2种方案:

  1. 读取分区最后一条消息,决定是否重传
  2. 给消息附上唯一ID,下游进行去重
  3. 使用“事务”