当前位置: 欣欣网 > 码农

C# 如何实现一个事件总线

2024-02-17码农

E ven t Bus(事件总线)是一种用于在应用程序内部或跨应用程序组件之间进行事件通信的机制。

它允许不同的组件通过发布和订阅事件来进行解耦和通信。在给定的代码片段中,我们可以看到一个使用C#实现的Event Bus。它定义了一些接口和类来实现事件的发布和订阅。

首先,我们有两个基本的约束接口: IEvent IAsyncEventHandler<TEvent>

IEvent是一个空接口,用于约束事件的类型。 IAsyncEventHandler<TEvent> 是一个泛型接口,用于约束事件处理程序的类型。 它定义了处理事件的异步方法HandleAsync和处理异常的方法HandleException。接下来,我们有一个IEventBus接口,它定义了一些操作方法用于发布和订阅事件。

其中, Publish<TEvent> PublishAsync<TEvent> 方法用于发布事件,而 OnSubscribe<TEvent> 方法用于订阅事件。然后,我们看到一个实现了本地事件总线的类 LocalEventBusManager<TEvent> 。它实现了 ILocalEventBusManager<TEvent> 接口,用于在单一管道内处理本地事件。它使用了一个 Channel<TEvent> 来存储事件,并提供了发布事件的方法 Publish PublishAsync 。此外,它还提供了一个自动处理事件的方法 AutoHandle

总的来说 Event Bus 提供了一种方便的方式来实现组件之间的松耦合通信。

通过发布和订阅事件,组件可以独立地进行操作,而不需要直接依赖于彼此的实现细节。

这种机制可以提高代码的可维护性和可扩展性。

Github仓库地址:https://github.com/DonPangPang/soda-event-bus

实现一些基本约束

先实现一些约束,实现 IEvent 约束事件,实现 IAsyncEvnetHandler<TEvent> where TEvent:IEvent 来约束事件的处理程序。

publicinterfaceIEvent
{
}
publicinterfaceIAsyncEventHandler<inTEventwhereTEvent : IEvent
{
Task HandleAsync(IEvent @event);
voidHandleException(IEvent @event, Exception ex);
}

接下来规定一下咱们的 IEventBus ,会有哪些操作方法。基本就是发布和订阅。

publicinterfaceIEventBus
{
void Publish<TEvent>(TEvent @eventwhere TEvent : IEvent;
Task PublishAsync<TEvent>(TEvent @eventwhere TEvent : IEvent;
void OnSubscribe<TEvent>() where TEvent : IEvent;
}

实现一个本地事件总线

本地事件处理

本地事件的处理我打算采用两种方式实现,一种是 LocalEventBusManager 即本地事件管理,第二种是 LocalEventBusPool 池化本地事件。

LocalEvnetBusManager

LocalEventBusManager 主要在单一管道内进行处理,集中进行消费。

publicinterfaceILocalEventBusManager<inTEvent>whereTEvent : IEvent
{
voidPublish(TEvent @event);
Task PublishAsync(TEvent @event) ;
voidAutoHandle();
}
public classLocalEventBusManager<TEvent>(IServiceProviderserviceProvider):ILocalEventBusManager<TEvent>
whereTEventIEvent
{
readonly IServiceProvider _servicesProvider = serviceProvider;
privatereadonly Channel<TEvent> _eventChannel = Channel.CreateUnbounded<TEvent>();
publicvoidPublish(TEvent @event)
{
Debug.Assert(_eventChannel != nullnameof(_eventChannel) + " != null");
_eventChannel.Writer.WriteAsync(@event);
}
private CancellationTokenSource Cts { get; } = new();
publicvoidCancel()
{
Cts.Cancel();
}
publicasync Task PublishAsync(TEvent @event)
{
await _eventChannel.Writer.WriteAsync(@event);
}
publicvoidAutoHandle()
{
// 确保只启动一次
if (!Cts.IsCancellationRequested) return;
Task.Run(async () =>
{
while (!Cts.IsCancellationRequested)
{
var reader = await _eventChannel.Reader.ReadAsync();
await HandleAsync(reader);
}
}, Cts.Token);
}
async Task HandleAsync(TEvent @event)
{
var handler = _servicesProvider.GetService<IAsyncEventHandler<TEvent>>();
if (handler isnull)
{
thrownew NullReferenceException($"No handler for event {@event.GetType().Name}");
}
try
{
await handler.HandleAsync(@event);
}
catch (Exception ex)
{
handler.HandleException( @event, ex);
}
}
}









LocalEventBusPool

LocalEventBusPool 即所有的Event都会有一个单独的管道处理,单独消费处理,并行能力更好一些。

publicsealed class LocalEventBusPool(IServiceProvider serviceProvider)
{
privatereadonly IServiceProvider _serviceProvider = serviceProvider;
private classChannelKey
{
public required string Key { get; init; }
publicint Subscribers { getset; }
publicoverrideboolEquals(object? obj)
{
if (obj is ChannelKey key)
{
returnstring.Equals(key.Key, Key, StringComparison.OrdinalIgnoreCase);
}
returnfalse;
}
publicoverrideintGetHashCode()
{
return0;
}
}
private Channel<IEvent> Rent(string channel)
{
_channels.TryGetValue(new ChannelKey() { Key = channel }, outvarvalue);
if (value != nullreturnvalue;
value = Channel.CreateUnbounded<IEvent>();
_channels.TryAdd(new ChannelKey() { Key = channel }, value);
returnvalue;
}
private Channel<IEvent> Rent(ChannelKey channelKey)
{
_channels.TryGetValue(channelKey, outvarvalue);
if (value != nullreturnvalue;
value = Channel.CreateUnbounded<IEvent>();
_channels.TryAdd(channelKey, value);
returnvalue;
}
privatereadonly ConcurrentDictionary<ChannelKey, Channel<IEvent>> _channels = new();
private CancellationTokenSource Cts { get; } = new();
publicvoidCancel()
{
Cts.Cancel();
_channels.Clear();
Cts.TryReset();
}
publicasync Task PublishAsync<TEvent>(TEvent @eventwhere TEvent : IEvent
{
await Rent(typeof(TEvent).Name).Writer.WriteAsync(@event);
}
publicvoid Publish<TEvent>(TEvent @eventwhere TEvent : IEvent
{
Rent(typeof(TEvent).Name).Writer.TryWrite(@event);
}
publicvoid OnSubscribe<TEvent>() where TEvent : IEvent
{
var channelKey = _channels.FirstOrDefault(x => x.Key.Key == typeof(TEvent).Name).Key ??
new ChannelKey() { Key = typeof(TEvent).Name };
channelKey.Subscribers++;
Task.Run(async () =>
{
try
{
while (!Cts.IsCancellationRequested)
{
var @event = await ReadAsync(channelKey);
var handler = _serviceProvider.GetService<IAsyncEventHandler<TEvent>>();
if (handler == nullthrownew NullReferenceException($"No handler for Event {typeof(TEvent).Name}");
try
{
await handler.HandleAsync((TEvent)@event);
}
catch (Exception ex)
{
handler.HandleException((TEvent)@event, ex);
}
}
}
catch (Exception e)
{
thrownew InvalidOperationException("Error on onSubscribe handler", e);
}
}, Cts.Token);
}
privateasync Task<IEvent> ReadAsync(string channel)
{
returnawait Rent(channel).Reader.ReadAsync(Cts.Token);
}
privateasync Task<IEvent> ReadAsync(ChannelKey channel)
{
returnawait Rent(channel).Reader.ReadAsync(Cts.Token);
}
}















LocalEventBus

实现 LocalEventBus 继承自 IEventBus 即可,如果有需要扩展的方法自行添加,池化和管理器的情况单独处理。

publicinterfaceILocalEventBusIEventBus
{
}
public class LocalEventBus(IServiceProvider serviceProvider, LocalEventBusOptions options) : ILocalEventBus
{
private LocalEventBusPool? EventBusPool => serviceProvider.GetService<LocalEventBusPool>();

publicvoid Publish<TEvent>(TEvent @eventwhere TEvent : IEvent
{
if (options.Pool)
{
Debug.Assert(EventBusPool != nullnameof(EventBusPool) + " != null");
EventBusPool.Publish(@event);
}
else
{
var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();
if (manager isnullthrownew NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");
manager.Publish(@event);
}
}
publicasync Task PublishAsync<TEvent>(TEvent @eventwhere TEvent : IEvent
{
if (options.Pool)
{
Debug.Assert(EventBusPool != nullnameof(EventBusPool) + " != null");
await EventBusPool.PublishAsync(@event);
}
else
{
var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();
if (manager isnullthrownew NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");
await manager.PublishAsync(@event);
}
}
publicvoid OnSubscribe<TEvent>() where TEvent : IEvent
{
if (options.Pool)
{
Debug.Assert(EventBusPool != nullnameof(EventBusPool) + " != null");
EventBusPool.OnSubscribe<TEvent>();
}
else
{
var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();
if (manager isnullthrownew NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");
manager.AutoHandle();
}
}
}


分布式事件总线

根据需要扩展即可,基本逻辑相同,但可能需要增加确认机制等。

转自:胖纸不争

链接:cnblogs.com/donpangpang/p/17939849

- EOF -

技术群: 添加小编微信并备注进群

小编微信:mm1552923

公众号:dotNet编程大全