E
ven
t Bus(事件匯流排)是一種用於在應用程式內部或跨應用程式元件之間進行事件通訊的機制。
它允許不同的元件透過釋出和訂閱事件來進行解耦和通訊。在給定的程式碼片段中,我們可以看到一個使用C#實作的Event Bus。它定義了一些介面和類來實作事件的釋出和訂閱。
首先,我們有兩個基本的約束介面:
IEvent
和
IAsyncEventHandler<TEvent>
。
IEvent是一個空介面,用於約束事件的型別。
IAsyncEventHandler<TEvent>
是一個泛型介面,用於約束事件處理常式的型別。
它定義了處理事件的異步方法HandleAsync和處理異常的方法HandleException。接下來,我們有一個IEventBus介面,它定義了一些操作方法用於釋出和訂閱事件。
其中,
Publish<TEvent>
和
PublishAsync<TEvent>
方法用於釋出事件,而
OnSubscribe<TEvent>
方法用於訂閱事件。然後,我們看到一個實作了本地事件匯流排的類
LocalEventBusManager<TEvent>
。它實作了
ILocalEventBusManager<TEvent>
介面,用於在單一管道內處理本地事件。它使用了一個
Channel<TEvent>
來儲存事件,並提供了釋出事件的方法
Publish
和
PublishAsync
。此外,它還提供了一個自動處理事件的方法
AutoHandle
。
總的來說
Event Bus
提供了一種方便的方式來實作元件之間的松耦合通訊。
透過釋出和訂閱事件,元件可以獨立地進行操作,而不需要直接依賴於彼此的實作細節。
這種機制可以提高程式碼的可維護性和可延伸性。
Github倉庫地址:https://github.com/DonPangPang/soda-event-bus
實作一些基本約束
先實作一些約束,實作
IEvent
約束事件,實作
IAsyncEvnetHandler<TEvent> where TEvent:IEvent
來約束事件的處理常式。
publicinterfaceIEvent
{
}
publicinterfaceIAsyncEventHandler<inTEvent> whereTEvent : IEvent
{
Task HandleAsync(IEvent @event);
voidHandleException(IEvent @event, Exception ex);
}
接下來規定一下咱們的
IEventBus
,會有哪些操作方法。基本就是釋出和訂閱。
publicinterfaceIEventBus
{
void Publish<TEvent>(TEvent @event) where TEvent : IEvent;
Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent;
void OnSubscribe<TEvent>() where TEvent : IEvent;
}
實作一個本地事件匯流排
本地事件處理
本地事件的處理我打算采用兩種方式實作,一種是
LocalEventBusManager
即本地事件管理,第二種是
LocalEventBusPool
池化本地事件。
LocalEvnetBusManager
LocalEventBusManager
主要在單一管道內進行處理,集中進行消費。
publicinterfaceILocalEventBusManager<inTEvent>whereTEvent : IEvent
{
voidPublish(TEvent @event);
Task PublishAsync(TEvent @event) ;
voidAutoHandle();
}
public classLocalEventBusManager<TEvent>(IServiceProviderserviceProvider):ILocalEventBusManager<TEvent>
whereTEvent: IEvent
{
readonly IServiceProvider _servicesProvider = serviceProvider;
privatereadonly Channel<TEvent> _eventChannel = Channel.CreateUnbounded<TEvent>();
publicvoidPublish(TEvent @event)
{
Debug.Assert(_eventChannel != null, nameof(_eventChannel) + " != null");
_eventChannel.Writer.WriteAsync(@event);
}
private CancellationTokenSource Cts { get; } = new();
publicvoidCancel()
{
Cts.Cancel();
}
publicasync Task PublishAsync(TEvent @event)
{
await _eventChannel.Writer.WriteAsync(@event);
}
publicvoidAutoHandle()
{
// 確保只啟動一次
if (!Cts.IsCancellationRequested) return;
Task.Run(async () =>
{
while (!Cts.IsCancellationRequested)
{
var reader = await _eventChannel.Reader.ReadAsync();
await HandleAsync(reader);
}
}, Cts.Token);
}
async Task HandleAsync(TEvent @event)
{
var handler = _servicesProvider.GetService<IAsyncEventHandler<TEvent>>();
if (handler isnull)
{
thrownew NullReferenceException($"No handler for event {@event.GetType().Name}");
}
try
{
await handler.HandleAsync(@event);
}
catch (Exception ex)
{
handler.HandleException( @event, ex);
}
}
}
LocalEventBusPool
LocalEventBusPool
即所有的Event都會有一個單獨的管道處理,單獨消費處理,並列能力更好一些。
publicsealed class LocalEventBusPool(IServiceProvider serviceProvider)
{
privatereadonly IServiceProvider _serviceProvider = serviceProvider;
private classChannelKey
{
public required string Key { get; init; }
publicint Subscribers { get; set; }
publicoverrideboolEquals(object? obj)
{
if (obj is ChannelKey key)
{
returnstring.Equals(key.Key, Key, StringComparison.OrdinalIgnoreCase);
}
returnfalse;
}
publicoverrideintGetHashCode()
{
return0;
}
}
private Channel<IEvent> Rent(string channel)
{
_channels.TryGetValue(new ChannelKey() { Key = channel }, outvarvalue);
if (value != null) returnvalue;
value = Channel.CreateUnbounded<IEvent>();
_channels.TryAdd(new ChannelKey() { Key = channel }, value);
returnvalue;
}
private Channel<IEvent> Rent(ChannelKey channelKey)
{
_channels.TryGetValue(channelKey, outvarvalue);
if (value != null) returnvalue;
value = Channel.CreateUnbounded<IEvent>();
_channels.TryAdd(channelKey, value);
returnvalue;
}
privatereadonly ConcurrentDictionary<ChannelKey, Channel<IEvent>> _channels = new();
private CancellationTokenSource Cts { get; } = new();
publicvoidCancel()
{
Cts.Cancel();
_channels.Clear();
Cts.TryReset();
}
publicasync Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent
{
await Rent(typeof(TEvent).Name).Writer.WriteAsync(@event);
}
publicvoid Publish<TEvent>(TEvent @event) where TEvent : IEvent
{
Rent(typeof(TEvent).Name).Writer.TryWrite(@event);
}
publicvoid OnSubscribe<TEvent>() where TEvent : IEvent
{
var channelKey = _channels.FirstOrDefault(x => x.Key.Key == typeof(TEvent).Name).Key ??
new ChannelKey() { Key = typeof(TEvent).Name };
channelKey.Subscribers++;
Task.Run(async () =>
{
try
{
while (!Cts.IsCancellationRequested)
{
var @event = await ReadAsync(channelKey);
var handler = _serviceProvider.GetService<IAsyncEventHandler<TEvent>>();
if (handler == null) thrownew NullReferenceException($"No handler for Event {typeof(TEvent).Name}");
try
{
await handler.HandleAsync((TEvent)@event);
}
catch (Exception ex)
{
handler.HandleException((TEvent)@event, ex);
}
}
}
catch (Exception e)
{
thrownew InvalidOperationException("Error on onSubscribe handler", e);
}
}, Cts.Token);
}
privateasync Task<IEvent> ReadAsync(string channel)
{
returnawait Rent(channel).Reader.ReadAsync(Cts.Token);
}
privateasync Task<IEvent> ReadAsync(ChannelKey channel)
{
returnawait Rent(channel).Reader.ReadAsync(Cts.Token);
}
}
LocalEventBus
實作
LocalEventBus
繼承自
IEventBus
即可,如果有需要擴充套件的方法自行添加,池化和管理器的情況單獨處理。
publicinterfaceILocalEventBus: IEventBus
{
}
public class LocalEventBus(IServiceProvider serviceProvider, LocalEventBusOptions options) : ILocalEventBus
{
private LocalEventBusPool? EventBusPool => serviceProvider.GetService<LocalEventBusPool>();
publicvoid Publish<TEvent>(TEvent @event) where TEvent : IEvent
{
if (options.Pool)
{
Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");
EventBusPool.Publish(@event);
}
else
{
var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();
if (manager isnull) thrownew NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");
manager.Publish(@event);
}
}
publicasync Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent
{
if (options.Pool)
{
Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");
await EventBusPool.PublishAsync(@event);
}
else
{
var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();
if (manager isnull) thrownew NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");
await manager.PublishAsync(@event);
}
}
publicvoid OnSubscribe<TEvent>() where TEvent : IEvent
{
if (options.Pool)
{
Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");
EventBusPool.OnSubscribe<TEvent>();
}
else
{
var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();
if (manager isnull) thrownew NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");
manager.AutoHandle();
}
}
}
分布式事件匯流排
根據需要擴充套件即可,基本邏輯相同,但可能需要增加確認機制等。
轉自:胖紙不爭
連結:cnblogs.com/donpangpang/p/17939849
- EOF -
技術群:
添加小編微信並備註進群
小編微信:mm1552923
公眾號:dotNet編程大全