C# 基于消息发布订阅模型的示例(上)

  在我们的开发过程中,我们经常会遇到这样的场景就是一个对象的其中的一些状态依赖于另外的一个对象的状态,而且这两个对象之间彼此是没有关联的,及两者之间的耦合性非常低,特别是在这种基于容器模型的开发中遇到的会非常多,比如Prism框架或者MEF这种框架中,而我们会发现在这样的系统中我们经常使用一种Publish和Subscribe的模式来进行交互,这种交互有什么好处呢?基于带着这些问题的思考,我们来一步步来剖析!

  首先第一步就是定义一个叫做IEventAggregator的接口,里面定义了一些重载的Subscribe和Publish方法,我们具体来看一看这个接口:

/// <summary>
   ///   Enables loosely-coupled publication of and subscription to events.
   /// </summary>
   public interface IEventAggregator
   {
       /// <summary>
       ///   Gets or sets the default publication thread marshaller.
       /// </summary>
       /// <value>
       ///   The default publication thread marshaller.
       /// </value>
       Action<System.Action> PublicationThreadMarshaller { get; set; }
 
       /// <summary>
       ///   Subscribes an instance to all events declared through implementations of <see cref = "IHandle{T}" />
       /// </summary>
       /// <param name = "instance">The instance to subscribe for event publication.</param>
       void Subscribe(object instance);
 
       /// <summary>
       ///   Unsubscribes the instance from all events.
       /// </summary>
       /// <param name = "instance">The instance to unsubscribe.</param>
       void Unsubscribe(object instance);
 
       /// <summary>
       ///   Publishes a message.
       /// </summary>
       /// <param name = "message">The message instance.</param>
       /// <remarks>
       ///   Uses the default thread marshaller during publication.
       /// </remarks>
       void Publish(object message);
 
       /// <summary>
       ///   Publishes a message.
       /// </summary>
       /// <param name = "message">The message instance.</param>
       /// <param name = "marshal">Allows the publisher to provide a custom thread marshaller for the message publication.</param>
       void Publish(object message, Action<System.Action> marshal);
   }

  有了这个接口,接下来就是怎样去实现这个接口中的各种方法,我们来看看具体的实现过程。

/// <summary>
    ///   Enables loosely-coupled publication of and subscription to events.
    /// </summary>
    public class EventAggregator : IEventAggregator
    {
        /// <summary>
        ///   The default thread marshaller used for publication;
        /// </summary>
        public static Action<System.Action> DefaultPublicationThreadMarshaller = action => action();
 
        readonly List<Handler> handlers = new List<Handler>();
 
        /// <summary>
        ///   Initializes a new instance of the <see cref = "EventAggregator" /> class.
        /// </summary>
        public EventAggregator()
        {
            PublicationThreadMarshaller = DefaultPublicationThreadMarshaller;
        }
 
        /// <summary>
        ///   Gets or sets the default publication thread marshaller.
        /// </summary>
        /// <value>
        ///   The default publication thread marshaller.
        /// </value>
        public Action<System.Action> PublicationThreadMarshaller { get; set; }
 
        /// <summary>
        ///   Subscribes an instance to all events declared through implementations of <see cref = "IHandle{T}" />
        /// </summary>
        /// <param name = "instance">The instance to subscribe for event publication.</param>
        public virtual void Subscribe(object instance)
        {
            lock(handlers)
            {
                if (handlers.Any(x => x.Matches(instance)))
                {
                    return;
                }                   
                handlers.Add(new Handler(instance));
            }
        }
 
        /// <summary>
        ///   Unsubscribes the instance from all events.
        /// </summary>
        /// <param name = "instance">The instance to unsubscribe.</param>
        public virtual void Unsubscribe(object instance)
        {
            lock(handlers)
            {
                var found = handlers.FirstOrDefault(x => x.Matches(instance));
                if (found != null)
                {
                   handlers.Remove(found);
                }                  
            }
        }
 
        /// <summary>
        ///   Publishes a message.
        /// </summary>
        /// <param name = "message">The message instance.</param>
        /// <remarks>
        ///   Does not marshall the the publication to any special thread by default.
        /// </remarks>
        public virtual void Publish(object message)
        {
            Publish(message, PublicationThreadMarshaller);
        }
 
        /// <summary>
        ///   Publishes a message.
        /// </summary>
        /// <param name = "message">The message instance.</param>
        /// <param name = "marshal">Allows the publisher to provide a custom thread marshaller for the message publication.</param>
        public virtual void Publish(object message, Action<System.Action> marshal)
        {
            Handler[] toNotify;
            lock (handlers)
            {
                toNotify = handlers.ToArray();
            }
            marshal(() =>
            {
                var messageType = message.GetType();
                var dead = toNotify
                    .Where(handler => !handler.Handle(messageType, message))
                    .ToList();
 
                if(dead.Any())
                {
                    lock(handlers)
                    {
                        foreach(var handler in dead)
                        {
                            handlers.Remove(handler);
                        }
                    }
                }
            });
        }
 
        protected class Handler
        {
            readonly WeakReference reference;
            readonly Dictionary<Type, MethodInfo> supportedHandlers = new Dictionary<Type, MethodInfo>();
 
            public Handler(object handler)
            {
                reference = new WeakReference(handler);
 
                var interfaces = handler.GetType().GetInterfaces()
                    .Where(x => typeof(IHandle).IsAssignableFrom(x) && x.IsGenericType);
 
                foreach(var @interface in interfaces)
                {
                    var type = @interface.GetGenericArguments()[0];
                    var method = @interface.GetMethod("Handle");
                    supportedHandlers[type] = method;
                }
            }
 
            public bool Matches(object instance)
            {
                return reference.Target == instance;
            }
 
            public bool Handle(Type messageType, object message)
            {
                var target = reference.Target;
                if(target == null)
                    return false;
 
                foreach(var pair in supportedHandlers)
                {
                    if(pair.Key.IsAssignableFrom(messageType))
                    {
                        pair.Value.Invoke(target, new[] { message });
                        return true;
                    }
                }
                return true;
            }
        }
    }

  首先在EventAggregator的内部维护了一个LIst<Handler>的List对象,用来存放一系列的Handle,那么这个嵌套类Handler到底起什么作用呢?

  我们会发现在每一次当执行这个Subscribe的方法的时候,会将当前object类型的参数instance传入到Handler这个对象中,在Handler这个类的构造函数中,首先将这个instance放入到一个弱引用中去,然后再获取这个对象所有继承的接口,并查看是否继承了IHandle<TMessage>这个泛型的接口,如果能够获取到,那么就通过反射获取到当前instance中定义的Handle方法,并获取到其中定义的表示泛型类型的类型实参或泛型类型定义的类型形参,并把这两个对象放到内部定义的一个Dictionary<Type, MethodInfo>字典之中,这样就把这样一个活得具体的处理方法的Handler对象放到了一个List<Handler>集合中,这个就是订阅消息的核心部分,所以当前的对象要想订阅一个消息,那么必须实现泛型接口IHandle<TMessage>,并且实现接口中的方法,同时最重要的就是在当前对象的构造函数函数中去订阅消息(即执行Subscribe(this),我们来看一看这个泛型接口IHandle<TMessage> 

public interface IHandle {}
 
/// <summary>
///   Denotes a class which can handle a particular type of message.
/// </summary>
/// <typeparam name = "TMessage">The type of message to handle.</typeparam>
public interface IHandle<TMessage> : IHandle
{
    /// <summary>
    ///   Handles the message.
    /// </summary>
    /// <param name = "message">The message.</param>
    void Handle(TMessage message);
}

  在看完了Subscribe这个方法后,后面我们就来看看Unsubscribe方法吧,这个思路其实很简单就是找到List<Handler>中的这个对象,并且移除当前的对象就可以了,那么下面我们关注的重点就是Publish这个方法中到底实现了什么?首先来看看代码,然后再来做一步步分析。 

/// <summary>
        ///   Publishes a message.
        /// </summary>
        /// <param name = "message">The message instance.</param>
        /// <param name = "marshal">Allows the publisher to provide a custom thread marshaller for the message publication.</param>
        public virtual void Publish(object message, Action<System.Action> marshal)
        {
            Handler[] toNotify;
            lock (handlers)
            {
                toNotify = handlers.ToArray();
            }
            marshal(() =>
            {
                var messageType = message.GetType();
                var dead = toNotify
                    .Where(handler => !handler.Handle(messageType, message))
                    .ToList();
 
                if(dead.Any())
                {
                    lock(handlers)
                    {
                        foreach(var handler in dead)
                        {
                            handlers.Remove(handler);
                        }
                    }
                }
            });
        }

  我们看到,在发布一个object类型的message的时候,必然对应着另外的一个对象来处理这个消息,那么怎样找到这个消息的处理这呢?

  对,我们在Subscribe一个对象的时候不是已经通过反射将订阅这个消息的对象及方法都存在了一个List<Handler>中去了吗?那么我们只需要在这个List中找到对应的和message类型一致的那个对象并执行里面的Handle方法不就可以了吗?确实是一个很好的思路,这里我们看代码也是这样实行的。

  这里面还有一个要点就是,如果执行的方法返回了false,就是执行不成功,那么就从当前的List<Handler>中移除掉这个对象,因为这样的操作是没有任何意义的,通过这样的过程我们就能够完没地去实现两个对象之间的消息传递了,另外我们通过总结以后就能够发现,这个思路实现的重点包括以下方面:

  1 所有消息订阅的对象必须实现统一的接口IHandle<TMessage>,并实现里面的Handel方法。

  2 整个EventAggregator必须是单实例或者是静态的,这样才能够在统一的集合中去实现上述的各种操作。

  最后还是按照之前的惯例,最后给出一个具体的实例来做相关的说明,请点击此处进行下载,在下篇中我们将介绍一种简单版的基于事件的发布和订阅模式的例子。

以上就是C# 基于消息发布订阅模型的示例(上)的详细内容,更多关于c# 发布订阅模型的资料请关注我们其它相关文章!

(0)

相关推荐

  • C# API中模型与它们的接口设计详解

    关键要点 可变模型应该具备自我验证的能力,并实现验证接口. 在共享对象时(特别是在跨线程共享时),考虑使用不可变模型. 考虑支持MVVM风格UI的单层和多层撤消. 在实现属性变更通知时避免不必要的内存分配. 不要覆盖模型的Equals和GetHashCode方法. 在传统的MVC.MVP.MVVM.Web MVC这些UI模式中,模型是一个公共元素.虽然有很多文章讨论这些架构中的视图和控制器,但几乎无一涉及模型.在本文中,我们将讨论模型本身以及相应的.NET接口. 我想先定义一些术语,这些术语在其

  • 支持多类型数据库的c#数据库模型示例

    DataAccess.cs 复制代码 代码如下: using System;using System.Collections.Generic;using System.Text; namespace DynamicFramework{    public abstract class DataAccess : MarshalByRefObject    {        protected System.Data.Common.DbConnection connection;        p

  • C#开发之Socket网络编程TCP/IP层次模型、端口及报文等探讨

    1.TCP/IP层次模型 当然这里我们只讨论重要的四层 01,应用层(Application):应用层是个很广泛的概念,有一些基本相同的系统级TCP/IP应用以及应用协议,也有许多的企业应用和互联网应用.http协议在应用层运行. 02,传输层(Tanspot):传输层包括UDP和TCP,UDP几乎不对报文进行检查,而TCP提供传输保证. 03,网络层(Netwok):网络层协议由一系列协议组成,包括ICMP.IGMP.RIP.OSPF.IP(v4,v6)等. 04,链路层(Link):又称为物

  • 带着问题读CLR via C#(笔记一)CLR的执行模型

    Q1: 什么是CLR? A1: CLR (Common Language Runtime) 是一个可以由多种编程语言使用的"运行时". Q2: CLR的核心功能有哪些? A2: 1)内存管理:2)程序集加载:3)安全性:4)异常处理:5)线程同步 Q3: CLR与使用的编程语言有关吗? A3: 无关.只要编译器是面向CLR的就行. Q4: 选用不同编程语言经过面向CLR的编译器编译后生成的结果相同吗? A4: 相同.无论选择什么语言,相应的编译器变异的结果都是一个托管模块,即一个标准的

  • C#获取Visio模型信息的简单方法示例

    前言 Office Visio 是绘制各种类型图表的一个很好的工具,可以绘制业务流程的流程图.网络图.工作流图.数据库模型图.软件图,以及家居设计图等等,可用于可视化和简化业务流程.跟踪项目和资源.绘制组织结构图.映射网络.绘制建筑地图以及优化系统. 在最近的一个项目,要求导出Visio图纸,因为是建筑类的,所以,需要设置墙壁,门,房间等信息的参数. 拿墙壁为例,选中墙壁模型,右键属性,会弹出以下对话框. 需要设置墙长.墙壁厚度等一些列信息. 现在C#操作Visio里例子比较少,所以,花了好久,

  • C# 基于消息发布订阅模型的示例(下)

    一 背景 在上面的一篇文章中我们介绍了一个完整地基于消息发布和订阅的模型,这篇文章我将介绍一种简单的基于消息的发布和订阅模型,在这个模型中我们将通过构建一个Publisher类来完成对特定的事件和事件订阅进行封装,这个是一个更加轻量级别的方式,使用这个的主要目的是降低类之间彼此的耦合程度,从而方便代码的扩展和访问,最终使代码结构更加合理. 我们首先来看看具体的Publisher类的构成,后面我们将会对这个类做一个详细的讲解和分析. using System; using System.Colle

  • C# 基于消息发布订阅模型的示例(上)

    在我们的开发过程中,我们经常会遇到这样的场景就是一个对象的其中的一些状态依赖于另外的一个对象的状态,而且这两个对象之间彼此是没有关联的,及两者之间的耦合性非常低,特别是在这种基于容器模型的开发中遇到的会非常多,比如Prism框架或者MEF这种框架中,而我们会发现在这样的系统中我们经常使用一种Publish和Subscribe的模式来进行交互,这种交互有什么好处呢?基于带着这些问题的思考,我们来一步步来剖析! 首先第一步就是定义一个叫做IEventAggregator的接口,里面定义了一些重载的S

  • springboot基于Redis发布订阅集群下WebSocket的解决方案

    一.背景 单机节点下,WebSocket连接成功后,可以直接发送消息.而多节点下,连接时通过nginx会代理到不同节点. 假设一开始用户连接了node1的socket服务.触发消息发送的条件的时候也通过nginx进行代理,假如代理转到了node2节点上,那么node2节点的socket服务就发送不了消息,因为一开始用户注册的是node1节点.这就导致了消息发送失败. 为了解决这一方案,消息发送时,就需要一个中间件来记录,这样,三个节点都可以获取消息,然后在根据条件进行消息推送. 二.解决方案(s

  • Vue组件之事件总线和消息发布订阅详解

    目录 简介 事件总线 消息的发布订阅 总结 简介 主要介绍事件总线的定义和编写方法和Vue是如何实现消息的订阅与发布的. 事件总线 事件总线是组件间通信的一种方式,适用于任意组件间的通信,比如毫不相干的两个组件.父子组件间.后代组件等等,都能通信. 事件总线有两个特性: 是一个vue组件实例或者一个vue实例,充当一个消息中转站,如果A.B组件想要通信,那么A组件存消息到中转站,B消息拿,或者反过来. 所有组件都要能获取到事件总线. 如果A.B组件间通信,如果A发送数据给B的情况下,需要以下步骤

  • C++基于消息队列的多线程实现示例代码

    前言 实现消息队列的关键因素是考量不同线程访问消息队列的同步问题.本实现涉及到几个知识点 std::lock_guard 介绍 std::lock_gurad 是 C++11 中定义的模板类.定义如下: template <class Mutex> class lock_guard; lock_guard 对象通常用于管理某个锁(Lock)对象,因此与 Mutex RAII 相关,方便线程对互斥量上锁,即在某个 lock_guard 对象的声明周期内,它所管理的锁对象会一直保持上锁状态:而 l

  • SpringBoot整合Redis实现消息发布与订阅的示例代码

    当我们在多个集群应用中使用到本地缓存时,在数据库数据得到更新后,为保持各个副本当前被修改的数据与数据库数据保持同步,在数据被操作后向其他集群应用发出被更新数据的通知,使其删除;下次当其他应用请求该被更新的数据时,应用会到数据库去取,也就是最新的数据,从而使得被更新数据与数据库保持同步! 能实现发送与接收信息的中间介有很多,比如:RocketMQ.RabbitMQ.ActiveMQ.Kafka等,本次主要简单介绍Redis的推送与订阅功能并集成Spring Boot的实现. 1.添加SpringB

  • kafka 消息队列中点对点与发布订阅的区别说明

    目录 背景知识 1.JMS中定义 2.二者分析与区别 2.1 点对点模式 2.2 发布订阅模式 3.流行的消息队列模型比较 3.1 RabbitMQ 3.2 Kafka 背景知识 JMS一个在 Java标准化组织(JCP)内开发的标准(代号JSR 914).2001年6月25日,Java消息服务发布JMS 1.0.2b,2002年3月18日Java消息服务发布 1.1. Java消息服务(Java Message Service,JMS)应用程序接口是一个Java平台中关于面向消息中间件(MOM

  • Redis实现消息的发布订阅原理分析

    目录 一.什么是发布和订阅 二.Redis的发布和订阅 三.redis 发布订阅常用命令 四.命令实战 1.基本使用 2.订阅符合要求的频道 3.查看活跃频道 五.发布订阅原理 1.订阅频道原理 2.发布信息原理 3.退订信息原理 一.什么是发布和订阅 Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息. 特点:Redis 客户端可以订阅任意数量的频道. 这就好比粉丝们关注了我,当我写完文章发布的时候,你们打开CSDN也会接收

  • JS前端设计模式之发布订阅模式详解

    目录 引言 例子1: version1: version2: 总结 引言 昨天我发布了一篇关于策略模式和代理模式的文章,收到的反响还不错,于是今天我们继续来学习前端中常用的设计模式之一:发布-订阅模式. 说到发布订阅模式大家应该都不陌生,它在我们的日常学习和工作中出现的频率简直不要太高,常见的有EventBus.框架里的组件间通信.鉴权业务等等......话不多说,让我们一起进入今天的学习把!!! 发布-订阅模式又叫观察者模式,它定义对象间的一种一对多的依赖关系 当一个对象的状态发生改变时,所有

  • ASP.NET Core实现单体程序的事件发布/订阅详解

    背景 事件发布/订阅是一种非常强大的模式,它可以帮助业务组件间实现完全解耦,不同的业务组件只依赖事件,只关注哪些事件是需要自己处理的,而不用关注谁来处理自己发布事件,事件追溯(Event Sourcing)也是基于事件发布/订阅的.在微服务架构中,事件发布/订阅有非常多的应用场景.今天我给大家分享一个基于ASP.NET Core的单体程序使用事件发布/订阅的例子,针对分布式项目的事件发布/订阅比较复杂,难点是事务处理,后续我会另写一篇博文来演示. 案例说明 当前我们有一个基于ASP.NET Co

随机推荐