當前位置: 妍妍網 > 碼農

.NET 任務並列庫與 System.Threading.Channels

2024-03-25碼農


概述: 最近,一位朋友聯系了一位朋友,詢問了任務並列庫 (TPL) 和一些 .NET 程式碼的使用情況。我詢問了程式碼在做什麽,結果發現,對於某些工作負載中的每個條目,它正在執行一些資料庫操作和 API 呼叫。ConcurrentBag我立即想知道是否使用 .NET的(STC)不是一個更好的選擇:可能更高的吞吐量和更容易編程。System.Threading.Channels一起來看看吧!第一個想法直觀地說,TPL 的細微差別在於,限制並列度意味著在任何給定時間等待的最大任務數由配置的並列度決定(在最後的警告中對此有更多介紹);其余任務將排隊。這意味著吞吐量很大程度上取決於正確選擇 .MaxDegreeO

最近,一位朋友聯系了一位朋友,詢問了任務並列庫 ( TPL ) 和一些 .NET 程式碼的使用情況。我詢問了程式碼在做什麽,結果發現,對於某些工作負載中的每個條目,它正在執行一些資料庫操作和 API 呼叫。ConcurrentBag

我立即想知道是否使用 .NET的( STC )不是一個更好的選擇:可能更高的吞吐量和更容易編程。System.Threading.Channels

一起來看看吧!

第一個想法

直觀地說,TPL 的細微差別在於,限制並列度意味著在任何給定時間等待的最大任務數由配置的並列度決定( 在最後的警告中對此有更多介紹 );其余任務將排隊。這意味著吞吐量很大程度上取決於正確選擇 .MaxDegreeOfParallelism

對於並列執行中存在網路依賴關系的任何工作負載,這意味著工作負載的完成速度將受到很大限制,因為大部份時間可能都花在等待網路 I/O 上。

此外,當使用 TPL 時,必須使用對共享狀態的同步存取,這將再次增加等待時間,尤其是隨著並列度的增加(我們不會在本文中探討這個角度)。

因此,讓我們構建一個測試用例。

測試設計

測試設計很簡單:我們將建立一個包含 100 個計畫的工作負載,每個計畫都有 10 到 50 毫秒的隨機延遲,以模擬一些 I/O:

var workload = Enumerable
.Range(0, 100)
.Select(i => (Index: i, Delay: Random.Shared.Next(10, 50)))
.ToImmutableArray();

幫助程式方法將包裝每個測試用例的執行,以提供一些基本檢測:

asyncTaskInstrumentedRun(string name, Func<Task> test) {
var threadsAtStart = Process.GetCurrentProcess().Threads.Count;
var timer = newStopwatch();
timer.Start();
awaittest(); // ⭐️ Actual test here.
timer.Stop();
Console.WriteLine($"[{name}] = {timer.ElapsedMilliseconds}ms");
Console.WriteLine($" ⮑ {threadsAtStart} threads at start");
Console.WriteLine($" ⮑ {Process.GetCurrentProcess().Threads.Count} threads at end");
}

現在,我們可以執行一些案例並衡量結果。

使用頻道

首先是使用通道:

// Using System.Threading.Channels
awaitInstrumentedRun("Channel", async () => {
var channel = Channel.CreateUnbounded<int>();
asyncTaskRun(ChannelWriter<int> writer, int id, int delay) {
await Task.Delay(delay); // ⭐️ Simulate work
await writer.WriteAsync(id);
}
asyncTaskReceive(ChannelReader<int> reader) {
while (await reader.WaitToReadAsync()) {
if (reader.TryRead(outvar id)) {
// No work here.
}
}
}
var receiveTask = Receive(channel.Reader);
var processingTasks = workload
.AsParallel()
.Select(e => Run(channel.Writer, e.Index, e.Delay));
await Task
.WhenAll(processingTasks)
.ContinueWith(_ => channel.Writer.Complete());
await receiveTask;
});



我們首先建立一個簡單的通道和兩個方法,它們將保留通道的兩端(寫入器和讀取器)。該方法被呼叫並返回一組 100 秒,我們將等待該集合,並在完成後,透過呼叫 通知讀取端。RunTaskComplete()

使用 Parallel.For @ 4

接下來,我們將設定 4:Parallel.ForMaxDegreeOfParallelism

// Using Parallel.For with concurrency of 4
awaitInstrumentedRun("Parallel.For @ 4", () => {
Parallel.For(0, 100,
newParallelOptions { MaxDegreeOfParallelism = 4 },
index => {
Thread.Sleep(workload[index].Delay); // ⭐️ Simulate work
}
);
return Task.CompletedTask;
});

請註意,這不支持,因此我們只需在此處使用此處來暫停當前執行緒以進行配置的延遲。Parallel.Forasync/awaitThread.Sleep

使用 Parallel.ForEachAsync @ 4

接下來,我們將使用一個版本:async

// Using Parallel.ForEachAsync with concurrency of 4
awaitInstrumentedRun("Parallel.ForEachAsync @ 4", async () =>
await Parallel.ForEachAsync(workload,
newParallelOptions { MaxDegreeOfParallelism = 4 },
async (item, cancel) => {
await Task.Delay(item.Delay, cancel); // ⭐️ Simulate work
}
)
);

使用 Parallel.ForEachAsync @ 40

然後,我們將嘗試將並列度設定為 40:

// Using Parallel.ForEachAsync with concurrency of 40
awaitInstrumentedRun("Parallel.ForEachAsync @ 40", async () =>
await Parallel.ForEachAsync(workload,
newParallelOptions { MaxDegreeOfParallelism = 40 },
async (item, cancel) => {
await Task.Delay(item.Delay, cancel); // ⭐️ Simulate work
}
)
);

將 Parallel.ForEachAsync 與預設值一起使用

最後,我們將嘗試使用預設值:

// Using Parallel.ForEachAsync with concurrency unset
awaitInstrumentedRun("Parallel.ForEachAsync (Default)", async () =>
await Parallel.ForEachAsync(workload, async (item, cancel) => {
await Task.Delay(item.Delay, cancel); // ⭐️ Simulate work
})
);

讓我們來看看結果和分析:

結果

每次執行都會略有不同,因為工作負載的初始化是隨機的。下面是一次執行的輸出:

這是你所期望的嗎?

如果不是,那麽如果你追求原始效能,那麽你可能需要考慮 TPL 或 STC 是否是更好的並行範例(除非你真的知道如何調整你的 )。MaxDegreeOfParallelism

在 4 度並列度下,STC 實作的速度幾乎是 TPL 實作_ 的 12 倍 _。當然,從直觀上講,這是有道理的:由於設定為 4 時一次只能執行 4 個任務,因此任務中的任何 I/O 都會導致其余 96 個任務排隊。另一方面,STC 實作會立即執行全部工作負載,直到每個任務達到 I/O。MaxDegreeOfParallelism

即使在 40 度的並列度下,TPL 實作也必然比 STC 實作慢。由於我們總共有 100 個任務需要處理,這意味著在前 40 個進入等待狀態後,剩余的 60 個將排隊。STC 實作有效地執行所有 100 個任務,直到它們達到等待狀態。

可能有點令人驚訝的是執行緒數:它永遠不會超過 19。顧名思義,「任務並列庫」實際上就是使用行程執行緒池處理_任務_,即使我們將 40 設定為 40,行程也不會生成相應數量的執行緒。換句話說,它是一種並行-並列混合體。MaxDegreeOfParallelism

了解並行、並列和混合之間的區別。透過 Open classrooms.com 的圖表

(值得自己執行此程式碼並將案例的順序切換到最後一個,看看這對執行緒數有何影響)。Channel

註意事項和結論

這裏需要註意的是,STC方法可以有效地_執行一切_。這意味著它可能不適用於上遊系統(API 或資料庫)無法處理突然爆發的請求或某些資源存在請求配額或爭用(例如資料庫中的寫入釘選)的情況。在這種情況下,TPL 可能是實作某種「受限制」的並行請求處理的合適方法。

另一個需要考慮的因素是同步存取。STC 方法的優點在於它有效地序列化了數據流,並且可以像單執行緒一樣處理迴圈中的輸出(因為它是單執行緒的)。我認為這簡化了編程模型(也許也更容易偵錯),並且在 TPL 情況下寫入同步狀態可能會帶來額外的成本,尤其是在_並列性_增加的情況下。Receive

透過引入兩個序列,也可以實作受限制的 STC 實作,因此,如果工作負載的某些部份應該受到限制,則可以盡快立即執行工作負載的未限制部份,然後控制工作負載中受限制部份的流。Channel

結論是,如果工作負載使用的是任務並列庫,則了解實作目標限制行為的正確最佳化非常重要。不顯式_設定並列性似乎是兩全其美的,因為您沒有顯式限制工作負載,但同時限制了吞吐量_。在不涉及或不需要限制的情況下,使用似乎是一個更好的選擇,既能提供更好的效能,又能提供更簡單的_無同步_編程模型。

附錄 — 完整程式碼

// Generate a set of 100 records, each with a random wait interval.
usingSystem.Collections.Immutable;
usingSystem.Diagnostics;
usingSystem.Threading.Channels;
var log = (object msg) => Console.WriteLine(msg);
var workload = Enumerable
.Range(0, 100)
.Select(i => (Index: i, Delay: Random.Shared.Next(10, 50)))
.ToImmutableArray();
// Using System.Threading.Channels
awaitInstrumentedRun("Channel", async () => {
var channel = Channel.CreateUnbounded<int>();
asyncTaskRun(ChannelWriter<int> writer, int id, int delay) {
await Task.Delay(delay);
await writer.WriteAsync(id);
}
asyncTaskReceive(ChannelReader<int> reader) {
while (await reader.WaitToReadAsync()) {
if (reader.TryRead(outvar id)) {
// No work here.
//log($" Completed {id}");
}
}
}
var receiveTask = Receive(channel.Reader);
var processingTasks = workload
.AsParallel()
.Select(e => Run(channel.Writer, e.Index, e.Delay));
await Task
.WhenAll(processingTasks)
.ContinueWith(_ => channel.Writer.Complete());
await receiveTask;
});
// Using Parallel.For with concurrency of 4
awaitInstrumentedRun("Parallel.For @ 4", () => {
Parallel.For(0, 100, newParallelOptions { MaxDegreeOfParallelism = 4 }, (index) => {
Thread.Sleep(workload[index].Delay);
});
return Task.CompletedTask;
});
// Using Parallel.ForEachAsync with concurrency of 4
awaitInstrumentedRun("Parallel.ForEachAsync @ 4", async () =>
await Parallel.ForEachAsync(workload, newParallelOptions { MaxDegreeOfParallelism = 4 }, async (item, cancel) => {
await Task.Delay(item.Delay, cancel);
})
);
// Using Parallel.ForEachAsync with concurrency of 40
awaitInstrumentedRun("Parallel.ForEachAsync @ 40", async () =>
await Parallel.ForEachAsync(workload, newParallelOptions { MaxDegreeOfParallelism = 40 }, async (item, cancel) => {
await Task.Delay(item.Delay, cancel);
})
);
// Using Parallel.ForEachAsync with concurrency unset
awaitInstrumentedRun("Parallel.ForEachAsync (Default)", async () =>
await Parallel.ForEachAsync(workload, async (item, cancel) => {
await Task.Delay(item.Delay, cancel);
})
);
/*-----------------------------------------------------------
* Supporting functions
---------------------------------------------------------*/

asyncTaskInstrumentedRun(string name, Func<Task> test) {
var threadsAtStart = Process.GetCurrentProcess().Threads.Count;
var timer = newStopwatch();
timer.Start();
awaittest();
timer.Stop();
Console.WriteLine($"[{name}] = {timer.ElapsedMilliseconds}ms");
Console.WriteLine($" ⮑ {threadsAtStart} threads at start");
Console.WriteLine($" ⮑ {Process.GetCurrentProcess().Threads.Count} threads at end");
}
/*
YMMV since each run uses a random workload.
[Channel] = 68ms
⮑ 8 threads at start
⮑ 19 threads at end
[Parallel.For @ 4] = 799ms
⮑ 19 threads at start
⮑ 19 threads at end
[Parallel.ForEachAsync @ 4] = 754ms
⮑ 19 threads at start
⮑ 19 threads at end
[Parallel.ForEachAsync @ 40] = 100ms
⮑ 19 threads at start
⮑ 19 threads at end
[Parallel.ForEachAsync (Default)] = 384ms
⮑ 19 threads at start
⮑ 19 threads at end
*/