當前位置: 妍妍網 > 碼農

6 個技術點帶你理解 Kafka 高效能背後的原理

2024-03-15碼農

大家好,這裏是頂尖架構師棧!點選上方關註,添加 星標 」, 一起學習大廠前沿架構!

Kafka 是一款效能非常優秀的訊息佇列,每秒處理的訊息體量可以達到千萬級別。今天來聊一聊 Kafka 高效能背後的技術原理。

1 批次發送

Kafka 收發訊息都是批次進行處理的。我們看一下 Kafka 生產者發送訊息的程式碼:

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback){
 TopicPartition tp = null;
try {
//省略前面程式碼
Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
//把訊息追加到之前緩存的這一批訊息上
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs);
//積累到設定的緩存大小,則發送出去
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
return result.future;
// handling exceptions and record the errors;
// for API exceptions return them in the future,
// for other exceptions throw directly
 } catch/**省略 catch 程式碼*/
}

從程式碼中可以看到,生產者呼叫 doSend 方法後,並不會直接把訊息發送出去,而是把訊息緩存起來,緩存訊息量達到配置的批次大小後,才會發送出去。

註意:從上面 accumulator.append 程式碼可以看到,一批訊息屬於同一個 topic 下面的同一個 partition。

Broker 收到訊息後,並不會把批次訊息解析成單條訊息後落盤,而是作為批次訊息進行落盤,同時也會把批次訊息直接同步給其他副本。

消費者拉取訊息,也不會按照單條進行拉取,而是按照批次進行拉取,拉取到一批訊息後,再解析成單條訊息進行消費。

使用批次收發訊息,減輕了客戶端和 Broker 的互動次數,提升了 Broker 處理能力。

2 訊息壓縮

如果訊息體比較大,Kafka 訊息吞吐量要達到千萬級別,網卡支持的網路傳輸頻寬會是一個瓶頸。Kafka 的解決方案是訊息壓縮。發送訊息時,如果增加參數 compression.type,就可以開啟訊息壓縮:

publicstaticvoidmain(String[] args){
 Properties props = new Properties();
 props.put("bootstrap.servers""localhost:9092");
 props.put("key.serializer""org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer""org.apache.kafka.common.serialization.StringSerializer");
//開啟訊息壓縮
 props.put("compression.type""gzip");
 Producer<String, String> producer = new KafkaProducer<>(props);
 ProducerRecord<String, String> record = new ProducerRecord<>("my_topic""key1""value1");
 producer.send(record, new Callback() {
@Override
publicvoidonCompletion(RecordMetadata metadata, Exception exception){
if (exception != null) {
logger.error("sending message error: ", e);
else {
logger.info("sending message successful, Offset: ", metadata.offset());
}
}
 });
 producer.close();
}

如果 compression.type 的值設定為 none,則不開啟壓縮。那訊息是在什麽時候進行壓縮呢?前面提到過,生產者緩存一批訊息後才會發送,在發送這批訊息之前就會進行壓縮,程式碼如下:

public RecordAppendResult append(TopicPartition tp,
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
Callback callback,
long maxTimeToBlock)
throws InterruptedException 
{
// ...
try {
// ...
buffer = free.allocate(size, maxTimeToBlock);
synchronized (dq) {
//...
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
if (appendResult != null) {
// Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
return appendResult;
}
//這批訊息緩存已滿,這裏進行壓縮
MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
dq.addLast(batch);
incomplete.add(batch);
// Don't deallocate this buffer in the finally block as it's being used in the record batch
buffer = null;
returnnew RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
}
 } finally {
if (buffer != null)
free.deallocate(buffer);
appendsInProgress.decrementAndGet();
 }
}

上面的 recordsBuilder 方法最終呼叫了下面 MemoryRecordsBuilder 的構造方法。

publicMemoryRecordsBuilder(ByteBufferOutputStream bufferStream,
byte magic,
CompressionType compressionType,
TimestampType timestampType,
long baseOffset,
long logAppendTime,
long producerId,
short producerEpoch,
int baseSequence,
boolean isTransactional,
boolean isControlBatch,
int partitionLeaderEpoch,
int writeLimit)
{
//省略其他程式碼
this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic));
}

上面的 wrapForOutput 方法會根據配置的壓縮演算法進行壓縮或者選擇不壓縮。目前 Kafka 支持的壓縮演算法包括:gzip、snappy、lz4,從 2.1.0 版本開始,Kafka 支持 Zstandard 演算法。

在 Broker 端,會解壓 header 做一些校驗,但不會解壓訊息體。訊息體的解壓是在消費端,消費者拉取到一批訊息後,首先會進行解壓,然後進行訊息處理。

因為壓縮和解壓都是耗費 CPU 的操作,所以在開啟訊息壓縮時,也要考慮生產者和消費者的 CPU 資源情況。

有了訊息批次收集和壓縮,kafka 生產者發送訊息的過程如下圖:

3 磁盤順序讀寫

順序讀寫省去了尋址的時間,只要一次尋址,就可以連續讀寫。

在固態硬碟上,順序讀寫的效能是隨機讀寫的好幾倍。而在機械硬碟上,尋址時需要移動磁頭,這個機械運動會花費很多時間,因此機械硬碟的順序讀寫效能是隨機讀寫的幾十倍。

Kafka 的 Broker 在寫訊息數據時,首先為每個 Partition 建立一個檔,然後把數據順序地追加到該檔對應的磁盤空間中,如果這個檔寫滿了,就再建立一個新檔繼續追加寫。這樣大大減少了尋址時間,提高了讀寫效能。

4 PageCache

在 Linux 系統中,所有檔 IO 操作都要透過 PageCache,PageCache 是磁盤檔在記憶體中建立的緩存。當應用程式讀寫檔時,並不會直接讀寫磁盤上的檔,而是操作 PageCache。

應用程式寫檔時,都先會把數據寫入 PageCache,然後作業系統定期地將 PageCache 的數據寫到磁盤上。如下圖:

而應用程式在讀取檔數據時,首先會判斷數據是否在 PageCache 中,如果在則直接讀取,如果不在,則讀取磁盤,並且將數據緩存到 PageCache。

Kafka 充分利用了 PageCache 的優勢,當生產者生產訊息的速率和消費者消費訊息的速率差不多時,Kafka 基本可以不用落盤就能完成訊息的傳輸。

5 零拷貝

Kafka Broker 將訊息發送給消費端時,即使命中了 PageCache,也需要將 PageCache 中的數據先復制到應用程式的記憶體空間,然後從應用程式的記憶體空間復制到 Socket 緩存區,將數據發送出去。如下圖:

Kafka 采用了零拷貝技術把數據直接從 PageCache 復制到 Socket 緩沖區中,這樣數據不用復制到使用者態的記憶體空間,同時 DMA 控制器直接完成數據復制,不需要 CPU 參與。如下圖:

Java 零拷貝技術采用 FileChannel.transferTo() 方法,底層呼叫了 sendfile 方法。

6 mmap

Kafka 的日誌檔分為數據檔(.log)和索引檔(.index),Kafka 為了提高索引檔的讀取效能,對索引檔采用了 mmap 記憶體對映,將索引檔對映到行程的記憶體空間,這樣讀取索引檔就不需要從磁盤進行讀取。如下圖:

7 總結

本文介紹了 Kafka 實作高效能用到的關鍵技術,這些技術可以為我們學習和工作提供參考。

END

如喜歡本文,請點選右上角,把文章分享到朋友圈

因公眾號更改推播規則,請點「在看」並加「星標」 第一時間獲取精彩技術分享

IT一線從業者抱團群

致力於幫助廣大開發者提供高效合適的工具,讓大家能夠騰出手做更多創造性的工作,也歡迎大家分享自己公司的內推資訊,相互幫助,一起進步!

組建了程式設計師,架構師,IT從業者交流群,以 交流技術 職位內推 行業探討 為主

加大佬 好友 ,備註"加群"