5

Asp-Net-Core开发笔记:集成Hangfire实现异步任务队列和定时任务 - 程序设计实验室

 1 year ago
source link: https://www.cnblogs.com/deali/p/16754636.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Asp-Net-Core开发笔记:集成Hangfire实现异步任务队列和定时任务 - 程序设计实验室 - 博客园

最近把Python写的数据采集平台往.Net Core上迁移,原本的采集任务使用多进程+线程池的方式来加快采集速度,使用Celery作为异步任务队列兼具定时任务功能,这套东西用着还行,但反正就折腾嘛,直接上C#~

本文记录 Hangfire 在实际应用里的用法,我发现网络上找到的大部分文章都是用 Hangfire 的异步任务输出个 Hello World,然后就没了。我实在不知道这样的文章写了有什么意义??除了浪费看的人的时间之外,还浪费自己写文章的时间……

.NetCore作为一个高性能的平台,自然不可能输给Python,不过我不想造轮子了(菜),找个现成的方案来用,免得踩坑~

先是调研了一下.NetCore目前的生态,发现有几个选择:

  • FreeScheduler
  • Quartz.net
  • Hangfire

第一个 FreeScheduler 和 FreeSQL 项目出自同一个作者之手,刚好我的项目也是用的 FreeSQL 作为 ORM,不过看了一下Github上stars比较少,而且文档暂时还不完善,我最关心的依赖注入功能暂时还不好搞,于是只好作罢。

然后在 Quartz.net 和 Hangfire 两者中,我选择了后者,原因是我之前做 CrawlCenterNet 项目的时候用过 Hangfire,还挺好用的,且带有一个简单的 dashboard,比较直观~

那么就开始吧~

关于后端的选择

这里的后端指的是任务队列的存储后端,也就是 Hangfire 文档中写的 Storage。

看了一下官网,大部分关系型非关系型数据库都是可以的,那我就放心了。由于目前生产环境在使用 Oracle 数据库,所以一开始我选择了 Oracle 作为 Storage,但是在同时开到2000多个任务的时候报错了,看了下原因是表空间不足,是 Oracle 的问题…

所以为了性能和稳定性,我弃坑了,接着尝试了 SQLite (仅作为本地测试),结果发现配置里面定义了 Connection String 但它不理我,直接把这个 Connection String 作为数据库的名称了,无语… 这样就没办法把 SQLite 数据库设置为异步模式,那速度就直接乌龟爬了…

再次弃坑… 这次直接上 Redis 了,为了提高性能,舍弃持久化能力~ Redis也没让我失望,几千个任务压根不带眨眼的,nice~

数据采集代码

Hangfire 组件不是一开始就引入的,这里先上最基本的数据采集代码,后面的介绍才能更清楚~

关键的代码在 Services/CrawlService.cs 文件中

public class CrawlService {
  // 依赖注入一些服务
  private readonly IBaseRepository<Proc> _repo;
  
  public async Task CrawlAllProc() {
    for(var i=1; i<2000; i++) {
      await CrawlProcList(i);
    }
  }
  
  public async Task CrawlProcList(int page) {
    // 具体代码省略了
    var procList = ; //...
    foreach (var proc in procList) {
      await CrawlProc(proc);
    }
  }
  
  public async Task CrawlProc(Proc proc) { }
}

然后,当启动采集任务的时候,直接去调用 CrawlAllProc 方法,这样就开始一页一页采集,每页又有很多的 Proc 数据,全都采集下来。

上面的代码用的是 await ,会等待异步方法完成,速度很慢,去掉 await ,在新线程中执行任务,不等待其结束,不过问题也很明显,如果出错了很难调试,这样就不好保证系统的稳定性。

接下来我们用 Hangfire 来改造。

安装 Hangfire

本项目用到了以下依赖:

  • Hangfire.Core
  • Hangfire.AspNetCore
  • Hangfire.Redis.StackExchange

直接 nuget 一把梭完事

为了跟后面的内容区分,这里先来官方的例子

注册服务:

services.AddHangfire(
  configuration => configuration
  .SetDataCompatibilityLevel(CompatibilityLevel.Version_170)
  .UseSimpleAssemblyNameTypeSerializer()
  .UseRecommendedSerializerSettings()
  // 根据实际使用的 Storage 来注册
  .UseRedisStorage();
services.AddHangfireServer();

添加中间件:

app.UseHangfireDashboard();

Hangfire 注册的时候默认是单例模式,所以在任意代码中使用其静态方法就能添加异步任务或者定时任务。

异步任务:

BackgroundJobs.Enqueue(() => Console.WriteLine("Hello world!"));

定时任务:

Hangfire的定时任务叫做 recurrent tasks,我之前一般习惯叫 scheduled task,一开始差点找不到文档~

以下代码添加一个每天执行一次的任务,如果需要其他时间,可以自定义后面的 Cron 参数,具体自行研究 Cron 语法~

RecurringJob.AddOrUpdate("easyjob", () => Console.Write("Easy!"), Cron.Daily);

OK,终于进到正文

正如我一开始说的,前面介绍的用法是远远不够的,如果只是介绍个 Hello World,那也没必要专门写篇文章了…

现在开始介绍如何将 Hangfire 结合我的 CrawlService 使用

因为 CrawlService 需要操作数据库,所以是用到了依赖注入的,所以我们需要让 Hangfire 也支持依赖注入。

官方文档有一小节是关于 IOC 容器的(https://docs.hangfire.io/en/latest/background-methods/using-ioc-containers.html),不过并没有介绍 AspNetCore 的容器,直接自己动手 丰衣足食咯~

添加 AspNetCore 的依赖注入容器

一开始搜了好久没找到,最终是在Github上找到一个例子代码,里面的 AspNetCore 版本好老,居然是1.1版本,我都没用过… 不过并不影响我节俭他的写法~

这一步需要 JobActivator 的子类

来写一个,我把它放在 Infrastructure 目录下

using Hangfire;

public class HangfireActivator : JobActivator {
    private readonly IServiceProvider _serviceProvider;

    public HangfireActivator(IServiceProvider serviceProvider) {
        _serviceProvider = serviceProvider;
    }

    public override object? ActivateJob(Type jobType) {
        return _serviceProvider.GetService(jobType);
    }
}

这里是在 HangfireActivator 的构造函数中把 AspNetCore 的 IOC 容器对象传入,并且重写 ActivateJob 方法,让 Hangfire 才激活任务的时候从 IOC 容器中获取对象,比较好理解。

修改服务注册代码

其实服务注册部分是一样的,无须修改

不过按照习惯,为了使 Program.cs 或者 Startup.cs 代码比较简洁,我还是写了扩展方法来实现这部分。

Extensions 目录中添加 ConfigureHangfire.cs

public static class ConfigureHangfire {
    public static void AddHangfirePkg(this IServiceCollection services, IConfiguration configuration) {
        services.AddHangfire(conf => conf
            .SetDataCompatibilityLevel(CompatibilityLevel.Version_170)
            .UseSimpleAssemblyNameTypeSerializer()
            .UseRecommendedSerializerSettings()
            .UseRedisStorage()
        );

        services.AddHangfireServer();
    }

    public static void UseHangfire(this WebApplication app) {
        GlobalConfiguration.Configuration.UseActivator(new HangfireActivator(app.Services));
        app.UseHangfireDashboard();
    }
}

可以看到有修改的地方就是在添加中间件之前,配置了 Activator 这行代码:

GlobalConfiguration.Configuration.UseActivator(new HangfireActivator(app.Services));

直接把 IOC 容器传入

接着在 Program.cs (我用的 .Net6)中使用这个扩展方法就完事了~

builder.Services.AddHangfirePkg(builder.Configuration);
// 中间件
app.UseHangfire();

有了依赖注入之后,创建异步任务是这样的。也就是多了个泛型参数。

BackgroundJob.Enqueue<CrawlService>(a => a.CrawlAllProc());

定时任务,每小时执行一次

RecurringJob.AddOrUpdate<CrawlService>(a => a.CrawlAllProc(), Cron.Hourly);

改造一下数据采集代码

OK,最后回到一开始的数据采集代码,做如下修改:

public class CrawlService {
  // 依赖注入一些服务
  private readonly IBaseRepository<Proc> _repo;
  
  public async Task CrawlAllProc() {
    for(var i=1; i<2000; i++) {
      // await CrawlProcList(i);
      BackgroundJob.Enqueue(() => CrawlProcList(i, 100));
    }
  }
  
  public async Task CrawlProcList(int page, int pageSize = 100) {
    // 具体代码省略了
    var procList = ; //...
    foreach (var proc in procList) {
      // await CrawlProc(proc);
      BackgroundJob.Enqueue(() => CrawlProc(proc));
    }
  }
  
  public async Task CrawlProc(Proc proc) { }
}

把原来 await 的地方注释掉,换成用 Hangfire 创建异步任务,运行起来,打开dashboard,可以看到任务噌的一下就上到几千,速度极快~

需要注意的就是 CrawlProcList 方法的第二个参数 pageSize 我们给了默认值100,在正常使用是没问题的,可以不传入这个参数,默认就是100。

BackgroundJob.Enqueue 方法里不能省略这个参数,不然会报错说编译器无法解析啥的,这个应该是C#的语言限制,具体我暂时还没去深入研究。

OK,这样就初步搞定了数据采集 & 定时采集的功能,这部分刚好是我国庆第一天加班完成的,后续的就交给时间吧~ 国庆剩下几天的假期让它跑个够,等假期结束再回去看看效果如何,到时有新的进展我也会及时更新博客。

对了,我还打算封装个异步任务和定时任务的接口(似乎 AspNetCore 没有这部分功能?),因为我不想代码和 Hangfire 有太高的耦合,封装成抽象的接口,以后如果换别的组件也没有压力。

就把这件事先加入 todo list 吧~

__EOF__


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK