E
ven
t Bus(事件总线)是一种用于在应用程序内部或跨应用程序组件之间进行事件通信的机制。
它允许不同的组件通过发布和订阅事件来进行解耦和通信。在给定的代码片段中,我们可以看到一个使用C#实现的Event Bus。它定义了一些接口和类来实现事件的发布和订阅。
首先,我们有两个基本的约束接口:
IEvent
和
IAsyncEventHandler<TEvent>
。
IEvent是一个空接口,用于约束事件的类型。
IAsyncEventHandler<TEvent>
是一个泛型接口,用于约束事件处理程序的类型。
它定义了处理事件的异步方法HandleAsync和处理异常的方法HandleException。接下来,我们有一个IEventBus接口,它定义了一些操作方法用于发布和订阅事件。
其中,
Publish<TEvent>
和
PublishAsync<TEvent>
方法用于发布事件,而
OnSubscribe<TEvent>
方法用于订阅事件。然后,我们看到一个实现了本地事件总线的类
LocalEventBusManager<TEvent>
。它实现了
ILocalEventBusManager<TEvent>
接口,用于在单一管道内处理本地事件。它使用了一个
Channel<TEvent>
来存储事件,并提供了发布事件的方法
Publish
和
PublishAsync
。此外,它还提供了一个自动处理事件的方法
AutoHandle
。
总的来说
Event Bus
提供了一种方便的方式来实现组件之间的松耦合通信。
通过发布和订阅事件,组件可以独立地进行操作,而不需要直接依赖于彼此的实现细节。
这种机制可以提高代码的可维护性和可扩展性。
Github仓库地址:https://github.com/DonPangPang/soda-event-bus
实现一些基本约束
先实现一些约束,实现
IEvent
约束事件,实现
IAsyncEvnetHandler<TEvent> where TEvent:IEvent
来约束事件的处理程序。
publicinterfaceIEvent
{
}
publicinterfaceIAsyncEventHandler<inTEvent> whereTEvent : IEvent
{
Task HandleAsync(IEvent @event);
voidHandleException(IEvent @event, Exception ex);
}
接下来规定一下咱们的
IEventBus
,会有哪些操作方法。基本就是发布和订阅。
publicinterfaceIEventBus
{
void Publish<TEvent>(TEvent @event) where TEvent : IEvent;
Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent;
void OnSubscribe<TEvent>() where TEvent : IEvent;
}
实现一个本地事件总线
本地事件处理
本地事件的处理我打算采用两种方式实现,一种是
LocalEventBusManager
即本地事件管理,第二种是
LocalEventBusPool
池化本地事件。
LocalEvnetBusManager
LocalEventBusManager
主要在单一管道内进行处理,集中进行消费。
publicinterfaceILocalEventBusManager<inTEvent>whereTEvent : IEvent
{
voidPublish(TEvent @event);
Task PublishAsync(TEvent @event) ;
voidAutoHandle();
}
public classLocalEventBusManager<TEvent>(IServiceProviderserviceProvider):ILocalEventBusManager<TEvent>
whereTEvent: IEvent
{
readonly IServiceProvider _servicesProvider = serviceProvider;
privatereadonly Channel<TEvent> _eventChannel = Channel.CreateUnbounded<TEvent>();
publicvoidPublish(TEvent @event)
{
Debug.Assert(_eventChannel != null, nameof(_eventChannel) + " != null");
_eventChannel.Writer.WriteAsync(@event);
}
private CancellationTokenSource Cts { get; } = new();
publicvoidCancel()
{
Cts.Cancel();
}
publicasync Task PublishAsync(TEvent @event)
{
await _eventChannel.Writer.WriteAsync(@event);
}
publicvoidAutoHandle()
{
// 确保只启动一次
if (!Cts.IsCancellationRequested) return;
Task.Run(async () =>
{
while (!Cts.IsCancellationRequested)
{
var reader = await _eventChannel.Reader.ReadAsync();
await HandleAsync(reader);
}
}, Cts.Token);
}
async Task HandleAsync(TEvent @event)
{
var handler = _servicesProvider.GetService<IAsyncEventHandler<TEvent>>();
if (handler isnull)
{
thrownew NullReferenceException($"No handler for event {@event.GetType().Name}");
}
try
{
await handler.HandleAsync(@event);
}
catch (Exception ex)
{
handler.HandleException( @event, ex);
}
}
}
LocalEventBusPool
LocalEventBusPool
即所有的Event都会有一个单独的管道处理,单独消费处理,并行能力更好一些。
publicsealed class LocalEventBusPool(IServiceProvider serviceProvider)
{
privatereadonly IServiceProvider _serviceProvider = serviceProvider;
private classChannelKey
{
public required string Key { get; init; }
publicint Subscribers { get; set; }
publicoverrideboolEquals(object? obj)
{
if (obj is ChannelKey key)
{
returnstring.Equals(key.Key, Key, StringComparison.OrdinalIgnoreCase);
}
returnfalse;
}
publicoverrideintGetHashCode()
{
return0;
}
}
private Channel<IEvent> Rent(string channel)
{
_channels.TryGetValue(new ChannelKey() { Key = channel }, outvarvalue);
if (value != null) returnvalue;
value = Channel.CreateUnbounded<IEvent>();
_channels.TryAdd(new ChannelKey() { Key = channel }, value);
returnvalue;
}
private Channel<IEvent> Rent(ChannelKey channelKey)
{
_channels.TryGetValue(channelKey, outvarvalue);
if (value != null) returnvalue;
value = Channel.CreateUnbounded<IEvent>();
_channels.TryAdd(channelKey, value);
returnvalue;
}
privatereadonly ConcurrentDictionary<ChannelKey, Channel<IEvent>> _channels = new();
private CancellationTokenSource Cts { get; } = new();
publicvoidCancel()
{
Cts.Cancel();
_channels.Clear();
Cts.TryReset();
}
publicasync Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent
{
await Rent(typeof(TEvent).Name).Writer.WriteAsync(@event);
}
publicvoid Publish<TEvent>(TEvent @event) where TEvent : IEvent
{
Rent(typeof(TEvent).Name).Writer.TryWrite(@event);
}
publicvoid OnSubscribe<TEvent>() where TEvent : IEvent
{
var channelKey = _channels.FirstOrDefault(x => x.Key.Key == typeof(TEvent).Name).Key ??
new ChannelKey() { Key = typeof(TEvent).Name };
channelKey.Subscribers++;
Task.Run(async () =>
{
try
{
while (!Cts.IsCancellationRequested)
{
var @event = await ReadAsync(channelKey);
var handler = _serviceProvider.GetService<IAsyncEventHandler<TEvent>>();
if (handler == null) thrownew NullReferenceException($"No handler for Event {typeof(TEvent).Name}");
try
{
await handler.HandleAsync((TEvent)@event);
}
catch (Exception ex)
{
handler.HandleException((TEvent)@event, ex);
}
}
}
catch (Exception e)
{
thrownew InvalidOperationException("Error on onSubscribe handler", e);
}
}, Cts.Token);
}
privateasync Task<IEvent> ReadAsync(string channel)
{
returnawait Rent(channel).Reader.ReadAsync(Cts.Token);
}
privateasync Task<IEvent> ReadAsync(ChannelKey channel)
{
returnawait Rent(channel).Reader.ReadAsync(Cts.Token);
}
}
LocalEventBus
实现
LocalEventBus
继承自
IEventBus
即可,如果有需要扩展的方法自行添加,池化和管理器的情况单独处理。
publicinterfaceILocalEventBus: IEventBus
{
}
public class LocalEventBus(IServiceProvider serviceProvider, LocalEventBusOptions options) : ILocalEventBus
{
private LocalEventBusPool? EventBusPool => serviceProvider.GetService<LocalEventBusPool>();
publicvoid Publish<TEvent>(TEvent @event) where TEvent : IEvent
{
if (options.Pool)
{
Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");
EventBusPool.Publish(@event);
}
else
{
var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();
if (manager isnull) thrownew NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");
manager.Publish(@event);
}
}
publicasync Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent
{
if (options.Pool)
{
Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");
await EventBusPool.PublishAsync(@event);
}
else
{
var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();
if (manager isnull) thrownew NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");
await manager.PublishAsync(@event);
}
}
publicvoid OnSubscribe<TEvent>() where TEvent : IEvent
{
if (options.Pool)
{
Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");
EventBusPool.OnSubscribe<TEvent>();
}
else
{
var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();
if (manager isnull) thrownew NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");
manager.AutoHandle();
}
}
}
分布式事件总线
根据需要扩展即可,基本逻辑相同,但可能需要增加确认机制等。
转自:胖纸不争
链接:cnblogs.com/donpangpang/p/17939849
- EOF -
技术群:
添加小编微信并备注进群
小编微信:mm1552923
公众号:dotNet编程大全