一个进程间通讯同步的C#框架引荐

 0.背景简介

微软在 .NET 框架中提供了多种实用的线程同步手段,其中包括 monitor 类及 reader-writer锁。但跨进程的同步方法还是非常欠缺。另外,目前也没有方便的线程间及进程间传递消息的方法。例如C/S和SOA,又或者生产者/消费者模式中就常常需要传递消息。为此我编写了一个独立完整的框架,实现了跨线程和跨进程的同步和通讯。这框架内包含了信号量,信箱,内存映射文件,阻塞通道,及简单消息流控制器等组件。这篇文章里提到的类同属于一个开源的库项目(BSD许可),你可以从这里下载到 www.cdrnet.net/projects/threadmsg/.

这个框架的目的是:

  • 封装性:通过MSMQ消息队列发送消息的线程无需关心消息是发送到另一个线程还是另一台机器。
  • 简单性:向其他进程发送消息只需调用一个方法。

注意:我删除了本文中全部代码的XML注释以节省空间。如果你想知道这些方法和参数的详细信息,请参考附件中的代码。

1.先看一个简单例子

使用了这个库后,跨进程的消息传递将变得非常简单。我将用一个小例子来作示范:一个控制台程序,根据参数可以作为发送方也可以作为接收方运行。在发送程序里,你可以输入一定的文本并发送到信箱内(返回key),接收程序将显示所有从信箱内收到的消息。你可以运行无数个发送程序和接收程序,但是每个消息只会被具体的某一个接收程序所收到。

[Serializable]
struct Message
{
 public string Text;
}

class Test
{
 IMailBox mail;

 public Test()
 {
  mail = new ProcessMailBox("TMProcessTest",1024);
 }

 public void RunWriter()
 {
  Console.WriteLine("Writer started");
  Message msg;
  while(true)
  {
   msg.Text = Console.ReadLine();
   if(msg.Text.Equals("exit"))
    break;
   mail.Content = msg;
  }
 }

 public void RunReader()
 {
  Console.WriteLine("Reader started");
  while(true)
  {
   Message msg = (Message)mail.Content;
   Console.WriteLine(msg.Text);
  }
 }

 [STAThread]
 static void Main(string[] args)
 {
  Test test = new Test();
  if(args.Length > 0)
   test.RunWriter();
  else
   test.RunReader();
 }
}

信箱一旦创建之后(这上面代码里是 ProcessMailBox ),接收消息只需要读取 Content 属性,发送消息只需要给这个属性赋值。当没有数据时,获取消息将会阻塞当前线程;发送消息时如果信箱里已经有数据,则会阻塞当前线程。正是有了这个阻塞,整个程序是完全基于中断的,并且不会过度占用CPU(不需要进行轮询)。发送和接收的消息可以是任意支持序列化(Serializable)的类型。

然而,实际上暗地里发生的事情有点复杂:消息通过内存映射文件来传递,这是目前唯一的跨进程共享内存的方法,这个例子里我们只会在 pagefile 里面产生虚拟文件。对这个虚拟文件的访问是通过 win32 信号量来确保同步的。消息首先序列化成二进制,然后再写进该文件,这就是为什么需要声明Serializable属性。内存映射文件和 win32 信号量都需要调用 NT内核的方法。多得了 .NET 框架中的 Marshal 类,我们可以避免编写不安全的代码。我们将在下面讨论更多的细节。

2. .NET里面的跨线程/进程同步

线程/进程间的通讯需要共享内存或者其他内建机制来发送/接收数据。即使是采用共享内存的方式,也还需要一组同步方法来允许并发访问。

同一个进程内的所有线程都共享公共的逻辑地址空间(堆)。对于不同进程,从 win2000 开始就已经无法共享内存。然而,不同的进程可以读写同一个文件。WinAPI提供了多种系统调用方法来映射文件到进程的逻辑空间,及访问系统内核对象(会话)指向的 pagefile 里面的虚拟文件。无论是共享堆,还是共享文件,并发访问都有可能导致数据不一致。我们就这个问题简单讨论一下,该怎样确保线程/进程调用的有序性及数据的一致性。

2.1 线程同步

.NET 框架和 C# 提供了方便直观的线程同步方法,即 monitor 类和 lock 语句(本文将不会讨论 .NET 框架的互斥量)。对于线程同步,虽然本文提供了其他方法,我们还是推荐使用 lock 语句。

void Work1()
{
 NonCriticalSection1();
 Monitor.Enter(this);
 try
 {
  CriticalSection();
 }
 finally
 {
  Monitor.Exit(this);
 }
 NonCriticalSection2();
}

void Work2()
{
 NonCriticalSection1();
 lock(this)
 {
  CriticalSection();
 }
 NonCriticalSection2();
}

Work1 和 Work2 是等价的。在C#里面,很多人喜欢第二个方法,因为它更短,且不容易出错。

2.2 跨线程信号量

信号量是经典的同步基本概念之一(由 Edsger Dijkstra 引入)。信号量是指一个有计数器及两个操作的对象。它的两个操作是:获取(也叫P或者等待),释放(也叫V或者收到信号)。信号量在获取操作时如果计数器为0则阻塞,否则将计数器减一;在释放时将计数器加一,且不会阻塞。虽然信号量的原理很简单,但是实现起来有点麻烦。好在,内建的 monitor 类有阻塞特性,可以用来实现信号量。

public sealed class ThreadSemaphore : ISemaphore
{
 private int counter;
 private readonly int max;

 public ThreadSemaphore() : this(0, int.Max) {}
 public ThreadSemaphore(int initial) : this(initial, int.Max) {}
 public ThreadSemaphore(int initial, int max)
 {
  this.counter = Math.Min(initial,max);
  this.max = max;
 }

 public void Acquire()
 {
  lock(this)
  {
   counter--;
   if(counter < 0 && !Monitor.Wait(this))
    throw new SemaphoreFailedException();
  }
 }

 public void Acquire(TimeSpan timeout)
 {
  lock(this)
  {
   counter--;
   if(counter < 0 && !Monitor.Wait(this,timeout))
    throw new SemaphoreFailedException();
  }
 }

 public void Release()
 {
  lock(this)
  {
   if(counter >= max)
    throw new SemaphoreFailedException();
   if(counter < 0)
    Monitor.Pulse(this);
   counter++;
  }
 }
}

信号量在复杂的阻塞情景下更加有用,例如我们后面将要讨论的通道(channel)。你也可以使用信号量来实现临界区的排他性(如下面的 Work3),但是我还是推荐使用内建的 lock 语句,像上面的 Work2 那样。

请注意:如果使用不当,信号量也是有潜在危险的。正确的做法是:当获取信号量失败时,千万不要再调用释放操作;当获取成功时,无论发生了什么错误,都要记得释放信号量。遵循这样的原则,你的同步才是正确的。Work3 中的 finally 语句就是为了保证正确释放信号量。注意:获取信号量( s.Acquire() )的操作必须放到 try 语句的外面,只有这样,当获取失败时才不会调用释放操作。

ThreadSemaphore s = new ThreadSemaphore(1);
void Work3()
{
 NonCriticalSection1();
 s.Acquire();
 try
 {
  CriticalSection();
 }
 finally
 {
  s.Release();
 }
 NonCriticalSection2();
}

2.3 跨进程信号量

为了协调不同进程访问同一资源,我们需要用到上面讨论过的概念。很不幸,.NET 中的 monitor 类不可以跨进程使用。但是,win32 API提供的内核信号量对象可以用来实现跨进程同步。 Robin Galloway-Lunn 介绍了怎样将 win32 的信号量映射到 .NET 中(见 Using Win32 Semaphores in C# )。我们的实现也类似:

[DllImport("kernel32",EntryPoint="CreateSemaphore",
   SetLastError=true,CharSet=CharSet.Unicode)]
internal static extern uint CreateSemaphore(
 SecurityAttributes auth, int initialCount,
  int maximumCount, string name);

[DllImport("kernel32",EntryPoint="WaitForSingleObject",
 SetLastError=true,CharSet=CharSet.Unicode)]
internal static extern uint WaitForSingleObject(
 uint hHandle, uint dwMilliseconds);

[DllImport("kernel32",EntryPoint="ReleaseSemaphore",
 SetLastError=true,CharSet=CharSet.Unicode)]
[return : MarshalAs( UnmanagedType.VariantBool )]
internal static extern bool ReleaseSemaphore(
 uint hHandle, int lReleaseCount, out int lpPreviousCount);

[DllImport("kernel32",EntryPoint="CloseHandle",SetLastError=true,
 CharSet=CharSet.Unicode)]
[return : MarshalAs( UnmanagedType.VariantBool )]
internal static extern bool CloseHandle(uint hHandle);

public class ProcessSemaphore : ISemaphore, IDisposable
{
 private uint handle;
 private readonly uint interruptReactionTime;

 public ProcessSemaphore(string name) : this(
  name,0,int.MaxValue,500) {}
 public ProcessSemaphore(string name, int initial) : this(
  name,initial,int.MaxValue,500) {}
 public ProcessSemaphore(string name, int initial,
  int max, int interruptReactionTime)
 {
  this.interruptReactionTime = (uint)interruptReactionTime;
  this.handle = NTKernel.CreateSemaphore(null, initial, max, name);
  if(handle == 0)
   throw new SemaphoreFailedException();
 }

 public void Acquire()
 {
  while(true)
  { //looped 0.5s timeout to make NT-blocked threads interruptable.
   uint res = NTKernel.WaitForSingleObject(handle,
    interruptReactionTime);
   try {System.Threading.Thread.Sleep(0);}
   catch(System.Threading.ThreadInterruptedException e)
   {
    if(res == 0)
    { //Rollback
     int previousCount;
     NTKernel.ReleaseSemaphore(handle,1,out previousCount);
    }
    throw e;
   }
   if(res == 0)
    return;
   if(res != 258)
    throw new SemaphoreFailedException();
  }
 }

 public void Acquire(TimeSpan timeout)
 {
  uint milliseconds = (uint)timeout.TotalMilliseconds;
  if(NTKernel.WaitForSingleObject(handle, milliseconds) != 0)
   throw new SemaphoreFailedException();
 }

 public void Release()
 {
  int previousCount;
  if(!NTKernel.ReleaseSemaphore(handle, 1, out previousCount))
   throw new SemaphoreFailedException();
 }

 #region IDisposable Member
 public void Dispose()
 {
  if(handle != 0)
  {
   if(NTKernel.CloseHandle(handle))
    handle = 0;
  }
 }
 #endregion
}

有一点很重要:win32中的信号量是可以命名的。这允许其他进程通过名字来创建相应信号量的句柄。为了让阻塞线程可以中断,我们使用了一个(不好)的替代方法:使用超时和 Sleep(0)。我们需要中断来安全关闭线程。更好的做法是:确定没有线程阻塞之后才释放信号量,这样程序才可以完全释放资源并正确退出。

你可能也注意到了:跨线程和跨进程的信号量都使用了相同的接口。所有相关的类都使用了这种模式,以实现上面背景介绍中提到的封闭性。需要注意:出于性能考虑,你不应该将跨进程的信号量用到跨线程的场景,也不应该将跨线程的实现用到单线程的场景。

3. 跨进程共享内存:内存映射文件

我们已经实现了跨线程和跨进程的共享资源访问同步。但是传递/接收消息还需要共享资源。对于线程来说,只需要声明一个类成员变量就可以了。但是对于跨进程来说,我们需要使用到 win32 API 提供的内存映射文件(Memory Mapped Files,简称MMF)。使用 MMF和使用 win32 信号量差不多。我们需要先调用 CreateFileMapping 方法来创建一个内存映射文件的句柄:

[DllImport("Kernel32.dll",EntryPoint="CreateFileMapping",
   SetLastError=true,CharSet=CharSet.Unicode)]
internal static extern IntPtr CreateFileMapping(uint hFile,
 SecurityAttributes lpAttributes, uint flProtect,
 uint dwMaximumSizeHigh, uint dwMaximumSizeLow, string lpName);

[DllImport("Kernel32.dll",EntryPoint="MapViewOfFile",
 SetLastError=true,CharSet=CharSet.Unicode)]
internal static extern IntPtr MapViewOfFile(IntPtr hFileMappingObject,
 uint dwDesiredAccess, uint dwFileOffsetHigh,
 uint dwFileOffsetLow, uint dwNumberOfBytesToMap);

[DllImport("Kernel32.dll",EntryPoint="UnmapViewOfFile",
 SetLastError=true,CharSet=CharSet.Unicode)]
[return : MarshalAs( UnmanagedType.VariantBool )]
internal static extern bool UnmapViewOfFile(IntPtr lpBaseAddress);

public static MemoryMappedFile CreateFile(string name,
   FileAccess access, int size)
{
 if(size < 0)
  throw new ArgumentException("Size must not be negative","size");

 IntPtr fileMapping = NTKernel.CreateFileMapping(0xFFFFFFFFu,null,
  (uint)access,0,(uint)size,name);
 if(fileMapping == IntPtr.Zero)
  throw new MemoryMappingFailedException();

 return new MemoryMappedFile(fileMapping,size,access);
}

我们希望直接使用 pagefile 中的虚拟文件,所以我们用 -1(0xFFFFFFFF) 来作为文件句柄来创建我们的内存映射文件句柄。我们也指定了必填的文件大小,以及相应的名称。这样其他进程就可以通过这个名称来同时访问该映射文件。创建了内存映射文件后,我们就可以映射这个文件不同的部分(通过偏移量和字节大小来指定)到我们的进程地址空间。我们通过 MapViewOfFile 系统方法来指定:

public MemoryMappedFileView CreateView(int offset, int size,
   MemoryMappedFileView.ViewAccess access)
{
 if(this.access == FileAccess.ReadOnly && access ==
  MemoryMappedFileView.ViewAccess.ReadWrite)
  throw new ArgumentException(
   "Only read access to views allowed on files without write access",
   "access");
 if(offset < 0)
  throw new ArgumentException("Offset must not be negative","size");
 if(size < 0)
  throw new ArgumentException("Size must not be negative","size");
 IntPtr mappedView = NTKernel.MapViewOfFile(fileMapping,
  (uint)access,0,(uint)offset,(uint)size);
 return new MemoryMappedFileView(mappedView,size,access);
}

在不安全的代码中,我们可以将返回的指针强制转换成我们指定的类型。尽管如此,我们不希望有不安全的代码存在,所以我们使用 Marshal 类来从中读写我们的数据。偏移量参数是用来从哪里开始读写数据,相对于指定的映射视图的地址。

public byte ReadByte(int offset)
{
 return Marshal.ReadByte(mappedView,offset);
}
public void WriteByte(byte data, int offset)
{
 Marshal.WriteByte(mappedView,offset,data);
}

public int ReadInt32(int offset)
{
 return Marshal.ReadInt32(mappedView,offset);
}
public void WriteInt32(int data, int offset)
{
 Marshal.WriteInt32(mappedView,offset,data);
}

public void ReadBytes(byte[] data, int offset)
{
 for(int i=0;i<data.Length;i++)
  data[i] = Marshal.ReadByte(mappedView,offset+i);
}
public void WriteBytes(byte[] data, int offset)
{
 for(int i=0;i<data.Length;i++)
  Marshal.WriteByte(mappedView,offset+i,data[i]);
}

但是,我们希望读写整个对象树到文件中,所以我们需要支持自动进行序列化和反序列化的方法。

public object ReadDeserialize(int offset, int length)
{
 byte[] binaryData = new byte[length];
 ReadBytes(binaryData,offset);
 System.Runtime.Serialization.Formatters.Binary.BinaryFormatter formatter
  = new System.Runtime.Serialization.Formatters.Binary.BinaryFormatter();
 System.IO.MemoryStream ms = new System.IO.MemoryStream(
  binaryData,0,length,true,true);
 object data = formatter.Deserialize(ms);
 ms.Close();
 return data;
}
public void WriteSerialize(object data, int offset, int length)
{
 System.Runtime.Serialization.Formatters.Binary.BinaryFormatter formatter
  = new System.Runtime.Serialization.Formatters.Binary.BinaryFormatter();
 byte[] binaryData = new byte[length];
 System.IO.MemoryStream ms = new System.IO.MemoryStream(
  binaryData,0,length,true,true);
 formatter.Serialize(ms,data);
 ms.Flush();
 ms.Close();
 WriteBytes(binaryData,offset);
}

请注意:对象序列化之后的大小不应该超过映射视图的大小。序列化之后的大小总是比对象本身占用的内存要大的。我没有试过直接将对象内存流绑定到映射视图,那样做应该也可以,甚至可能带来少量的性能提升。

4. 信箱:在线程/进程间传递消息

这里的信箱与 Email 及 NT 中的邮件槽(Mailslots)无关。它是一个只能保留一个对象的安全共享内存结构。信箱的内容通过一个属性来读写。如果信箱内容为空,试图读取该信箱的线程将会阻塞,直到另一个线程往其中写内容。如果信箱已经有了内容,当一个线程试图往其中写内容时将被阻塞,直到另一个线程将信箱内容读取出去。信箱的内容只能被读取一次,它的引用在读取后自动被删除。基于上面的代码,我们已经可以实现信箱了。
4.1 跨线程的信箱

我们可以使用两个信号量来实现一个信箱:一个信号量在信箱内容为空时触发,另一个在信箱有内容时触发。在读取内容之前,线程先等待信箱已经填充了内容,读取之后触发空信号量。在写入内容之前,线程先等待信箱内容清空,写入之后触发满信号量。注意:空信号量在一开始时就被触发了。

public sealed class ThreadMailBox : IMailBox
{
 private object content;
 private ThreadSemaphore empty, full;

 public ThreadMailBox()
 {
  empty = new ThreadSemaphore(1,1);
  full = new ThreadSemaphore(0,1);
 }

 public object Content
 {
  get
  {
   full.Acquire();
   object item = content;
   empty.Release();
   return item;
  }
  set
  {
   empty.Acquire();
   content = value;
   full.Release();
  }
 }
}

4.2  跨进程信箱

跨进程信箱与跨线程信箱的实现基本上一样简单。不同的是我们使用两个跨进程的信号量,并且我们使用内存映射文件来代替类成员变量。由于序列化可能会失败,我们使用了一小段异常处理来回滚信箱的状态。失败的原因有很多(无效句柄,拒绝访问,文件大小问题,Serializable属性缺失等等)。

public sealed class ProcessMailBox : IMailBox, IDisposable
{
 private MemoryMappedFile file;
 private MemoryMappedFileView view;
 private ProcessSemaphore empty, full;

 public ProcessMailBox(string name,int size)
 {
  empty = new ProcessSemaphore(name+".EmptySemaphore.MailBox",1,1);
  full = new ProcessSemaphore(name+".FullSemaphore.MailBox",0,1);
  file = MemoryMappedFile.CreateFile(name+".MemoryMappedFile.MailBox",
   MemoryMappedFile.FileAccess.ReadWrite,size);
  view = file.CreateView(0,size,
   MemoryMappedFileView.ViewAccess.ReadWrite);
 }

 public object Content
 {
  get
  {
   full.Acquire();
   object item;
   try {item = view.ReadDeserialize();}
   catch(Exception e)
   { //Rollback
    full.Release();
    throw e;
   }
   empty.Release();
   return item;
  }

  set
  {
   empty.Acquire();
   try {view.WriteSerialize(value);}
   catch(Exception e)
   { //Rollback
    empty.Release();
    throw e;
   }
   full.Release();
  }
 }

 #region IDisposable Member
 public void Dispose()
 {
  view.Dispose();
  file.Dispose();
  empty.Dispose();
  full.Dispose();
 }
 #endregion
}

到这里我们已经实现了跨进程消息传递(IPC)所需要的组件。你可能需要再回头本文开头的那个例子,看看 ProcessMailBox 应该如何使用。

5.通道:基于队列的消息传递

信箱最大的限制是它们每次只能保存一个对象。如果一系列线程(使用同一个信箱)中的一个线程需要比较长的时间来处理特定的命令,那么整个系列都会阻塞。通常我们会使用缓冲的消息通道来处理,这样你可以在方便的时候从中读取消息,而不会阻塞消息发送者。这种缓冲通过通道来实现,这里的通道比信箱要复杂一些。同样,我们将分别从线程和进程级别来讨论通道的实现。
5.1 可靠性

信箱和通道的另一个重要的不同是:通道拥有可靠性。例如:自动将发送失败(可能由于线程等待锁的过程中被中断)的消息转存到一个内置的容器中。这意味着处理通道的线程可以安全地停止,同时不会丢失队列中的消息。这通过两个抽象类来实现, ThreadReliability 和 ProcessReliability。每个通道的实现类都继承其中的一个类。
5.2 跨线程的通道

跨线程的通道基于信箱来实现,但是使用一个同步的队列来作为消息缓冲而不是一个变量。得益于信号量,通道在空队列时阻塞接收线程,在队列满时阻塞发送线程。这样你就不会碰到由入队/出队引发的错误。为了实现这个效果,我们用队列大小来初始化空信号量,用0来初始化满信号量。如果某个发送线程在等待入队的时候被中断,我们将消息复制到内置容器中,并将异常往外面抛。在接收操作中,我们不需要做异常处理,因为即使线程被中断你也不会丢失任何消息。注意:线程只有在阻塞状态才能被中断,就像调用信号量的获取操作(Aquire)方法时。

public sealed class ThreadChannel : ThreadReliability, IChannel
{
 private Queue queue;
 private ThreadSemaphore empty, full;

 public ThreadChannel(int size)
 {
  queue = Queue.Synchronized(new Queue(size));
  empty = new ThreadSemaphore(size,size);
  full = new ThreadSemaphore(0,size);
 }

 public void Send(object item)
 {
  try {empty.Acquire();}
  catch(System.Threading.ThreadInterruptedException e)
  {
   DumpItem(item);
   throw e;
  }
  queue.Enqueue(item);
  full.Release();
 }

 public void Send(object item, TimeSpan timeout)
 {
  try {empty.Acquire(timeout);}
  ...
 }

 public object Receive()
 {
  full.Acquire();
  object item = queue.Dequeue();
  empty.Release();
  return item;
 }

 public object Receive(TimeSpan timeout)
 {
  full.Acquire(timeout);
  ...
 }

 protected override void DumpStructure()
 {
  lock(queue.SyncRoot)
  {
   foreach(object item in queue)
    DumpItem(item);
   queue.Clear();
  }
 }
}

5.3 跨进程通道

实现跨进程通道有点麻烦,因为你需要首先提供一个跨进程的缓冲区。一个可能的解决方法是使用跨进程信箱并根据需要将接收/发送方法加入队列。为了避免这种方案的几个缺点,我们将直接使用内存映射文件来实现一个队列。MemoryMappedArray 类将内存映射文件分成几部分,可以直接使用数组索引来访问。 MemoryMappedQueue 类,为这个数组提供了一个经典的环(更多细节请查看附件中的代码)。为了支持直接以 byte/integer 类型访问数据并同时支持二进制序列化,调用方需要先调用入队(Enqueue)/出队(Dequeue)操作,然后根据需要使用读写方法(队列会自动将数据放到正确的位置)。这两个类都不是线程和进程安全的,所以我们需要使用跨进程的信号量来模拟互斥量(也可以使用 win32 互斥量),以此实现相互间的互斥访问。除了这两个类,跨进程的通道基本上和跨线程信箱一样。同样,我们也需要在 Send() 中处理线程中断及序列化可能失败的问题。

public sealed class ProcessChannel : ProcessReliability, IChannel, IDisposable
{
 private MemoryMappedFile file;
 private MemoryMappedFileView view;
 private MemoryMappedQueue queue;
 private ProcessSemaphore empty, full, mutex;

 public ProcessChannel( int size, string name, int maxBytesPerEntry)
 {
  int fileSize = 64+size*maxBytesPerEntry;

  empty = new ProcessSemaphore(name+".EmptySemaphore.Channel",size,size);
  full = new ProcessSemaphore(name+".FullSemaphore.Channel",0,size);
  mutex = new ProcessSemaphore(name+".MutexSemaphore.Channel",1,1);
  file = MemoryMappedFile.CreateFile(name+".MemoryMappedFile.Channel",
   MemoryMappedFile.FileAccess.ReadWrite,fileSize);
  view = file.CreateView(0,fileSize,
   MemoryMappedFileView.ViewAccess.ReadWrite);
  queue = new MemoryMappedQueue(view,size,maxBytesPerEntry,true,0);
  if(queue.Length < size || queue.BytesPerEntry < maxBytesPerEntry)
   throw new MemoryMappedArrayFailedException();
 }

 public void Send(object item)
 {
  try {empty.Acquire();}
  catch(System.Threading.ThreadInterruptedException e)
  {
   DumpItemSynchronized(item);
   throw e;
  }
  try {mutex.Acquire();}
  catch(System.Threading.ThreadInterruptedException e)
  {
   DumpItemSynchronized(item);
   empty.Release();
   throw e;
  }
  queue.Enqueue();
  try {queue.WriteSerialize(item,0);}
  catch(Exception e)
  {
   queue.RollbackEnqueue();
   mutex.Release();
   empty.Release();
   throw e;
  }
  mutex.Release();
  full.Release();
 }

 public void Send(object item, TimeSpan timeout)
 {
  try {empty.Acquire(timeout);}
  ...
 }

 public object Receive()
 {
  full.Acquire();
  mutex.Acquire();
  object item;
  queue.Dequeue();
  try {item = queue.ReadDeserialize(0);}
  catch(Exception e)
  {
   queue.RollbackDequeue();
   mutex.Release();
   full.Release();
   throw e;
  }
  mutex.Release();
  empty.Release();
  return item;
 }

 public object Receive(TimeSpan timeout)
 {
  full.Acquire(timeout);
  ...
 }

 protected override void DumpStructure()
 {
  mutex.Acquire();
  byte[][] dmp = queue.DumpClearAll();
  for(int i=0;i<dmp.Length;i++)
   DumpItemSynchronized(dmp[i]);
  mutex.Release();
 }

 #region IDisposable Member
 public void Dispose()
 {
  view.Dispose();
  file.Dispose();
  empty.Dispose();
  full.Dispose();
  mutex.Dispose();
 }
 #endregion
}

6. 消息路由

我们目前已经实现了线程和进程同步及消息传递机制(使用信箱和通道)。当你使用阻塞队列的时候,有可能会遇到这样的问题:你需要在一个线程中同时监听多个队列。为了解决这样的问题,我们提供了一些小型的类:通道转发器,多用复用器,多路复用解码器和通道事件网关。你也可以通过简单的 IRunnable 模式来实现类似的通道处理器。IRunnable模式由两个抽象类SingleRunnable和 MultiRunnable 来提供(具体细节请参考附件中的代码)。
6.1 通道转发器

通道转发器仅仅监听一个通道,然后将收到的消息转发到另一个通道。如果有必要,转发器可以将每个收到的消息放到一个信封中,并加上一个数字标记,然后再转发出去(下面的多路利用器使用了这个特性)。

public class ChannelForwarder : SingleRunnable
{
 private IChannel source, target;
 private readonly int envelope;

 public ChannelForwarder(IChannel source,
  IChannel target, bool autoStart, bool waitOnStop)
  : base(true,autoStart,waitOnStop)
 {
  this.source = source;
  this.target = target;
  this.envelope = -1;
 }
 public ChannelForwarder(IChannel source, IChannel target,
  int envelope, bool autoStart, bool waitOnStop)
  : base(true,autoStart,waitOnStop)
 {
  this.source = source;
  this.target = target;
  this.envelope = envelope;
 }

 protected override void Run()
 { //NOTE: IChannel.Send is interrupt save and
   //automatically dumps the argument.
  if(envelope == -1)
   while(running)
    target.Send(source.Receive());
  else
  {
   MessageEnvelope env;
   env.ID = envelope;
   while(running)
   {
    env.Message = source.Receive();
    target.Send(env);
   }
  }
 }
}

6.2 通道多路复用器和通道复用解码器

通道多路复用器监听多个来源的通道并将接收到的消息(消息使用信封来标记来源消息)转发到一个公共的输出通道。这样就可以一次性地监听多个通道。复用解码器则是监听一个公共的输出通道,然后根据信封将消息转发到某个指定的输出通道。

public class ChannelMultiplexer : MultiRunnable
{
 private ChannelForwarder[] forwarders;

 public ChannelMultiplexer(IChannel[] channels, int[] ids,
  IChannel output, bool autoStart, bool waitOnStop)
 {
  int count = channels.Length;
  if(count != ids.Length)
   throw new ArgumentException("Channel and ID count mismatch.","ids");

  forwarders = new ChannelForwarder[count];
  for(int i=0;i<count;i++)
   forwarders[i] = new ChannelForwarder(channels[i],
    output,ids[i],autoStart,waitOnStop);

  SetRunnables((SingleRunnable[])forwarders);
 }
}

public class ChannelDemultiplexer : SingleRunnable
{
 private HybridDictionary dictionary;
 private IChannel input;

 public ChannelDemultiplexer(IChannel[] channels, int[] ids,
  IChannel input, bool autoStart, bool waitOnStop)
  : base(true,autoStart,waitOnStop)
 {
  this.input = input;

  int count = channels.Length;
  if(count != ids.Length)
   throw new ArgumentException("Channel and ID count mismatch.","ids");

  dictionary = new HybridDictionary(count,true);
  for(int i=0;i<count;i++)
   dictionary.add(ids[i],channels[i]);
 }

 protected override void Run()
 { //NOTE: IChannel.Send is interrupt save and
   //automatically dumps the argument.
  while(running)
  {
   MessageEnvelope env = (MessageEnvelope)input.Receive();
   IChannel channel = (IChannel)dictionary[env.ID];
   channel.send(env.Message);
  }
 }
}

6.3 通道事件网关

通道事件网关监听指定的通道,在接收到消息时触发一个事件。这个类对于基于事件的程序(例如GUI程序)很有用,或者在使用系统线程池(ThreadPool)来初始化轻量的线程。需要注意的是:使用 WinForms 的程序中你不能在事件处理方法中直接访问UI控件,只能调用Invoke 方法。因为事件处理方法是由事件网关线程调用的,而不是UI线程。

public class ChannelEventGateway : SingleRunnable
{
 private IChannel source;
 public event MessageReceivedEventHandler MessageReceived;

 public ChannelEventGateway(IChannel source, bool autoStart,
  bool waitOnStop) : base(true,autoStart,waitOnStop)
 {
  this.source = source;
 }

 protected override void Run()
 {
  while(running)
  {
   object c = source.Receive();
   MessageReceivedEventHandler handler = MessageReceived;
   if(handler != null)
    handler(this,new MessageReceivedEventArgs(c));
  }
 }
}

7. 比萨外卖店的例子

万事俱备,只欠东风。我们已经讨论了这个同步及消息传递框架中的大部分重要的结构和技术(本文没有讨论框架中的其他类如Rendezvous及Barrier)。就像开头一样,我们用一个例子来结束这篇文章。这次我们用一个小型比萨外卖店来做演示。下图展示了这个例子:四个并行进程相互之间进行通讯。图中展示了消息(数据)是如何使用跨进程通道在四个进程中流动的,且在每个进程中使用了性能更佳的跨线程通道和信箱。

一开始,一个顾客点了一个比萨和一些饮料。他调用了顾客(customer)接口的方法,向顾客订单(CustomerOrders)通道发送了一个下单(Order)消息。接单员,在顾客下单后,发送了两条配餐指令(分别对应比萨和饮料)到厨师指令(CookInstruction)通道。同时他通过收银(CashierOrder)通道将订单转发给收银台。收银台从价格中心获取总价并将票据发给顾客,希望能提高收银的速度 。与此同时,厨师将根据配餐指令将餐配好之后交给打包员工。打包员工处理好之后,等待顾客付款,然后将外卖递给顾客。

为了运行这个例子,打开4个终端(cmd.exe),用 "PizzaDemo.exe cook" 启动多个厨师进程(多少个都可以),用 "PizzaDemo.exe backend" 启动后端进程,用 "PizzaDemo.exe facade" 启动顾客接口门面(用你的程序名称来代替 PizzaDemo )。注意:为了模拟真实情景,某些线程(例如厨师线程)会随机休眠几秒。按下回车键就会停止和退出进程。如果你在进程正在处理数据的时候退出,你将可以在内存转存报告的结尾看到几个未处理的消息。在真实世界的程序里面,消息一般都会被转存到磁盘中,以便下次可以使用。

这个例子使用了上文中讨论过的几个机制。比如说,收银台使用一个通道复用器(ChannelMultiplexer)来监听顾客的订单和支付通道,用了两个信箱来实现价格服务。分发时使用了一个通道事件网关(ChannelEventGateway),顾客在食物打包完成之后马上会收到通知。你也可以将这些程序注册成 Windows NT 服务运行,也可以远程登录后运行。

8. 总结

本文已经讨论了C#中如何基于服务的架构及实现跨进程同步和通讯。然后,这个不是唯一的解决方案。例如:在大项目中使用那么多的线程会引来严重的问题。这个框架中缺失的是事务支持及其他的通道/信箱实现(例如命名管道和TCP sockets)。这个框架中可能也有许多不足之处。

(0)

相关推荐

  • C#程序窗体间使用回调事件方式通讯示例

    Form2: 复制代码 代码如下: //定义一个需要string类型参数的委托         publicdelegate void MyDelegate(string text);         public partial class Form2 :Form1         {                //定义该委托的事件             public event MyDelegate MyEvent;             public Form2(string te

  • C#编写ActiveX网页截图控件

    故事背景:Java组的小伙伴需要一个能在IE(还是6...)下截图并返回给网页的功能,但是IE做起来很麻烦(可能根本做不到),于是找到我写一个ActiveX控件实现此功能,想着可能还有其他小伙伴需要这个功能,于是就PO出来,供需要的人使用,当然也可以作为学习C#编写ActiveX的一个简单入门教程(VC++效果更好). 功能截图如下: 代码分为两个核心部分:1.C#屏幕截图:2.C#开发ActivX控件.  1.屏幕截图,这个在网上找到了一个只需要5行代码的实现(超级精简),当然你也可以费点功夫

  • 使用C#开发Socket通讯的方法

    下面的示例显示如何使用 Socket 类向 HTTP 服务器发送数据和接收响应. [C#]  public string DoSocketGet(string server)  {  //Sets up variables and a string to write to the server  Encoding ASCII = Encoding.ASCII;  string Get = "GET / HTTP/1.1\r\nHost: " + server +  "\r\n

  • C#用Activex实现Web客户端读取RFID功能的代码

    由于要在Web项目中采用RFID读取功能,所以有必要开发Activex,一般情况下开发Activex都采用VC,VB等,但对这两块不是很熟悉,所以采用C#编写Activex的方式实现. 本文方法参考网络 1.编写WindowsFromControls 2.发布WindowsFormControls为Activex 3.在web中使用该Activex 首先编写windows控件 如何编写不再详述(注意一个地方,GUID自己用vs工具生成一个,下面会用到.我的0CBD6597-3953-4B88-8

  • ActiveMQ在C#中的应用示例分析

    本文实例讲述了ActiveMQ在C#中的应用.分享给大家供大家参考,具体如下: ActiveMQ是个好东东,不必多说.ActiveMQ提供多种语言支持,如Java, C, C++, C#, Ruby, Perl, Python, PHP等.由于我在windows下开发GUI,比较关心C++和C#,其中C#的ActiveMQ很简单,Apache提供NMS(.Net Messaging Service)支持.Net开发,只需如下几个步骤即能建立简单的实现.C++的应用相对麻烦些,后面会有文章介绍.

  • C#窗体间通讯处理的几种方法总结

    最近做项目遇到导出Excel的问题总结一下:看代码: 复制代码 代码如下: /// <summary>        /// 生成Excel的方法        /// </summary>        /// <param name="ds">DataSet</param>        /// <param name="url">Excel存在服务器的相对地址</param>       

  • 利用thrift实现js与C#通讯的实例代码

    1.为什么要用thrift js C#? 1.1 首先,js 通过 thrift 访问C#,实际上是一种c/s模式.thrift是通信工具,js是客户端,C#是服务端. 1.2 使用js直接与thrift server通信.让web开发变得更简单.如果使用Web Service,你需要自己去实现C/S两端的序列化与反序列化操作,还需要自行处理异常,降低了开发效率.而thrift则会自动生成两端的操作类,你只需要处理方法内部的逻辑即可. 1.3 js直接与thrift server通信,可以提高性

  • C#实现同Active MQ通讯的方法

    本文实例讲述了C#实现同Active MQ通讯的方法.分享给大家供大家参考,具体如下: 内容概要: 主要以源码的形式介绍如何用C#实现同Active MQ 的通讯.本文假设你已经正确安装JDK1.6.x,了解Active MQ并有一定的编程基础. 正文: JMS 程序的最终目的是生产和消费的消息能被其他程序使用,JMS 的 Message 是一个既简单又不乏灵活性的基本格式,允许创建不同平台上符合非JMS 程序格式的消息. Message 由消息头,属性和消息体三部份组成. Active MQ支

  • C#窗体间通讯的几种常用处理方法总结

    在进行C#应用程序开发的过程中,经常需要多窗体之间进行数据通信,本文举几个例子,把几种常用的通信方式总结一下,窗体界面如下图所示: 主窗体Form1是一个ListBox,单击选中某列时,弹出窗体Form2,Form2中两个控件,一个是TextBox,显示选中的该列的文本,另一个是按钮,点击时将修改后的值回传,且在Form1中修改相应的列的文本,同时Form2关闭. 方法一:传值 最先想到的,Form2构造函数中接收一个string类型参数,即Form1中选中行的文本,将Form2的TextBox

  • 一个进程间通讯同步的C#框架引荐

     0.背景简介 微软在 .NET 框架中提供了多种实用的线程同步手段,其中包括 monitor 类及 reader-writer锁.但跨进程的同步方法还是非常欠缺.另外,目前也没有方便的线程间及进程间传递消息的方法.例如C/S和SOA,又或者生产者/消费者模式中就常常需要传递消息.为此我编写了一个独立完整的框架,实现了跨线程和跨进程的同步和通讯.这框架内包含了信号量,信箱,内存映射文件,阻塞通道,及简单消息流控制器等组件.这篇文章里提到的类同属于一个开源的库项目(BSD许可),你可以从这里下载到

  • PHP中实现进程间通讯

    PHP中实现进程间通讯 邱文宇 本文将讨论在PHP4环境下如何使用进程间通讯机制--IPC(Inter-Process-Communication).本文讨论的软件环境是linux+php4.0.4或更高版本.首先,我们假设你已经装好了PHP4和UNIX, 为了使得php4可以使用共享内存和信号量,必须在编译php4程序时激活shmop和sysvsem这两个扩展模块. 实现方法:在PHP设定(configure)时加入如下选项. --enable-shmop --enable-sysvsem  

  • C语言中进程间通讯的方式详解

    目录 一.无名管道 1.1无名管道的原理 1.2功能 1.3无名管道通信特点 1.4无名管道的实例 二.有名管道 2.1有名管道的原理 2.2有名管道的特点 2.3有名管道实例 三.信号 3.1信号的概念 3.2发送信号的函数 3.3常用的信号 3.4实例 四.IPC进程间通信 4.1IPC进程间通信的种类 4.2查看IPC进程间通信的命令 4.3消息队列 4.4共享内存 4.5信号灯集合 一.无名管道 1.1无名管道的原理 无名管道只能用于亲缘间进程的通信,无名管道的大小是64K.无名管道是内

  • Python进程间通讯与进程池超详细讲解

    目录 进程间通讯 队列Queue 管道Pipe 进程池Pool 在<多进程并发与同步>中介绍了进程创建与信息共享,除此之外python还提供了更方便的进程间通讯方式. 进程间通讯 multiprocessing中提供了Pipe(一对一)和Queue(多对多)用于进程间通讯. 队列Queue 队列是一个可用于进程间共享的Queue(内部使用pipe与锁),其接口与普通队列类似: put(obj[, block[, timeout]]):插入数据到队列(默认阻塞,且没有超时时间): 若设定了超时且

  • php进程间通讯实例分析

    本文实例讲述了php进程间通讯的方法.分享给大家供大家参考,具体如下: php单进程单线程处理批量任务太慢了,受不鸟了,但是php不能多线程,最终选择了多进程处理批量任务. php多进程主要使用for进行分裂,然后利用的unix/linux的信号量进行进程间通讯. 本例使用的是:生产者=>消费者=>收集器,的模式. <?php // ===== 全局变量 ===== // ipc进程间通讯 $key = ftok(__FILE__, "a"); $queue = ms

  • Android Studio创建AIDL文件并实现进程间通讯实例

    在Android系统中,跨进程通信是非常普遍的事情,它用到了Binder机制处理进程之间的交互.Binder机制会开放一些接口给Java层,供android开发工程师调用进程之间通信.这些接口android封装到了AIDL文件里,当我们项目用到跨进程通信时可以创建.aidl文件,.aidl文件可以协助我们达到跨进程的通信.下面简单介绍用AndroidStudio创建AIDL文件的过程. a.新建AIDL文件 1.项目文件夹右键---> new --->选择AIDL 2.自定义一个接口名称 3.

  • Python使用文件锁实现进程间同步功能【基于fcntl模块】

    本文实例讲述了Python使用文件锁实现进程间同步功能.分享给大家供大家参考,具体如下: 简介 在实际应用中,会出现这种应用场景:希望shell下执行的脚本对某些竞争资源提供保护,避免出现冲突.本文将通过fcntl模块的文件整体上锁机制来实现这种进程间同步功能. fcntl系统函数介绍 Linux系统提供了文件整体上锁(flock)和更细粒度的记录上锁(fcntl)功能,底层功能均可由fcntl函数实现. 首先来了解记录上锁.记录上锁是读写锁的一种扩展类型,它可用于有亲缘关系或无亲缘关系的进程间

  • JAVA多线程间通讯常用实现方法解析

    如何实现线程间通讯,有如下三种方法: 1.使用Semaphore (信号量)类来控制线程的等待和释放 功能:三个线程 a .b .c 并发运行,b,c 需要 a 线程的数据怎么实现 分析:考虑到多线程的不确定性, 因此我们不能确保 ThreadA 就一定先于 ThreadB 和 ThreadC 前执行,就算 ThreadA先执行了, 我们也无法保证 ThreadA 什么时候才能将变量 num 给初始化完成. 因此我们必须让 ThreadB 和 Thread去等待 ThreadA 完成任何后发出的

  • python 进程间数据共享multiProcess.Manger实现解析

    一.进程之间的数据共享 展望未来,基于消息传递的并发编程是大势所趋 即便是使用线程,推荐做法也是将程序设计为大量独立的线程集合,通过消息队列交换数据. 这样极大地减少了对使用锁定和其他同步手段的需求,还可以扩展到分布式系统中. 但进程间应该尽量避免通信,即便需要通信,也应该选择进程安全的工具来避免加锁带来的问题. 以后我们会尝试使用数据库来解决现在进程之间的数据共享问题. 1.1 Manager模块介绍 虽然进程间数据独立,但可以通过Manager实现数据共享,事实上Manager的功能远不止于

  • c#中多线程间的同步示例详解

    目录 一.引入 二.Lock 三.Monitor 四.Interlocked 五.Semaphore 六.Event 七.Barrier 八.ReaderWriterLockSlim 九.Mutex 十.ThreadLocal ,AsyncLocal,Volatile 十一.有意思的示例 总结 一.引入 先给出一个Num类的定义 internal class Num { public static int odd = 50000; public static int even = 10000;

随机推荐