当前位置: 欣欣网 > 码农

.NET 任务并行库与 System.Threading.Channels

2024-03-25码农


概述: 最近,一位朋友联系了一位朋友,询问了任务并行库 (TPL) 和一些 .NET 代码的使用情况。我询问了代码在做什么,结果发现,对于某些工作负载中的每个条目,它正在执行一些数据库操作和 API 调用。ConcurrentBag我立即想知道是否使用 .NET的(STC)不是一个更好的选择:可能更高的吞吐量和更容易编程。System.Threading.Channels一起来看看吧!第一个想法直观地说,TPL 的细微差别在于,限制并行度意味着在任何给定时间等待的最大任务数由配置的并行度决定(在最后的警告中对此有更多介绍);其余任务将排队。这意味着吞吐量很大程度上取决于正确选择 .MaxDegreeO

最近,一位朋友联系了一位朋友,询问了任务并行库 ( TPL ) 和一些 .NET 代码的使用情况。我询问了代码在做什么,结果发现,对于某些工作负载中的每个条目,它正在执行一些数据库操作和 API 调用。ConcurrentBag

我立即想知道是否使用 .NET的( STC )不是一个更好的选择:可能更高的吞吐量和更容易编程。System.Threading.Channels

一起来看看吧!

第一个想法

直观地说,TPL 的细微差别在于,限制并行度意味着在任何给定时间等待的最大任务数由配置的并行度决定( 在最后的警告中对此有更多介绍 );其余任务将排队。这意味着吞吐量很大程度上取决于正确选择 .MaxDegreeOfParallelism

对于并行执行中存在网络依赖关系的任何工作负载,这意味着工作负载的完成速度将受到很大限制,因为大部分时间可能都花在等待网络 I/O 上。

此外,当使用 TPL 时,必须使用对共享状态的同步访问,这将再次增加等待时间,尤其是随着并行度的增加(我们不会在本文中探讨这个角度)。

因此,让我们构建一个测试用例。

测试设计

测试设计很简单:我们将创建一个包含 100 个项目的工作负载,每个项目都有 10 到 50 毫秒的随机延迟,以模拟一些 I/O:

var workload = Enumerable
.Range(0, 100)
.Select(i => (Index: i, Delay: Random.Shared.Next(10, 50)))
.ToImmutableArray();

帮助程序方法将包装每个测试用例的执行,以提供一些基本检测:

asyncTaskInstrumentedRun(string name, Func<Task> test) {
var threadsAtStart = Process.GetCurrentProcess().Threads.Count;
var timer = newStopwatch();
timer.Start();
awaittest(); // ⭐️ Actual test here.
timer.Stop();
Console.WriteLine($"[{name}] = {timer.ElapsedMilliseconds}ms");
Console.WriteLine($" ⮑ {threadsAtStart} threads at start");
Console.WriteLine($" ⮑ {Process.GetCurrentProcess().Threads.Count} threads at end");
}

现在,我们可以运行一些案例并衡量结果。

使用频道

首先是使用通道:

// Using System.Threading.Channels
awaitInstrumentedRun("Channel", async () => {
var channel = Channel.CreateUnbounded<int>();
asyncTaskRun(ChannelWriter<int> writer, int id, int delay) {
await Task.Delay(delay); // ⭐️ Simulate work
await writer.WriteAsync(id);
}
asyncTaskReceive(ChannelReader<int> reader) {
while (await reader.WaitToReadAsync()) {
if (reader.TryRead(outvar id)) {
// No work here.
}
}
}
var receiveTask = Receive(channel.Reader);
var processingTasks = workload
.AsParallel()
.Select(e => Run(channel.Writer, e.Index, e.Delay));
await Task
.WhenAll(processingTasks)
.ContinueWith(_ => channel.Writer.Complete());
await receiveTask;
});



我们首先创建一个简单的通道和两个方法,它们将保留通道的两端(写入器和读取器)。该方法被调用并返回一组 100 秒,我们将等待该集合,并在完成后,通过调用 通知读取端。RunTaskComplete()

使用 Parallel.For @ 4

接下来,我们将设置 4:Parallel.ForMaxDegreeOfParallelism

// Using Parallel.For with concurrency of 4
awaitInstrumentedRun("Parallel.For @ 4", () => {
Parallel.For(0, 100,
newParallelOptions { MaxDegreeOfParallelism = 4 },
index => {
Thread.Sleep(workload[index].Delay); // ⭐️ Simulate work
}
);
return Task.CompletedTask;
});

请注意,这不支持,因此我们只需在此处使用此处来暂停当前线程以进行配置的延迟。Parallel.Forasync/awaitThread.Sleep

使用 Parallel.ForEachAsync @ 4

接下来,我们将使用一个版本:async

// Using Parallel.ForEachAsync with concurrency of 4
awaitInstrumentedRun("Parallel.ForEachAsync @ 4", async () =>
await Parallel.ForEachAsync(workload,
newParallelOptions { MaxDegreeOfParallelism = 4 },
async (item, cancel) => {
await Task.Delay(item.Delay, cancel); // ⭐️ Simulate work
}
)
);

使用 Parallel.ForEachAsync @ 40

然后,我们将尝试将并行度设置为 40:

// Using Parallel.ForEachAsync with concurrency of 40
awaitInstrumentedRun("Parallel.ForEachAsync @ 40", async () =>
await Parallel.ForEachAsync(workload,
newParallelOptions { MaxDegreeOfParallelism = 40 },
async (item, cancel) => {
await Task.Delay(item.Delay, cancel); // ⭐️ Simulate work
}
)
);

将 Parallel.ForEachAsync 与默认值一起使用

最后,我们将尝试使用默认值:

// Using Parallel.ForEachAsync with concurrency unset
awaitInstrumentedRun("Parallel.ForEachAsync (Default)", async () =>
await Parallel.ForEachAsync(workload, async (item, cancel) => {
await Task.Delay(item.Delay, cancel); // ⭐️ Simulate work
})
);

让我们来看看结果和分析:

结果

每次运行都会略有不同,因为工作负载的初始化是随机的。下面是一次运行的输出:

这是你所期望的吗?

如果不是,那么如果你追求原始性能,那么你可能需要考虑 TPL 或 STC 是否是更好的并发范例(除非你真的知道如何调整你的 )。MaxDegreeOfParallelism

在 4 度并行度下,STC 实现的速度几乎是 TPL 实现_ 的 12 倍 _。当然,从直观上讲,这是有道理的:由于设置为 4 时一次只能执行 4 个任务,因此任务中的任何 I/O 都会导致其余 96 个任务排队。另一方面,STC 实现会立即执行全部工作负载,直到每个任务达到 I/O。MaxDegreeOfParallelism

即使在 40 度的并行度下,TPL 实现也必然比 STC 实现慢。由于我们总共有 100 个任务需要处理,这意味着在前 40 个进入等待状态后,剩余的 60 个将排队。STC 实现有效地执行所有 100 个任务,直到它们达到等待状态。

可能有点令人惊讶的是线程数:它永远不会超过 19。顾名思义,「任务并行库」实际上就是使用进程线程池处理_任务_,即使我们将 40 设置为 40,进程也不会生成相应数量的线程。换句话说,它是一种并发-并行混合体。MaxDegreeOfParallelism

了解并发、并行和混合之间的区别。通过 Open classrooms.com 的图表

(值得自己运行此代码并将案例的顺序切换到最后一个,看看这对线程数有何影响)。Channel

注意事项和结论

这里需要注意的是,STC方法可以有效地_运行一切_。这意味着它可能不适用于上游系统(API 或数据库)无法处理突然爆发的请求或某些资源存在请求配额或争用(例如数据库中的写入锁定)的情况。在这种情况下,TPL 可能是实现某种「受限制」的并发请求处理的合适方法。

另一个需要考虑的因素是同步访问。STC 方法的优点在于它有效地序列化了数据流,并且可以像单线程一样处理循环中的输出(因为它是单线程的)。我认为这简化了编程模型(也许也更容易调试),并且在 TPL 情况下写入同步状态可能会带来额外的成本,尤其是在_并行性_增加的情况下。Receive

通过引入两个串行,也可以实现受限制的 STC 实现,因此,如果工作负载的某些部分应该受到限制,则可以尽快立即执行工作负载的未限制部分,然后控制工作负载中受限制部分的流。Channel

结论是,如果工作负载使用的是任务并行库,则了解实现目标限制行为的正确优化非常重要。不显式_设置并行性似乎是两全其美的,因为您没有显式限制工作负载,但同时限制了吞吐量_。在不涉及或不需要限制的情况下,使用似乎是一个更好的选择,既能提供更好的性能,又能提供更简单的_无同步_编程模型。

附录 — 完整代码

// Generate a set of 100 records, each with a random wait interval.
usingSystem.Collections.Immutable;
usingSystem.Diagnostics;
usingSystem.Threading.Channels;
var log = (object msg) => Console.WriteLine(msg);
var workload = Enumerable
.Range(0, 100)
.Select(i => (Index: i, Delay: Random.Shared.Next(10, 50)))
.ToImmutableArray();
// Using System.Threading.Channels
awaitInstrumentedRun("Channel", async () => {
var channel = Channel.CreateUnbounded<int>();
asyncTaskRun(ChannelWriter<int> writer, int id, int delay) {
await Task.Delay(delay);
await writer.WriteAsync(id);
}
asyncTaskReceive(ChannelReader<int> reader) {
while (await reader.WaitToReadAsync()) {
if (reader.TryRead(outvar id)) {
// No work here.
//log($" Completed {id}");
}
}
}
var receiveTask = Receive(channel.Reader);
var processingTasks = workload
.AsParallel()
.Select(e => Run(channel.Writer, e.Index, e.Delay));
await Task
.WhenAll(processingTasks)
.ContinueWith(_ => channel.Writer.Complete());
await receiveTask;
});
// Using Parallel.For with concurrency of 4
awaitInstrumentedRun("Parallel.For @ 4", () => {
Parallel.For(0, 100, newParallelOptions { MaxDegreeOfParallelism = 4 }, (index) => {
Thread.Sleep(workload[index].Delay);
});
return Task.CompletedTask;
});
// Using Parallel.ForEachAsync with concurrency of 4
awaitInstrumentedRun("Parallel.ForEachAsync @ 4", async () =>
await Parallel.ForEachAsync(workload, newParallelOptions { MaxDegreeOfParallelism = 4 }, async (item, cancel) => {
await Task.Delay(item.Delay, cancel);
})
);
// Using Parallel.ForEachAsync with concurrency of 40
awaitInstrumentedRun("Parallel.ForEachAsync @ 40", async () =>
await Parallel.ForEachAsync(workload, newParallelOptions { MaxDegreeOfParallelism = 40 }, async (item, cancel) => {
await Task.Delay(item.Delay, cancel);
})
);
// Using Parallel.ForEachAsync with concurrency unset
awaitInstrumentedRun("Parallel.ForEachAsync (Default)", async () =>
await Parallel.ForEachAsync(workload, async (item, cancel) => {
await Task.Delay(item.Delay, cancel);
})
);
/*-----------------------------------------------------------
* Supporting functions
---------------------------------------------------------*/

asyncTaskInstrumentedRun(string name, Func<Task> test) {
var threadsAtStart = Process.GetCurrentProcess().Threads.Count;
var timer = newStopwatch();
timer.Start();
awaittest();
timer.Stop();
Console.WriteLine($"[{name}] = {timer.ElapsedMilliseconds}ms");
Console.WriteLine($" ⮑ {threadsAtStart} threads at start");
Console.WriteLine($" ⮑ {Process.GetCurrentProcess().Threads.Count} threads at end");
}
/*
YMMV since each run uses a random workload.
[Channel] = 68ms
⮑ 8 threads at start
⮑ 19 threads at end
[Parallel.For @ 4] = 799ms
⮑ 19 threads at start
⮑ 19 threads at end
[Parallel.ForEachAsync @ 4] = 754ms
⮑ 19 threads at start
⮑ 19 threads at end
[Parallel.ForEachAsync @ 40] = 100ms
⮑ 19 threads at start
⮑ 19 threads at end
[Parallel.ForEachAsync (Default)] = 384ms
⮑ 19 threads at start
⮑ 19 threads at end
*/