79

Asp.Net Core 快速邮件队列设计与实现

 5 years ago
source link: http://www.10tiao.com/html/391/201807/2654071060/3.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

发送邮件几乎是软件系统中必不可少的功能,在Asp.Net Core 中我们可以使用MailKit发送邮件,MailKit发送邮件比较简单,网上有许多可以参考的文章,但是应该注意附件名长度,和附件名不能出现中文的问题,如果你遇到了这样的问题可以参考我之前写的这篇博客Asp.Net Core MailKit 完美附件(中文名、长文件名)。

在我们简单搜索网络,并成功解决了附件的问题之后,我们已经能够发送邮件啦!不过另一个问题显现出来——发送邮件太慢了,没错,在我使用QQ邮箱发送时,单封邮件发送大概要用1.5秒左右,用户可能难以忍受请求发生1.5秒的延迟。

所以,我们必须解决这个问题,我们的解决办法就是使用邮件队列来发送邮件

设计邮件队列

Ok, 第一步就是规划我们的邮件队列有什么

EmailOptions

我们得有一个邮件Options类,来存储邮件相关的选项

/// <summary>
/// 邮件选项
/// </summary>
public class EmailOptions{  
 public bool DisableOAuth { get; set; }  
   public string DisplayName { get; set; }  
     public string Host { get; set; } // 邮件主机地址    public string Password { get; set; }  
      public int Port { get; set; }  
       public string UserName { get; set; }  
        public int SleepInterval { get; set; } = 3000;    ...

SleepInterval 是睡眠间隔,因为目前我们实现的队列是进程内的独立线程,发送器会循环读取队列,当队列是空的时候,我们应该让线程休息一会,不然无限循环会消耗大量CPU资源

然后我们还需要的就是 一个用于存储邮件的队列,或者叫队列提供器,总之我们要将邮件存储起来。以及一个发送器,发送器不断的从队列中读取邮件并发送。还需要一个邮件写入工具,想要发送邮件的代码使用写入工具将邮件转储到队列中。

那么我们设计的邮件队列事实上就有了三个部分:

  • 队列存储提供器(邮件的事实存储)

  • 邮件发送机 (不断读取队列中的邮件,并发送)

  • 邮件服务 (想法送邮件时,调用邮件服务,邮件服务会将邮件写入队列)

队列存储提供器设计

那么我们设计的邮件队列提供器接口如下:

public interface IMailQueueProvider{   
 void Enqueue(MailBox mailBox);  
   bool TryDequeue(out MailBox mailBox);  
    int Count { get; }  
      bool IsEmpty { get; }    ...

四个方法,入队、出队、队列剩余邮件数量、队列是否是空,我们对队列的基本需求就是这样。

MailBox是对邮件的封装,并不复杂,稍后会介绍到

邮件服务设计

public interface IMailQueueService{   
     void Enqueue(MailBox box);

对于想要发送邮件的组件或者代码部分来讲,只需要将邮件入队,这就足够了

邮件发送机(兼邮件队列管理器)设计

public interface IMailQueueManager{  
 void Run();  
   void Stop();  
   bool IsRunning { get; }  
    int Count { get; }    

启动队列,停止队列,队列运行中状态,邮件计数

现在,三个主要部分就设计好了,我们先看下MailBox,接下来就去实现这三个接口

MailBox

MailBox 如下:

public class MailBox{ 
   public IEnumerable<IAttachment> Attachments { get; set; }
      public string Body { get; set; }  
        public IEnumerable<string> Cc { get; set; }
            public bool IsHtml { get; set; }
               public string Subject { get; set; }
                  public IEnumerable<string> To { get; set; }    ...

这里面没什么特殊的,大家一看便能理解,除了IEnumerable<IAttachment> Attachments { get; set; }

附件的处理

在发送邮件中最复杂的就是附件了,因为附件体积大,往往还涉及非托管资源(例如:文件),所以附件处理一定要小心,避免留下漏洞和bug。

在MailKit中附件实际上是流Stream,例如下面的代码:

attachment = new MimePart(contentType)
{
    Content = new MimeContent(fs),
    ContentDisposition = new ContentDisposition(ContentDisposition.Attachment),
    ContentTransferEncoding = ContentEncoding.Base64,
};
  
            

其中new MimeContent(fs)是创建的Content,fs是Stream,MimeContent的构造函数如下:

public MimeContent(Stream stream, ContentEncoding encoding = ContentEncoding.Default)

所以我们的设计的附件是基于Stream的。

一般情况附件是磁盘上的文件,或者内存流MemoryStream或者 byte[]数据。附件需要实际的文件的流Stream和一个附件名,所以附件接口设计如下:

public interface IAttachment : IDisposable{    Stream GetFileStream();    string GetName();

那么我们默认实现了两中附件类型 物理文件附件内存文件附件,byte[]数据可以轻松的转换成 内存流,所以没有写这种

MemoryStreamAttechment

public class MemoryStreamAttechment : IAttachment{  
 private readonly MemoryStream _stream;  
  private readonly string _fileName;  
   public MemoryStreamAttechment(MemoryStream stream, string fileName)    {        _stream = stream;        _fileName = fileName;    }  
   
   public void Dispose()        => _stream.Dispose();
   public Stream GetFileStream()        => _stream;  
   public string GetName()        => _fileName;

内存流附件实现要求在创建时传递一个 MemoryStream和附件名称,比较简单

物理文件附件

public class PhysicalFileAttachment : IAttachment{   
 public PhysicalFileAttachment(string absolutePath)    {  
 
      if (!File.Exists(absolutePath))        {          
       throw new FileNotFoundException("文件未找到", absolutePath);        }        AbsolutePath = absolutePath;    }    
       private FileStream _stream;  
         public string AbsolutePath { get; }  
           public void Dispose()    {        _stream.Dispose();    }  
        public Stream GetFileStream()    {    
            if (_stream == null)        {            _stream = new FileStream(AbsolutePath, FileMode.Open);        }        return _stream;    }    public string GetName()    {      
              return System.IO.Path.GetFileName(AbsolutePath);    ...    

这里,我们要注意的是创建FileStream的时机,是在请求GetFileStream方法时,而不是构造函数中,因为创建FileStreamFileStream会占用文件,如果我们发两封邮件使用了同一个附件,那么会抛出异常。而写在GetFileStream方法中相对比较安全(除非发送器是并行的)

实现邮件队列

在我们这篇文章中,我们实现的队列提供器是基于内存的,日后呢我们还可以实现其它的基于其它存储模式的,比如数据库,外部持久性队列等等,另外基于内存的实现不是持久的,一旦程序崩溃。未发出的邮件就会boom然后消失 XD...

邮件队列提供器IMailQueueProvider实现

代码如下:

public class MailQueueProvider : IMailQueueProvider{  
 private static readonly ConcurrentQueue<MailBox> _mailQueue = new ConcurrentQueue<MailBox>();    public int Count => _mailQueue.Count;    public bool IsEmpty => _mailQueue.IsEmpty;    public void Enqueue(MailBox mailBox)    {        _mailQueue.Enqueue(mailBox);    }    public bool TryDequeue(out MailBox mailBox)    {        return _mailQueue.TryDequeue(out mailBox);    }

本文的实现是一个 ConcurrentQueue

邮件服务IMailQueueService实现

代码如下:

public class MailQueueService : IMailQueueService{    private readonly IMailQueueProvider _provider;    /// <summary>
    /// 初始化实例
    /// </summary>
    /// <param name="provider"></param>
    public MailQueueService(IMailQueueProvider provider)    {
        _provider = provider;
    }    /// <summary>
    /// 入队
    /// </summary>
    /// <param name="box"></param>
    public void Enqueue(MailBox box)    {
        _provider.Enqueue(box);
    }    

这里,我们的服务依赖于IMailQueueProvider,使用了其入队功能

邮件发送机IMailQueueManager实现

这个相对比较复杂,我们先看下完整的类,再逐步解释:

public class MailQueueManager : IMailQueueManager{    private readonly SmtpClient _client;    private readonly IMailQueueProvider _provider;    private readonly ILogger<MailQueueManager> _logger;    private readonly EmailOptions _options;    private bool _isRunning = false;    private bool _tryStop = false;    private Thread _thread;    /// <summary>
    /// 初始化实例
    /// </summary>
    /// <param name="provider"></param>
    /// <param name="options"></param>
    /// <param name="logger"></param>
    public MailQueueManager(IMailQueueProvider provider, IOptions<EmailOptions> options, ILogger<MailQueueManager> logger)    {
        _options = options.Value;

        _client = new SmtpClient
        {            // For demo-purposes, accept all SSL certificates (in case the server supports STARTTLS)
            ServerCertificateValidationCallback = (s, c, h, e) => true
        };        // Note: since we don't have an OAuth2 token, disable
        // the XOAUTH2 authentication mechanism.

        if (_options.DisableOAuth)
        {
            _client.AuthenticationMechanisms.Remove("XOAUTH2");
        }

        _provider = provider;
        _logger = logger;
    }    /// <summary>
    /// 正在运行
    /// </summary>
    public bool IsRunning => _isRunning;    /// <summary>
    /// 计数
    /// </summary>
    public int Count => _provider.Count;    /// <summary>
    /// 启动队列
    /// </summary>
    public void Run()    {        if (_isRunning || (_thread != null && _thread.IsAlive))
        {
            _logger.LogWarning("已经运行,又被启动了,新线程启动已经取消");            return;
        }
        _isRunning = true;
        _thread = new Thread(StartSendMail)
        {
            Name = "PmpEmailQueue",
            IsBackground = true,
        };
        _logger.LogInformation("线程即将启动");
        _thread.Start();
        _logger.LogInformation("线程已经启动,线程Id是:{0}", _thread.ManagedThreadId);
    }    /// <summary>
    /// 停止队列
    /// </summary>
    public void Stop()    {        if (_tryStop)
        {            return;
        }
        _tryStop = true;
    }    private void StartSendMail()    {        var sw = new Stopwatch();        try
        {            while (true)
            {                if (_tryStop)
                {                    break;
                }                if (_provider.IsEmpty)
                {
                    _logger.LogTrace("队列是空,开始睡眠");
                    Thread.Sleep(_options.SleepInterval);                    continue;
                }                if (_provider.TryDequeue(out MailBox box))
                {
                    _logger.LogInformation("开始发送邮件 标题:{0},收件人 {1}", box.Subject, box.To.First());
                    sw.Restart();
                    SendMail(box);
                    sw.Stop();
                    _logger.LogInformation("发送邮件结束标题:{0},收件人 {1},耗时{2}", box.Subject, box.To.First(), sw.Elapsed.TotalSeconds);
                }
            }
        }        catch (Exception ex)
        {
            _logger.LogError(ex, "循环中出错,线程即将结束");
            _isRunning = false;
        }

        _logger.LogInformation("邮件发送线程即将停止,人为跳出循环,没有异常发生");
        _tryStop = false;
        _isRunning = false;
    }    private void SendMail(MailBox box)    {        if (box == null)
        {            throw new ArgumentNullException(nameof(box));
        }        try
        {
            MimeMessage message = ConvertToMimeMessage(box);
            SendMail(message);
        }        catch (Exception exception)
        {
            _logger.LogError(exception, "发送邮件发生异常主题:{0},收件人:{1}", box.Subject, box.To.First());
        }        finally
        {            if (box.Attachments != null && box.Attachments.Any())
            {                foreach (var item in box.Attachments)
                {
                    item.Dispose();
                }
            }
        }
    }    private MimeMessage ConvertToMimeMessage(MailBox box)    {        var message = new MimeMessage();        var from = InternetAddress.Parse(_options.UserName);        from.Name = _options.DisplayName;

        message.From.Add(from);        if (!box.To.Any())
        {            throw new ArgumentNullException("to必须含有值");
        }
        message.To.AddRange(box.To.Convert());        if (box.Cc != null && box.Cc.Any())
        {
            message.Cc.AddRange(box.Cc.Convert());
        }

        message.Subject = box.Subject;        var builder = new BodyBuilder();        if (box.IsHtml)
        {
            builder.HtmlBody = box.Body;
        }        else
        {
            builder.TextBody = box.Body;
        }        if (box.Attachments != null && box.Attachments.Any())
        {            foreach (var item in GetAttechments(box.Attachments))            {
                builder.Attachments.Add(item);
            }
        }

        message.Body = builder.ToMessageBody();        return message;
    }    private void SendMail(MimeMessage message)    {        if (message == null)
        {            throw new ArgumentNullException(nameof(message));
        }        try
        {
            _client.Connect(_options.Host, _options.Port, false);            // Note: only needed if the SMTP server requires authentication
            if (!_client.IsAuthenticated)
            {
                _client.Authenticate(_options.UserName, _options.Password);
            }
            _client.Send(message);
        }        finally
        {
            _client.Disconnect(false);
        }
    }    private AttachmentCollection GetAttechments(IEnumerable<IAttachment> attachments)    {        if (attachments == null)
        {            throw new ArgumentNullException(nameof(attachments));
        }

        AttachmentCollection collection = new AttachmentCollection();
        List<Stream> list = new List<Stream>(attachments.Count());        foreach (var item in attachments)
        {            var fileName = item.GetName();            var fileType = MimeTypes.GetMimeType(fileName);            var contentTypeArr = fileType.Split('/');            var contentType = new ContentType(contentTypeArr[0], contentTypeArr[1]);
            MimePart attachment = null;
            Stream fs = null;            try
            {
                fs = item.GetFileStream();
                list.Add(fs);
            }            catch (Exception ex)
            {
                _logger.LogError(ex, "读取文件流发生异常");
                fs?.Dispose();                continue;
            }

            attachment = new MimePart(contentType)
            {
                Content = new MimeContent(fs),
                ContentDisposition = new ContentDisposition(ContentDisposition.Attachment),
                ContentTransferEncoding = ContentEncoding.Base64,
            };            var charset = "UTF-8";
            attachment.ContentType.Parameters.Add(charset, "name", fileName);
            attachment.ContentDisposition.Parameters.Add(charset, "filename", fileName);            foreach (var param in attachment.ContentDisposition.Parameters)
            {
                param.EncodingMethod = ParameterEncodingMethod.Rfc2047;
            }            foreach (var param in attachment.ContentType.Parameters)
            {
                param.EncodingMethod = ParameterEncodingMethod.Rfc2047;
            }

            collection.Add(attachment);
        }        return collection;
    }
}

在构造函数中请求了另外三个服务,并且初始化了SmtpClient(这是MailKit中的)

    public MailQueueManager(        IMailQueueProvider provider,        IOptions<EmailOptions> options,        ILogger<MailQueueManager> logger)    {
        _options = options.Value;

        _client = new SmtpClient
        {            // For demo-purposes, accept all SSL certificates (in case the server supports STARTTLS)
            ServerCertificateValidationCallback = (s, c, h, e) => true
        };        // Note: since we don't have an OAuth2 token, disable
        // the XOAUTH2 authentication mechanism.

        if (_options.DisableOAuth)
        {
            _client.AuthenticationMechanisms.Remove("XOAUTH2");
        }

        _provider = provider;
        _logger = logger;
    }

启动队列时创建了新的线程,并且将线程句柄保存起来:

    public void Run()    {        if (_isRunning || (_thread != null && _thread.IsAlive))
        {
            _logger.LogWarning("已经运行,又被启动了,新线程启动已经取消");            return;
        }
        _isRunning = true;
        _thread = new Thread(StartSendMail)
        {
            Name = "PmpEmailQueue",
            IsBackground = true,
        };
        _logger.LogInformation("线程即将启动");
        _thread.Start();
        _logger.LogInformation("线程已经启动,线程Id是:{0}", _thread.ManagedThreadId);
    }

线程启动时运行了方法StartSendMail

    private void StartSendMail()    {        var sw = new Stopwatch();        try
        {            while (true)
            {                if (_tryStop)
                {                    break;
                }                if (_provider.IsEmpty)
                {
                    _logger.LogTrace("队列是空,开始睡眠");
                    Thread.Sleep(_options.SleepInterval);                    continue;
                }                if (_provider.TryDequeue(out MailBox box))
                {
                    _logger.LogInformation("开始发送邮件 标题:{0},收件人 {1}", box.Subject, box.To.First());
                    sw.Restart();
                    SendMail(box);
                    sw.Stop();
                    _logger.LogInformation("发送邮件结束标题:{0},收件人 {1},耗时{2}", box.Subject, box.To.First(), sw.Elapsed.TotalSeconds);
                }
            }
        }        catch (Exception ex)
        {
            _logger.LogError(ex, "循环中出错,线程即将结束");
            _isRunning = false;
        }

        _logger.LogInformation("邮件发送线程即将停止,人为跳出循环,没有异常发生");
        _tryStop = false;
        _isRunning = false;
    }    

这个方法不断的从队列读取邮件并发送,当 遇到异常,或者_tryStoptrue时跳出循环,此时线程结束,注意我们会让线程睡眠,在适当的时候。

接下来就是方法SendMail了:

    private void SendMail(MailBox box)    {        if (box == null)
        {            throw new ArgumentNullException(nameof(box));
        }        try
        {
            MimeMessage message = ConvertToMimeMessage(box);
            SendMail(message);
        }        catch (Exception exception)
        {
            _logger.LogError(exception, "发送邮件发生异常主题:{0},收件人:{1}", box.Subject, box.To.First());
        }        finally
        {            if (box.Attachments != null && box.Attachments.Any())
            {                foreach (var item in box.Attachments)
                {
                    item.Dispose();
                ...                

这里有一个特别要注意的就是在发送之后释放附件(非托管资源):

foreach (var item in box.Attachments)
{
    item.Dispose();
    ...

发送邮件的核心代码只有两行:

MimeMessage message = ConvertToMimeMessage(box);SendMail(message);

第一行将mailbox转换成 MailKit使用的MimeMessage实体,第二步切实的发送邮件

为什么,我们的接口中没有直接使用MimeMessage而是使用MailBox?

因为MimeMessage比较繁杂,而且附件的问题不易处理,所以我们设计接口时单独封装MailBox简化了编程接口

转换一共两步,1是主体转换,比较简单。二是附件的处理这里涉及到附件名中文编码的问题。

    private MimeMessage ConvertToMimeMessage(MailBox box)    {        var message = new MimeMessage();        var from = InternetAddress.Parse(_options.UserName);        from.Name = _options.DisplayName;

        message.From.Add(from);        if (!box.To.Any())
        {            throw new ArgumentNullException("to必须含有值");
        }
        message.To.AddRange(box.To.Convert());        if (box.Cc != null && box.Cc.Any())
        {
            message.Cc.AddRange(box.Cc.Convert());
        }

        message.Subject = box.Subject;        var builder = new BodyBuilder();        if (box.IsHtml)
        {
            builder.HtmlBody = box.Body;
        }        else
        {
            builder.TextBody = box.Body;
        }        if (box.Attachments != null && box.Attachments.Any())
        {            foreach (var item in GetAttechments(box.Attachments))            {
                builder.Attachments.Add(item);
            }
        }

        message.Body = builder.ToMessageBody();        return message;
    }    private AttachmentCollection GetAttechments(IEnumerable<IAttachment> attachments)    {        if (attachments == null)
        {            throw new ArgumentNullException(nameof(attachments));
        }

        AttachmentCollection collection = new AttachmentCollection();
        List<Stream> list = new List<Stream>(attachments.Count());        foreach (var item in attachments)
        {            var fileName = item.GetName();            var fileType = MimeTypes.GetMimeType(fileName);            var contentTypeArr = fileType.Split('/');            var contentType = new ContentType(contentTypeArr[0], contentTypeArr[1]);
            MimePart attachment = null;
            Stream fs = null;            try
            {
                fs = item.GetFileStream();
                list.Add(fs);
            }            catch (Exception ex)
            {
                _logger.LogError(ex, "读取文件流发生异常");
                fs?.Dispose();                continue;
            }

            attachment = new MimePart(contentType)
            {
                Content = new MimeContent(fs),
                ContentDisposition = new ContentDisposition(ContentDisposition.Attachment),
                ContentTransferEncoding = ContentEncoding.Base64,
            };            var charset = "UTF-8";
            attachment.ContentType.Parameters.Add(charset, "name", fileName);
            attachment.ContentDisposition.Parameters.Add(charset, "filename", fileName);            foreach (var param in attachment.ContentDisposition.Parameters)
            {
                param.EncodingMethod = ParameterEncodingMethod.Rfc2047;
            }            foreach (var param in attachment.ContentType.Parameters)
            {
                param.EncodingMethod = ParameterEncodingMethod.Rfc2047;
            }

            collection.Add(attachment);
        }        return collection;
    }

在转化附件时下面的代码用来处理附件名编码问题:

var charset = "UTF-8";
attachment.ContentType.Parameters.Add(charset, "name", fileName);
attachment.ContentDisposition.Parameters.Add(charset, "filename", fileName);foreach (var param in attachment.ContentDisposition.Parameters)
{
    param.EncodingMethod = ParameterEncodingMethod.Rfc2047;
}foreach (var param in attachment.ContentType.Parameters)
{
    param.EncodingMethod = ParameterEncodingMethod.Rfc2047;
}

到这了我们的邮件队列就基本完成了,接下来就是在程序启动后,启动队列,找到 Program.cs文件,并稍作改写如下:

var host = BuildWebHost(args);var provider = host.Services;
provider.GetRequiredService<IMailQueueManager>().Run();
host.Run();

这里在host.Run()主机启动之前,我们获取了IMailQueueManager并启动队列(别忘了注册服务)。

运行程序我们会看到控制台每隔3秒就会打出日志:

info: Microsoft.AspNetCore.DataProtection.KeyManagement.XmlKeyManager[0]
      User profile is available. Using 'C:\Users\Administrator\AppData\Local\ASP.NET\DataProtection-Keys' as key repository and Windows DPAPI to encrypt keys at rest.info: MailQueueManager[0]
      线程即将启动info: MailQueueManager[0]
      线程已经启动,线程Id是:9trce: MailQueueManager[0]
      队列是空,开始睡眠
Hosting environment: Development
Content root path: D:\publish
Now listening on: http://[::]:5000Application started. Press Ctrl+C to shut down.trce: MailQueueManager[0]
      队列是空,开始睡眠trce: MailQueueManager[0]
      队列是空,开始睡眠

到此,我们的邮件队列就完成了! :D

原文地址http://www.cnblogs.com/rocketRobin/p/9294845.html

.NET社区新闻,深度好文,欢迎访问公众号文章汇总 http://www.csharpkit.com


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK