當前位置: 妍妍網 > 碼農

Kafka消費者源碼解析,學習總結

2024-02-24碼農

作者:後來丶_a24d
連結:https://www.jianshu.com/p/2932410aa1ec

使用範例

 public staticvoid main(String[] args) {
Properties props = new Properties();
String topic = "test";
// auto-offset-commit
String group = "test0";
props.put("bootstrap.servers""XXX:9092,XXX:9092");
props.put("group.id", group);
props.put("auto.offset.reset""earliest");
// 自動commit
props.put("enable.auto.commit""true");
// 自動commit的間隔
props.put("auto.commit.interval.ms""1000");
props.put("session.timeout.ms""30000");
props.put("key.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<StringString> consumer = new KafkaConsumer<>(props);
// 可消費多個topic,組成一個list
consumer.subscribe(Arrays.asList(topic));
while (true) {
ConsumerRecords<StringString> records = consumer.poll(100);
for (ConsumerRecord<StringString> record : records) {
System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value());
try {
Thread.sleep(100);
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

前置知識之傳遞性語意保證

定義
  • At most once:訊息可能會丟,但是不會重復傳遞

  • At least once:訊息不會丟,但是有可能重復傳遞

  • Exactly once:每條訊息只被傳遞一次

  • Exactly once實作
  • 生產者端: 可以為每個訊息定義個全域唯一id,消費者端進行過濾,不重復消費

  • 消費者端可能發生的事情:

    1. 先處理訊息,後送出offset(送出的話有自動送出和手動送出兩種):這種有可能導致At most once,如果訊息處理之後伺服器宕機或者再均衡,這時候已消費的訊息並沒有被送出到(_consumer_offset主題),就會導致重復消費。

    2. 先送出offset, 再處理。這種有可能導致At least once, 如果訊息處理之後伺服器宕機或者再均衡,這時候已送出offset,但是並未消費

  • 消費者端解決方案

    1. 首先關閉自動送出,也不手動送出到(_consumer_offset主題)。而是將offset送出和訊息處理放到一個事務(數據可以儲存到資料庫,redis之類), 事務執行成功則認為消費成功,否則事務回滾。當出現伺服器宕機或者再均衡,可以從關系型資料庫或者redis找到對應offset並利用KafkaConsumer.seek手動設定消費位置,從此offset開始消費。

    2. 如果是伺服器再均衡,則可以利用再均衡監聽器,其中提供兩個方法,一個是再均衡完成之後,拉去數據前執行,這時候可以用seek設定offset。還有一個是停止拉去數據後,再均衡前,這時候可以手動送出,避免再均衡重復消費。

    前置知識之消費端再均衡

  • 觸發條件

    1. 組成員個數發生變化。例如有新的 consumer 例項加入該消費組或者離開組。

    2. 訂閱的 Topic 個數發生變化。

    3. 訂閱 Topic 的分區數發生變化

  • 此圖是參考Kafka Rebalance機制分析

  • 方案

    1. 最開始實作策略是利用zookeeper,zookeeper底下有報錯group_ids的路勁,每個consumer都在zookeeper上有相應路勁並註冊了Watch。透過Watch每個消費者就可以監控Consumer Group和Kafka了。但是這種策略有兩個不過的地方:其一, 容易引起羊群效應,當某個伺服器新加入Consumer Group時,所有關註的Watch都會接收到通知。其二, 腦裂, 有可能一台伺服器連線兩個不同的zookeeper,而其中一台zookeeper有可能不是最新數據。

    2. 每個Consumer Group子集對應一個伺服端的GroupCoordinator進行管理。此時消費者不再依賴zookeeper,而是依賴GroupCoordinator,GroupCoordinator依賴zookeeper並在zookeeper上註冊Watch。當消費者加入或結束消費者群組時會修改zookeeper上面的後設資料,這時候會觸發相應GroupCoordinator的Watch, 通知GroupCoordinator進行Rebalance。簡書上述步驟,步驟1: 如果是新加入Consumer Group的伺服器,會向Kafka伺服端發送ConsumerMataDataRequest,kafka伺服端會告知具體的GroupCoordinator。步驟2: 消費端在知道GroupCoordinator後,會向其發送HeartbeatRequest。如果長時間沒發送,GroupCoordinator就會認為對應伺服器下線了,會觸發Rebalance。步驟3: 如果HeartbeatResponse中包含了IllegalGeneration,說明GroupCoordinator在執行Rebalance, 此時消費者會向GroupCoordinator發現JoinGroupRequest,GroupCoordinator根據JoinGroupRequest和zookeeper後設資料完成對Group的分區分配。步驟4: GroupCoordinator完成分區分配時會保存數據到zookeeper,並JoinGroupResponse給對應的request。

    3. 第二種方案有個不好的地方就是,每次修改分區策略都得改伺服端程式碼並重新開機。方案三將分區分配交給消費者端處理,這樣實作了解耦。相對方案二改變為: 消費者向GroupCoordinator發送JoinGroupRequest後,伺服端的GroupCoordinator會等待一夥直到所有Consumer Group的伺服器都發送之後,選取一個Group leader。Group leader會受到所有的消費者資訊,並且會根據分區策略進行分區分配。下一個階段是Synchronizing Group state階段,在這個階段,每個消費者會發送SyncGroupRequest給GroupCoordinator,但是Group leader包含了分區分配結果,GroupCoordinator會將分區分配結果作為SyncGroupResponse返回給各個消費者伺服器。

    前置知識之分區分配策略:

    1. RoundRobinAssignor: 列出所有 topic-partition 和列出所有的 consumer member,然後開始分配,一輪之後繼續下一輪

    2. RangeAssignor: 對於剩下的那些 partition 分配到前 consumersWithExtraPartition 個 consumer 上。假設有一個 topic 有 7 個 partition,group 有5個 consumer,這個5個 consumer 都訂閱這個 topic,那麽 range 的分配方式如下,如果有consumer不參加則不算對應的減少consumer的size

    註: 
    // 表示平均每個 consumer 會分配到幾個 partition
    numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size():
    // 表示平均分配後還剩下多少個 partition 未分配
    consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size():
    例子: 
    consumer 0:start0length2, topic-partition: p0,p1;
    consumer 1start2length2, topic-partition: p2,p3;
    consumer 2start4length1, topic-partition: p4;
    consumer 3start5length1, topic-partition: p5;
    consumer 4start6length1, topic-partition: p6

    KafkaConsumer分析

    執行緒安全性
  • KafkaConsumer非執行緒安全,這種設計將執行緒安全轉移到了呼叫方。

  • 解決方案: 可以使用兩個執行緒池,生產-消費模式,解耦訊息消費和訊息處理,其中一個執行緒池每個執行緒持有一個KafkaConsumer物件,(可以根據關註的主題數量來決定執行緒數量),拉取數據之後放入佇列,另一個執行緒池處理佇列裏面的數據。qmq的處理方式是一個執行緒池拉取到數據之後直接交給另一個執行緒處理,並沒有直接透過佇列。

  • 高度抽象程式碼方法總結 這裏總結了所接觸計畫中各種抽象程式碼的方式,可作為參考

  • Kafka訊息端源碼把多種請發送,處理響應,處理完響應成功或者失敗傳播出去的各個實作都進行了高度統一抽象,很值得學習。

  • 現在看下Heartbeat的響應處理流程,其他請求響應處理流程類似,只不過Handler不同。請求發送完成之後,伺服端有響應此時會回呼請求持有的。RequestCompletionHandler.onComplete方法,訊息端RequestCompletionHandler的實作類是RequestFutureCompletionHandler。RequestFuture,RequestFutureListtener都是為了輔助實作統一格式的響應處理以及響應處理後的事件傳播出去。

  • publicstatic classRequestFutureCompletionHandler
    // RequestFuture是真正的處理邏輯
    extendsRequestFuture<ClientResponse>
    // RequestCompletionHandler 是發送請求時,持有的需要回呼的函式介面
    implementsRequestCompletionHandler
    {
    @Override
    publicvoidonComplete(ClientResponse response){
    if (response.wasDisconnected()) {
    // 省略
    else {
    complete(response);
    }
    }
    }
    publicvoidcomplete(T value){
    //省略其他
    fireSuccess();
    }
    // 所以這裏的關鍵是listener的添加,添加也交給各個請求的Handler實作
    privatevoidfireSuccess(){
    for (RequestFutureListener<T> listener : listeners)
    listener.onSuccess(value);
    }

  • 消費者端有多種請求,比如JoinGroupRequest,SyncGroupRequest,HeartbeatRequest。舉個Heartbeat發送請求的邏輯,其他請求發送的格式都大同小異。

  • private classHeartbeatTaskimplementsDelayedTask{
    @Override
    publicvoidrun(finallong now){
    //省略一部份程式碼
    //這裏發送HeartbeatRequest,其他請求發送也是走這兩步,但是實作不同
    RequestFuture<Void> future = sendHeartbeatRequest();
    //HeartbeatResonse成功或失敗的事件傳播下來繼續處理,傳播事件的RequestFuture與響應解析的RequestFuture已經不同了
    future.addListener(new RequestFutureListener<Void>() {
    @Override
    publicvoidonSuccess(Void value){
    requestInFlight = false;
    long now = time.milliseconds();
    heartbeat.receiveHeartbeat(now);
    long nextHeartbeatTime = now + heartbeat.timeToNextHeartbeat(now);
    client.schedule(HeartbeatTask.this, nextHeartbeatTime);
    }
    @Override
    publicvoidonFailure(RuntimeException e){
    requestInFlight = false;
    client.schedule(HeartbeatTask.this, time.milliseconds() + retryBackoffMs);
    }
    });
    }
    }
    public RequestFuture<Void> sendHeartbeatRequest(){
    HeartbeatRequest req = new HeartbeatRequest(this.groupId, this.generation, this.memberId);
    // send返回RequestFuture,響應該請求時呼叫回呼的參照與此RequestFuture相同
    return client.send(coordinator, ApiKeys.HEARTBEAT, req)
    // 這裏主要是為RequestFuture添加listener
    .compose(new HeartbeatCompletionHandler());
    }
    // 此函式高度統一JoinGroupRequest,SyncGroupRequest發送也是呼叫它
    public RequestFuture<ClientResponse> send(Node node,
    ApiKeys api,
    AbstractRequest request)
    {
    // future模式,send函式返回future參照,可以對future進行處理,future繼承了RequestFuture
    RequestFutureCompletionHandler future = new RequestFutureCompletionHandler();
    RequestHeader header = client.nextRequestHeader(api);
    // 發送的請求持有future,等該請求接收到響應時就可以呼叫此future的回呼函式
    RequestSend send = new RequestSend(node.idString(), header, request.toStruct());
    put(node, new ClientRequest(now, true, send, future));
    return future;
    }
    public <S> RequestFuture<S> compose(final RequestFutureAdapter<T, S> adapter){
    // 返回新的RequestFuture,此處RequestFuture於響應要回呼的RequestFuture不同,這裏的目前是給呼叫send請求的物件添加listener的機會,將成功或失敗的響應傳播出去
    final RequestFuture<S> adapted = new RequestFuture<S>();
    // 添加listener
    addListener(new RequestFutureListener<T>() {
    @Override
    publicvoidonSuccess(T value){
    // 將成功的響應傳播出去
    adapter.onSuccess(value, adapted);
    }
    @Override
    publicvoidonFailure(RuntimeException e){
    // 將失敗的響應傳播出去
    adapter.onFailure(e, adapted);
    }
    });
    return adapted;
    }





  • RequestFutureAdapter介面卡模式,將各種請求的處理都糅合到一起了。

  • AutoCommitTask, HeartbeatTask。定時任務高度抽象,具體實作交給對應類
  • 加入定時任務,其中DelayedTask 有自動送出任務AutoCommitTask和心跳檢測任務兩種實作

  • publicvoidschedule(DelayedTask task, long at{
    delayedTasks.add(task, at);
    }
    publicinterfaceDelayedTask {
    voidrun(long now);
    }

  • 執行定時任務,每次poll拉取資訊的時候看需要執行下所有加入的定時任務

  • publicvoidpoll(long now){
    // 類似for迴圈一個個執行, Entry 有時間欄位,實作定時任務
    while (!tasks.isEmpty() && tasks.peek().timeout <= now) {
    Entry entry = tasks.poll();
    entry.task.run(now);
    }
    }

    整體流程
  • 整體架構

    1. SubscriptionState管理訂閱的Topic集合和消費Partition情況;

    2. Fetch是抓取數據;

    3. ConsumerCoordinator是與伺服端GroupCoordinator協作的;

    4. ConsumerNetworkClient是對NetworkClient進行更高一層封裝,當然其作用是發網路請求的。

  • 看下KafkaConsumer核心poll方法

  • public ConsumerRecords<K, V> poll(long timeout){
    // 防止多執行緒並行操作,如果是一個topic對應一個KafkaConsumer,
    // 並且將拉取數據放到對應佇列,多執行緒執行數據處理情況下, 可以理解為每個topic都是單執行緒的
    acquire();
    try {
    // 省略部份程式碼
    do {
    // 1\. 核心方法
    Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
    //2\. 從訂閱的 partition 中拉取數據,pollOnce() 才是對 Consumer 客戶端拉取數據的核心實作
    if (!records.isEmpty()) {
    // 3\. 在返回數據之前,發送下次的 fetch 請求,避免使用者在下次獲取數據時執行緒 block
    // 因為fetch結果會被緩存,所以這裏等於可以並列的執行發送下次請求和處理這次請求
    if (fetcher.sendFetches() > 0 || client.pendingRequestCount() > 0)
    client.pollNoWakeup();
    if (this.interceptors == null)
    returnnew ConsumerRecords<>(records);
    else
    returnthis.interceptors.onConsume(new ConsumerRecords<>(records));
    }
    while (remaining > 0);
    return ConsumerRecords.empty();
    finally {
    release();
    }
    }
    // NO_CURRENT_THREAD -1
    // 多執行緒檢測機制
    privatefinal AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
    privatefinal AtomicInteger refcount = new AtomicInteger(0);
    privatevoidacquire(){
    ensureNotClosed();
    long threadId = Thread.currentThread().getId();
    // 第一次進入會賦值第一個進入的執行緒,後續有其他執行緒進入就會報錯
    if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
    thrownew ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
    refcount.incrementAndGet();
    }


  • pollOnce方法會先透過ConsumerCoordinator與GroupCoordinator完成Rebalance操作,之後從GroupCoordinator獲取最新的offset,最後才是fetch訊息

  • private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
    // 1\. 尋找GroupCoordinator,如果需要發送GroupCoordinatorRequest,則啟用定時HeartbeatRequet任務
    coordinator.ensureCoordinatorReady();
    // 2\. 完成rebalance,如果需要rebalance,需要發送JoinGroupRequest, SyncGroupRequest(這裏會有分區分配結果)
    if (subscriptions.partitionsAutoAssigned())
    coordinator.ensurePartitionAssignment();
    // 3\. 更新fetch位置
    if (!subscriptions.hasAllFetchPositions())
    updateFetchPositions(this.subscriptions.missingFetchPositions());
    long now = time.milliseconds();
    // 4\. 執行定時任務,心跳請求,自動送出請求
    client.executeDelayedTasks(now);
    // 5\. 從緩存中獲取資訊
    Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
    if (!records.isEmpty())
    return records;
    // 6\. 緩存沒有則發送請求,這裏只是標記channel可寫
    fetcher.sendFetches();
    // 7\. 真正發送
    client.poll(timeout, now);
    return fetcher.fetchedRecords();
    }