c#基于Redis实现轻量级消息组件的步骤

最近在开发一个轻量级ASP.NET MVC开发框架,需要加入日志记录,邮件发送,短信发送等功能,为了保持模块的独立性,所以需要通过消息通信的方式进行处理,为了保持框架在部署,使用,二次开发过程中的简易便捷性,所以没有选择传统的MQ,而是基于Redis的订阅发布实现一个系统内部消息组件,话不多说,上码!

数据结构定义

消息实体包含几个部分,订阅通道名称,信息头,信息体,信息差异化额外信息字典,信息头主要包含消息标识,消息日期,信息体包含信息内容,信息实体类型等

public class Message
 {
     public string MessageChannel { set; get; }
     public MessageHead @MessageHead { set; get; }
     public MessageBody @MessageBody { set; get; }
 
     [JsonExtensionData]
     public Dictionary<string,Object> @MessageExtra { set; get; }
 
     public Message()
     {
 
     }
 
     public void AddExtra(string Name, string Value)
     {
         if (@MessageExtra == null)
         {
             @MessageExtra = new Dictionary<string, object>();
         }
         @MessageExtra.Add(Name, Value);
     }
 
     public Object GetExtra(string Name)
     {
         return @MessageExtra[Name];
     }
 }
 
 public class MessageHead
 {
     public string MessageID { set; get; }
     public DateTime MessageDate { set; get; }
 
     public MessageHead()
     {
         MessageID = CommonUtil.CreateCommonGuid();
         MessageDate = DateTime.Now;
     }
 }
 
 public class MessageBody
 {
     public string MessageJsonContent { set; get; }
     public Type MessageMapperType { set; get; }
 }

注:因为消息订阅发布传递过程中,我是通过Json序列化传输的,使用过程中可能需要一些额外的键值对信息,这里在对象中定义的是Dictinary对象,但是Dictinary本身是不支持序列化的,所以需要加上注解JsonExtensionData

订阅通道声明

我们需要达到的效果是,在系统启动时,所有消息通道可以根据系统中的应用自动订阅,这里就需要一个注解来标识我们的订阅通道接收消息的实现类

[AttributeUsage(AttributeTargets.Class)]
    public class MessageChanelAttribute : Attribute
    {
        private string _ChannleName;
        public string ChannelName
        {
            get
            {
                return this._ChannleName;
            }
            set
            {
                this._ChannleName = value;
            }
 
        }
    }

消息的个性化策略处理

Redis的三方库我这里使用的是StackExchange.Redis.dll,在消息订阅时,需要为Channel指定接收到消息时的处理委托,我们在自动订阅的过程中肯定也要收集好各类消息处理类并与Channel一一对应,这时候我们就需要一个基类FastDefaultMessageHandler,我们的具体的消息处理类继承自FastDefaultMessageHandler,重写处理方法即可

[Component]
   [MessageChanelAttribute(ChannelName = "DefaultMessage")]
   public class FastDefaultMessageHandler : IFastMessageHandle
   {
       [AutoWired]
       public DBUtil @DBUtil;
 
       public void HandleMessage(RedisChannel ChannelName, RedisValue Message)
       {
           FastExecutor.Message.Design.Message Entity = JsonConvert.DeserializeObject<FastExecutor.Message.Design.Message>(Message);
           try
           {
               if (!CheckMessageIsConsume(Entity))
               {
                   this.CustomHandle(Entity);
               }
           }
           catch (Exception e)
           {
               StringBuilder ExceptionLog = new StringBuilder();
               ExceptionLog.AppendFormat("异常Message所属Channel:{0}", Entity.MessageChannel + Environment.NewLine);
               ExceptionLog.AppendFormat("异常Message插入时间:{0}", Entity.MessageHead.MessageDate.ToString() + Environment.NewLine);
               ExceptionLog.AppendFormat("异常Message内容:{0}", Message + Environment.NewLine);
               ExceptionLog.AppendFormat("异常信息:{0}", e.Message + Environment.NewLine);
               LogUtil.WriteLog("Logs/MessageErrorLog", "log_", ExceptionLog.ToString() + Environment.NewLine);
               ExceptionLog.AppendFormat("========================================================================================================================================================================" + Environment.NewLine);
               MessageACK.MoveMessageToExceptionChannel(Entity.MessageChannel, Entity);
           }
           finally
           {
               MessageACK.ConfirmMessageFinish(Entity.MessageChannel, Entity.MessageHead.MessageID);
           }
 
       }
 
       public virtual void CustomHandle(FastExecutor.Message.Design.Message @Message)
       {
 
       }
 
       public virtual bool CheckMessageIsConsume(FastExecutor.Message.Design.Message @Message)
       {
           return false;
       }
   }

其中的HandleMessage方法就是我们在订阅Channel时对应的委托,会调用类中的CustomHandle的虚方法,子类继承重写该方法就会基于多态进行策略调用,CheckMessageIsConsume方法是用于确认消息是否重复消费的,也可以被重写,下面看一个访问日志类的实例,使用MessageChanelAttribute标注声明该实现类需要订阅发布的Channel名称为Visit,CustomHandle方法中实现了插入数据库操作,CheckMessageIsConsume方法判断该条日志数据是否已消费(已经存在于数据库)

[MessageChanelAttribute(ChannelName = "Visit")]
public class VisitLog : FastDefaultMessageHandler
{
    public override void CustomHandle(Message.Design.Message Message)
    {
        Frame_VisitLog LogEntity = JsonConvert.DeserializeObject<Frame_VisitLog>(Message.MessageBody.MessageJsonContent);
        @DBUtil.Insert(LogEntity);
        base.CustomHandle(Message);
    }
 
    public override bool CheckMessageIsConsume(Message.Design.Message Message)
    {
        Frame_VisitLog LogEntity = JsonConvert.DeserializeObject<Frame_VisitLog>(Message.MessageBody.MessageJsonContent);
        DBRow Row = new DBRow("Frame_VisitLog", "RowGuid", LogEntity.RowGuid);
        if (Row.IsExist())
        {
            return true;
        }
        else
        {
            return false;
        }
    }
}

消息自动订阅

我们希望系统在启动时就寻找出定义好Channel和实现类,自动实现订阅,这里就需要用到IOC容器,启动系统时将所有的消息处理类放入容器中,在自动订阅时全部取出来,根据消息处理类中声明的Channel名称进行自动订阅

public void Init()
      {
          List<Type> HandlerTypeList = InjectUtil.Container.GetRegistType(typeof(IFastMessageHandle));
          foreach (Type HandlerType in HandlerTypeList)
          {
              MessageChanelAttribute Channel = Attribute.GetCustomAttribute(HandlerType, typeof(MessageChanelAttribute)) as MessageChanelAttribute;
              RedisUtil.Subscribe(Channel.ChannelName, ((FastDefaultMessageHandler)InjectUtil.Container.Resolve(HandlerType)).HandleMessage);
          }
      }

注:

1.这里的IOC容器是我自己实现的,地址:https://gitee.com/grassprogramming/FastIOC,大家可以用AutoFac代替

2.RedisUtil是对StackExchange.Redis.dll封装的处理类,地址:https://gitee.com/grassprogramming/FastUtil

消息发送

消息只需要调用Redis的发布方法即可,将Channel名称与定义好的数据实体类传入,序列化为Json

public void SendMessage<T>(string ChannleName, T CustomMessageEntity, Dictionary<string, string> ExtraData = null)
   {
       FastExecutor.Message.Design.Message MessageEntity = new Design.Message();
       MessageEntity.MessageChannel = ChannleName;
       MessageHead Head = new MessageHead();
       MessageBody Body = new MessageBody();
       Body.MessageMapperType = typeof(T);
       Body.MessageJsonContent = JsonConvert.SerializeObject(CustomMessageEntity);
       MessageEntity.MessageHead = Head;
       MessageEntity.MessageBody = Body;
       if (ExtraData != null)
       {
           foreach (var item in ExtraData)
           {
               MessageEntity.AddExtra(item.Key, item.Value);
           }
       }
       RedisUtil.Publish(ChannleName, MessageEntity);
       MessageACK.CopyMessageToACKList(ChannleName, MessageEntity);
   }

消息确认与存储

Redis作订阅发布模式作为消息组件的问题有两方面

问题:消息消费完没有确认机制

解决方案

基于Redis的Hash存储方式建立一个消息存储字段,在发送消息时拷贝到消息Hash字典中,消费完毕后再删除,对应SendMessage中的MessageACK.CopyMessageToACKList方法和FastDefaultMessageHandler中的MessageACK.ConfirmMessageFinish方法,本质就是对Hash字典的增加与删除功能

问题:消息处理端挂了再次重启消息会丢失

解决方案

确认机制已经保证了消息即使没有被消费完但是处理端宕机消息也不会丢失,需要注意的是,消息没有丢失仅仅是Hash字典中有存储,但是消息通道中不存在了,所以我们在系统每次启动时扫描这个Hash字典,重新发布消息到Channel,这样可能导致重复消费,所以需要靠FastDefaultMessageHandler中的CheckMessageIsConsume方法判断,同时消息处理者本身处理异常我们也需要记录下来,比如发短信供应商接口有问题,消息处理异常会进入Redis的ChannelException通道,我们可以根据需求实现一个可视化界面决定是否通过手动恢复

最后

Message组件相关代码地址:https://gitee.com/grassprogramming/FastExecutor/tree/master/code/FastExecutor/FastExecutor.Message

存在不足问题:如果消息是单纯记录日志问题,没办法确认消息是否消费了

如果大家有什么好的建议,可留言一起交流学习,共同进步

以上就是c#基于Redis实现轻量级消息组件的步骤的详细内容,更多关于c#基于Redis实现消息组件的资料请关注我们其它相关文章!

(0)

相关推荐

  • C# Redis学习系列(一)Redis下载安装使用

    下一篇:C# Redis学习系列二:Redis基本设置 一.认识Redis 1. Redis 是一个高性能的key-value数据库. 2. 它支持存储的value类型相对更多,包括string(字符串).list(链表).set(集合).zset(sorted set --有序集合)和hash(哈希类型). 3.周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件 4.别人说的 比我好 Redis百度百科 二.下载 为了匹配我的教程,我推荐下载 redis-2.8.2400 三.如何安

  • c#操作Redis的5种基本类型汇总

    前言 在我们的项目中,通常会把数据存储到关系型数据库中,比如Oracle,SQL Server,Mysql等,但是关系型数据库对于并发的支持并不是很强大,这样就会造成系统的性能不佳,而且存储的数据多为结构化数据,对于非结构数据(比如文本)和半结构化数据(比如JSon) 就显得不够灵活,而非关系型数据库则很好的弥补了这两点, 我们通常把读操作频繁的数据写入Redis中,以Key-value的方式存储来提高性能. Redis支持5种数据类型:string(字符串),hash(哈希),list(列表)

  • C#中如何使用redis

    redis 是一个非关系型高性能的key-value数据库.和Memcached类似,它支持存储的value类型相对更多,包括string(字符串).list(链表).set(集合).zset(sorted set --有序集合)和hash(哈希类型).这些数据类型都支持push/pop.add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的.在此基础上,redis支持各种不同方式的排序.与memcached一样,为了保证效率,数据都是缓存在内存中.区别的是redis会周

  • Redis总结笔记(二):C#连接Redis简单例子

    注:C#在调用Redis是不要使用ServiceStack.Redis驱动的4.0版本,因为这个版本已经商业化了,会出现每小时6000条数据的限制 1.引用驱动 复制代码 代码如下: using ServiceStack.Redis; 2.数据库连接 复制代码 代码如下: RedisClient client;             //连接服务器   6379是redis的默认端口             client = new RedisClient("127.0.0.1",

  • C#使用Redis的基本操作

    一,引入dll 1.ServiceStack.Common.dll 2.ServiceStack.Interfaces.dll 3.ServiceStack.Redis.dll 4.ServiceStack.Text.dll 二,修改配置文件 在你的配置文件中加入如下的代码: <appSettings> <add key="RedisPath" value="127.0.0.1:6379"/> todo:这里配置自己redis的ip地址和端口

  • c#使用csredis操作redis的示例

    现在流行的redis连接客户端有StackExchange.Redis和ServiceStack.Redis,为什么选择csredis而不是这两个? .net 最有名望的 ServiceStack.Redis 早已沦为商业用途,在 .NETCore 中使用只能充值: 后来居上的 StackExchange.Redis 虽然能用,但线上各种 Timeout 错误把人坑到没脾气,两年多两年多两年多都不解决,最近发布的 2.0 版本不知道是否彻底解决了底层. csredis支持.net40/.net4

  • 在c#中使用servicestackredis操作redis的实例代码

    声明一个客户端对象: 复制代码 代码如下: protected RedisClient Redis = new RedisClient("127.0.0.1", 6379);//redis服务IP和端口 一 .基本KEY/VALUE键值对操作: 1. 添加/获取: List<string> storeMembers = new List<string>(); storeMembers.ForEach(x => Redis.AddItemToList(&q

  • C# Redis学习系列(二)Redis基本设置

    上一篇:C# Redis学习系列一:Redis的认识.下载.安装.使用 一.redis 设置密码 使用下载好的 redis-cli.exe 指令: 1.设置密码:config set requirepass 123456 2.查看:info(验证无法通过) 3.授权登陆 auth 123456 二.Redis 更改端口(如从 6379 改到 6820) 1.打开下图:redis.conf 2.将 6379 替代为 6820 保存 3.如何开启?直接打开 redis-server.exe 你会发现

  • C# 通过ServiceStack 操作Redis

    作       者 : 明志德道 1.引用Nuget包 ServiceStack.Redis 我这里就用别人已经封装好的Reids操作类来和大家一起参考了下,看看怎么使用ServiceStack.Redis 操作Redis数据 RedisConfigInfo--redis配置文件信息 /// <summary> /// redis配置文件信息 /// 也可以放到配置文件去 /// </summary> public sealed class RedisConfigInfo { //

  • C#实现redis读写的方法

    最近做一个C#项目,需要对radis进行读写. 首先引入System.Configuration,如下 实现代码如下: public class ManualSuggestRedisHelper { private static IRedisClient GetManualSuggestClient() { var config = ConfigurationManager.ConnectionStrings["REDIS_MANUAL_VIDEO_LIST"].ConnectionS

  • 基于C# 写一个 Redis 数据同步小工具

    概念 Redis是一个开源的使用ANSI C语言编写.支持网络.可基于内存亦可持久化的日志型.Key-Value数据库,和Memcached类似,它支持存储的value类型相对更多,包括string(字符串).list(链表).set(集合).zset(sorted set --有序集合)和hash(哈希类型).在此基础上,redis支持各种不同方式的排序.与memcached一样,为了保证效率,数据都是缓存在内存中.区别的是redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文

随机推荐