当前位置: 欣欣网 > 码农

.NET 轻量级、高效任务调度器:ScheduleTask

2024-05-20码农

前言

至于任务调度这个基础功能,重要性不言而喻,大多数业务系统都会用到,世面上有很多成熟的三方库比如Quartz,Hangfire,Coravel

这里我们不讨论三方的库如何使用 而是从0开始自己制作一个简易的任务调度,如果只是到分钟级别的粒度基本够用。

正文

技术栈用到了: BackgroundService NCrontab

第一步我们定义一个简单的任务约定,不干别的就是一个执行方法:

publicinterfaceIScheduleTask
{
Task ExecuteAsync();
}
publicabstract classScheduleTask : IScheduleTask
{
publicvirtual Task ExecuteAsync()
{
return Task.CompletedTask;
}
}

第二步定义特性标注任务执行周期等信的metadata

[AttributeUsage(AttributeTargets. class, AllowMultiple = true, Inherited = false)]
public class ScheduleTaskAttribute(string cron) : Attribute
{
///<summary>
/// 支持的cron表达式格式 * * * * *:https://en.wikipedia.org/wiki/Cron
/// 最小单位为分钟
///</summary>
publicstring Cron { getset; } = cron;
publicstring? Description { getset; }
///<summary>
/// 是否异步执行.默认false会阻塞接下来的同类任务
///</summary>
publicbool IsAsync { getset; } = false;
///<summary>
/// 是否初始化即启动,默认false
///</summary>
publicbool IsStartOnInit { getset; } = false;
}

第三步我们定义一个调度器约定,不干别的就是判断当前的任务是否可以执行:

publicinterfaceIScheduler
{
///<summary>
/// 判断当前的任务是否可以执行
///</summary>
boolCanRun(ScheduleTaskAttribute scheduleMetadata, DateTime referenceTime);
}

好了,基础步骤就完成了,如果我们需要实现配置级别的任务调度或者动态的任务调度 那我们再抽象一个Store:

public class ScheduleTaskMetadata(Type scheduleTaskType, string cron)
{
public Type ScheduleTaskType { getset; } = scheduleTaskType;
publicstring Cron { getset; } = cron;
publicstring? Description { getset; }
publicbool IsAsync { getset; } = false;
publicbool IsStartOnInit { getset; } = false;
}
publicinterfaceIScheduleMetadataStore
{
///<summary>
/// 获取所有ScheduleTaskMetadata
///</summary>
Task<IEnumerable<ScheduleTaskMetadata>> GetAllAsync();
}

实现一个Configuration级别的Store

internal class ConfigurationScheduleMetadataStore(IConfiguration configuration) : IScheduleMetadataStore
{
conststring Key = "BiwenQuickApi:Schedules";
public Task<IEnumerable<ScheduleTaskMetadata>> GetAllAsync()
{
var options = configuration.Getp(Key).GetChildren();
if (options?.Any() istrue)
{
var metadatas = options.Select(x =>
{
var type = Type.GetType(x[nameof(ConfigurationScheduleOption.ScheduleType)]!);
if (type isnull)
thrownew ArgumentException($"Type {x[nameof(ConfigurationScheduleOption.ScheduleType)]} not found!");
returnnew ScheduleTaskMetadata(type, x[nameof(ConfigurationScheduleOption.Cron)]!)
{
Description = x[nameof(ConfigurationScheduleOption.Description)],
IsAsync = string.IsNullOrEmpty(x[nameof(ConfigurationScheduleOption.IsAsync)]) ? false : bool.Parse(x[nameof(ConfigurationScheduleOption.IsAsync)]!),
IsStartOnInit = string.IsNullOrEmpty(x[nameof(ConfigurationScheduleOption.IsStartOnInit)]) ? false : bool.Parse(x[nameof(ConfigurationScheduleOption.IsStartOnInit)]!),
};
});
return Task.FromResult(metadatas);
}
return Task.FromResult(Enumerable.Empty<ScheduleTaskMetadata>());
}
}


然后,我们可能需要多任务调度的事件做一些操作或者日志存储。

比如失败了该干嘛,完成了回调其他后续业务等。

我们再来定义一下具体的事件 IEvent ,具体可以参考文章: https://www.cnblogs.com/vipwan/p/18184088

事件 IEvent 代码

1、首先定义一个事件约定的空接口

publicinterfaceIEvent{}

2、然后定义事件订阅者接口

publicinterfaceIEventSubscriber<TwhereT : IEvent
{
Task HandleAsync(T @event, CancellationToken ct);
///<summary>
/// 执行排序
///</summary>
int Order { get; }
///<summary>
/// 如果发生错误是否抛出异常,将阻塞后续Handler
///</summary>
bool ThrowIfError { get; }
}
publicabstract classEventSubscriber<T> : IEventSubscriber<TwhereT : IEvent
{
publicabstract Task HandleAsync(T @event, CancellationToken ct);
publicvirtualint Order => 0;
///<summary>
/// 默认不抛出异常
///</summary>
publicvirtualbool ThrowIfError => false;
}

3、接着就是发布者

internal class Publisher(IServiceProvider serviceProvider)
{
publicasync Task PublishAsync<T>(T @event, CancellationToken ct) where T : IEvent
 {
var handlers = serviceProvider.GetServices<IEventSubscriber<T>>();
if (handlers isnullreturn;
foreach (var handler in handlers.OrderBy(x => x.Order))
{
try
{
await handler.HandleAsync(@event, ct);
}
catch
{
if (handler.ThrowIfError)
{
throw;
}
//todo:
}
}
 }
}

4、到此发布订阅的基本代码也就写完了.接下来就是注册发布者和所有的订阅者了

publicabstract class ScheduleTaskEvent(IScheduleTask scheduleTask, DateTime eventTime) : IEvent
{
///<summary>
/// 任务
///</summary>
public IScheduleTask ScheduleTask { getset; } = scheduleTask;
///<summary>
/// 触发时间
///</summary>
public DateTime EventTime { getset; } = eventTime;
}
///<summary>
/// 执行完成
///</summary>
publicsealed class TaskSuccessedEvent(IScheduleTask scheduleTask, DateTime eventTime, DateTime endTime) : ScheduleTaskEvent(scheduleTask, eventTime)
{
///<summary>
/// 执行结束的时间
///</summary>
public DateTime EndTime { getset; } = endTime;
}
///<summary>
/// 执行开始
///</summary>
publicsealed class TaskStartedEvent(IScheduleTask scheduleTask, DateTime eventTime) : ScheduleTaskEvent(scheduleTask, eventTime);
///<summary>
/// 执行失败
///</summary>
publicsealed class TaskFailedEvent(IScheduleTask scheduleTask, DateTime eventTime, Exception exception) : ScheduleTaskEvent(scheduleTask, eventTime)
{
///<summary>
/// 异常信息
///</summary>
public Exception Exception { getprivateset; } = exception;
}

接下来我们再实现基于 NCrontab 的简易调度器,这个调度器主要是解析 Cron 表达式判断传入时间是否可以执行ScheduleTask,具体的代码:

internal classSampleNCrontabScheduler : IScheduler
{
///<summary>
/// 暂存上次执行时间
///</summary>
privatestatic ConcurrentDictionary<ScheduleTaskAttribute, DateTime> LastRunTimes = new();
publicboolCanRun(ScheduleTaskAttribute scheduleMetadata, DateTime referenceTime)
{
var now = DateTime.Now;
var haveExcuteTime = LastRunTimes.TryGetValue(scheduleMetadata, outvar time);
if (!haveExcuteTime)
{
var nextStartTime = CrontabSchedule.Parse(scheduleMetadata.Cron).GetNextOccurrence(referenceTime);
LastRunTimes.TryAdd(scheduleMetadata, nextStartTime);
//如果不是初始化启动,则不执行
if (!scheduleMetadata.IsStartOnInit)
returnfalse;
}
if (now >= time)
{
var nextStartTime = CrontabSchedule.Parse(scheduleMetadata.Cron).GetNextOccurrence(referenceTime);
//更新下次执行时间
LastRunTimes.TryUpdate(scheduleMetadata, nextStartTime, time);
returntrue;
}
returnfalse;
}
}

然后就是核心的 BackgroundService 了,这里我用的IdleTime心跳来实现,粒度分钟,当然内部也可以封装 Timer 等实现更复杂精度更高的调度,这里就不展开讲了。

代码如下:

internal classScheduleBackgroundService : BackgroundService
{
privatestaticreadonly TimeSpan _pollingTime
DEBUG
//轮询20s 测试环境下,方便测试。
= TimeSpan.FromSeconds(20);
if
!DEBUG
//轮询60s 正式环境下,考虑性能轮询时间延长到60s
= TimeSpan.FromSeconds(60);
if
//心跳10s.
privatestaticreadonly TimeSpan _minIdleTime = TimeSpan.FromSeconds(10);
privatereadonly ILogger<ScheduleBackgroundService> _logger;
privatereadonly IServiceProvider _serviceProvider;
publicScheduleBackgroundService(ILogger<ScheduleBackgroundService> logger, IServiceProvider serviceProvider)
{
_logger = logger;
_serviceProvider = serviceProvider;
}
protectedoverrideasync Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
var pollingDelay = Task.Delay(_pollingTime, stoppingToken);
try
{
await RunAsync(stoppingToken);
}
catch (Exception ex)
{
//todo:
_logger.LogError(ex.Message);
}
await WaitAsync(pollingDelay, stoppingToken);
}
}
privateasync Task RunAsync(CancellationToken stoppingToken)
{
usingvar scope = _serviceProvider.CreateScope();
var tasks = scope.ServiceProvider.GetServices<IScheduleTask>();
if (tasks isnull || !tasks.Any())
{
return;
}
//调度器
var scheduler = scope.ServiceProvider.GetRequiredService<IScheduler>();
async Task DoTaskAsync(IScheduleTask task, ScheduleTaskAttribute metadata)
{
if (scheduler.CanRun(metadata, DateTime.Now))
{
var eventTime = DateTime.Now;
//通知启动
_ = new TaskStartedEvent(task, eventTime).PublishAsync(default);
try
{
if (metadata.IsAsync)
{
//异步执行
_ = task.ExecuteAsync();
}
else
{
//同步执行
await task.ExecuteAsync();
}
//执行完成
_ = new TaskSuccessedEvent(task, eventTime, DateTime.Now).PublishAsync(default);
}
catch (Exception ex)
{
_ = new TaskFailedEvent(task, DateTime.Now, ex).PublishAsync(default);
}
}
};
//注解中的task
foreach (var task in tasks)
{
if (stoppingToken.IsCancellationRequested)
{
break;
}
//标注的metadatas
var metadatas = task.GetType().GetCustomAttributes<ScheduleTaskAttribute>();
if (!metadatas.Any())
{
continue;
}
foreach (var metadata in metadatas)
{
await DoTaskAsync(task, metadata);
}
}
//store中的scheduler
var stores = _serviceProvider.GetServices<IScheduleMetadataStore>().ToArray();
//并行执行,提高性能
Parallel.ForEach(stores, async store =>
{
if (stoppingToken.IsCancellationRequested)
{
return;
}
var metadatas = await store.GetAllAsync();
if (metadatas isnull || !metadatas.Any())
{
return;
}
foreach (var metadata in metadatas)
{
var attr = new ScheduleTaskAttribute(metadata.Cron)
{
Description = metadata.Description,
IsAsync = metadata.IsAsync,
IsStartOnInit = metadata.IsStartOnInit,
};
var task = scope.ServiceProvider.GetRequiredService(metadata.ScheduleTaskType) as IScheduleTask;
if (task isnull)
{
return;
}
await DoTaskAsync(task, attr);
}
});
}
privatestaticasync Task WaitAsync(Task pollingDelay, CancellationToken stoppingToken)
{
try
{
await Task.Delay(_minIdleTime, stoppingToken);
await pollingDelay;
}
catch (OperationCanceledException)
{
}
}
}



最后收尾阶段我们老规矩扩展一下 IServiceCollection :

internalstatic IServiceCollection AddScheduleTask(this IServiceCollection services)
{
foreach (var task in ScheduleTasks)
{
services.AddTransient(task);
services.AddTransient(typeof(IScheduleTask), task);
}
//调度器
services.AddScheduler<SampleNCrontabScheduler>();
//配置文件Store:
ices.AddScheduleMetadataStore<ConfigurationScheduleMetadataStore>();
//BackgroundService
services.AddHostedService<ScheduleBackgroundService>();
return services;
}
///<summary>
/// 注册调度器AddScheduler
///</summary>
publicstatic IServiceCollection AddScheduler<T>(this IServiceCollection services) where T :  classIScheduler
{
services.AddSingleton<IScheduler, T>();
return services;
}
///<summary>
/// 注册ScheduleMetadataStore
///</summary>
publicstatic IServiceCollection AddScheduleMetadataStore<T>(this IServiceCollection services) where T :  classIScheduleMetadataStore
{
services.AddSingleton<IScheduleMetadataStore, T>();
return services;
}

老规矩我们来测试一下:

//通过特性标注的方式执行:
[ScheduleTask(Constants.CronEveryMinute)//每分钟一次
[ScheduleTask("0/3 * * * *")]//每3分钟执行一次
public class KeepAlive(ILogger<KeepAlive> logger) : IScheduleTask
{
publicasync Task ExecuteAsync()
{
//执行5s
await Task.Delay(TimeSpan.FromSeconds(5));
logger.LogInformation("keep alive!");
}
}
public class DemoConfigTask(ILogger<DemoConfigTask> logger) : IScheduleTask
{
public Task ExecuteAsync()
{
logger.LogInformation("Demo Config Schedule Done!");
return Task.CompletedTask;
}
}

通过配置文件的方式配置Store:

{
"BiwenQuickApi": {
"Schedules": [
{
"ScheduleType""Biwen.QuickApi.DemoWeb.Schedules.DemoConfigTask,Biwen.QuickApi.DemoWeb",
"Cron""0/5 * * * *",
"Description""Every 5 mins",
"IsAsync"true,
"IsStartOnInit"false
},
{
"ScheduleType""Biwen.QuickApi.DemoWeb.Schedules.DemoConfigTask,Biwen.QuickApi.DemoWeb",
"Cron""0/10 * * * *",
"Description""Every 10 mins",
"IsAsync"false,
"IsStartOnInit"true
}
]
}
}

我们还可以实现自己的Store,这里以放到内存为例,如果有兴趣 你可以可以自行开发一个面板管理:

public classDemoStore : IScheduleMetadataStore
{
public Task<IEnumerable<ScheduleTaskMetadata>> GetAllAsync()
{
//模拟从数据库或配置文件中获取ScheduleTaskMetadata
IEnumerable<ScheduleTaskMetadata> metadatas =
[
new ScheduleTaskMetadata(typeof(DemoTask),Constants.CronEveryNMinutes(2))
{
Description="测试的Schedule"
},
];
return Task.FromResult(metadatas);
}
}
//然后注册这个Store:
builder.Services.AddScheduleMetadataStore<DemoStore>();

所有的一切都大功告成,最后我们来跑一下Demo,成功了

当然这里是自己的固定思维设计的一个简约版,还存在一些不足,欢迎板砖轻拍指正!

提供同一时间单一运行中的任务实现

///<summary>
/// 模拟一个只能同时存在一个的任务.一分钟执行一次,但是耗时两分钟.
///</summary>
///<param name="logger"></param>
[ScheduleTask(Constants.CronEveryMinute, IsStartOnInit = true)]
public class OnlyOneTask(ILogger<OnlyOneTask> logger) : OnlyOneRunningScheduleTask
{
publicoverride Task OnAbort()
{
logger.LogWarning($"[{DateTime.Now}]任务被打断.因为有一个相同的任务正在执行!");
return Task.CompletedTask;
}
publicoverrideasync Task ExecuteAsync()
{
var now = DateTime.Now;
//模拟一个耗时2分钟的任务
await Task.Delay(TimeSpan.FromMinutes(2));
logger.LogInformation($"[{now}] ~ {DateTime.Now} 执行一个耗时两分钟的任务!");
}
}

源码地址

https://github.com/vipwan/Biwen.QuickApi

https://github.com/vipwan/Biwen.QuickApi/tree/master/Biwen.QuickApi/Scheduling

转自:万雅虎

链接:cnblogs.com/vipwan/p/18194062/biwen-quickapi-scheduletask

- EOF -

推荐阅读 点击标题可跳转

看完本文有收获?请转发分享给更多人

推荐关注「DotNet」,提升.Net技能

点赞和在看就是最大的支持❤️