进程间通信之深入消息队列的详解

最近在Hi3515上调试Qt与DVR程序,发现他们之间使用消息队列通信的,闲暇之余,就总结了一下消息队列,呵呵,自认为通俗易懂,同时,在应用中也发现了消息队列的强大之处。

关于线程的管理(互斥量和条件变量)见:Linux线程管理必备:解析互斥量与条件变量的详解

一、消息队列的特点

1.消息队列是消息的链表,具有特定的格式,存放在内存中并由消息队列标识符标识.
    2.消息队列允许一个或多个进程向它写入与读取消息.
    3.管道和命名管道都是通信数据都是先进先出的原则。
    4.消息队列可以实现消息的随机查询,消息不一定要以先进先出的次序读取,也可以按消息的类型读取.比FIFO更有优势。

    目前主要有两种类型的消息队列:POSIX消息队列以及系统V消息队列,系统V消息队列目前被大量使用。系统V消息队列是随内核持续的,只有在内核重起或者人工删除时,该消息队列才会被删除。

二、相关函数

1. 获得key值

    key_t ftok(char *pathname, int projid)

#include <sys/types.h>
#include <sys/ipc.h>
参数:
    pathname:文件名(含路径),通常设置为当前目录“.” 比如projid为'a',则为"./a"文件
    projid:项目ID,必须为非0整数(0-255).

2. 创建消息队列
    int msgget(key_t key, int msgflag)

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
功能:
    用于创建一个新的或打开一个已经存在的消息队列,此消息队列与key相对应。
参数:
    key:函数ftok的返回值或IPC_PRIVATE。
    msgflag:
        IPC_CREAT:创建新的消息队列。
        IPC_EXCL:与IPC_CREAT一同使用,表示如果要创建的消息队列已经存在,则返回错误。
        IPC_NOWAIT:读写消息队列要求无法满足时,不阻塞。
返回值:
    调用成功返回队列标识符,否则返回-1.

在以下两种情况下,将创建一个新的消息队列:
    1、如果没有与键值key相对应的消息队列,并且msgflag中包含了IPC_CREAT标志位。
    2、key参数为IPC_PRIVATE。

3. 消息队列属性控制
    int msgctl(int msqid,  int cmd,  struct msqid_ds *buf)
功能:

 对消息队列进行各种控制操作,操作的动作由cmd控制。
参数:
    msqid:消息队列ID,消息队列标识符,该值为msgget创建消息队列的返回值。
    cmd:
        IPC_STAT:将msqid相关的数据结构中各个元素的当前值存入到由buf指向的结构中.
        IPC_SET:将msqid相关的数据结构中的元素设置为由buf指向的结构中的对应值.
        IPC_RMID:删除由msqid指示的消息队列,将它从系统中删除并破坏相关数据结构.
buf:消息队列缓冲区
     struct msqid_ds {
               struct ipc_perm msg_perm;          /* Ownership and permissions*/
               time_t         msg_stime;                 /* Time of last msgsnd() */
               time_t         msg_rtime;                  /* Time of last msgrcv() */
               time_t         msg_ctime;                 /* Time of last change */
               unsigned long  __msg_cbytes;    /* Current number of bytes in  queue (non-standard) */
               msgqnum_t      msg_qnum;          /* Current number of messages  in queue */
               msglen_t       msg_qbytes;           /* Maximum number of bytesallowed in queue */
               pid_t          msg_lspid;                  /* PID of last msgsnd() */
               pid_t          msg_lrpid;                  /* PID of last msgrcv() */
                     };

  struct ipc_perm {
               key_t key;                        /* Key supplied to msgget() */
               uid_t uid;                         /* Effective UID of owner */
               gid_t gid;                        /* Effective GID of owner */
               uid_t cuid;                       /* Effective UID of creator */
               gid_t cgid;                      /* Effective GID of creator */
               unsigned short mode;    /* Permissions */
               unsigned short seq;       /* Sequence number */
                   };

4.发送信息到消息队列
    int msgsnd(int msqid,  struct msgbuf *msgp,  size_t msgsz,  int msgflag)
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
功能:

将新消息添加到队列尾端,即向消息队列中发送一条消息。
参数:
    msqid:已打开的消息队列id
    msgp:存放消息的结构体指针。
    msgflag:函数的控制属性。
    消息结构msgbuf为:
    struct msgbuf
    {
         long mtype;//消息类型
         char mtext[1];//消息正文,消息数据的首地址,这个数据的最大长度为8012吧,又可把他看成是一个结构,也有类型和数据,recv时解析即可。
    }
    msgsz:消息数据的长度。
    msgflag:
         IPC_NOWAIT: 指明在消息队列没有足够空间容纳要发送的消息时,msgsnd立即返回。
         0:msgsnd调用阻塞直到条件满足为止.(一般选这个)

5. 从消息队列接收信息
    ssize_t msgrcv(int msqid,  struct msgbuf *msgp,  size_t msgsz,  long msgtype,  int msgflag)
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
功能:

从队列中接收消息
参数:
    msqid:已打开的消息队列id
    msgp:存放消息的结构体指针。msgp->mtype与第四个参数是相同的。
    msgsz:消息的字节数,指定mtext的大小。
    msgtype:消息类型,消息类型 mtype的值。如果为0,则接受该队列中的第一条信息,如果小于0,则接受小于该值的绝对值的消息类型,如果大于0,接受指定类型的消息,即该值消息。
    msgflag:函数的控制属性。
    msgflag:
        MSG_NOERROR:若返回的消息比nbytes字节多,则消息就会截短到nbytes字节,且不通知消息发送进程.
        IPC_NOWAIT:调用进程会立即返回.若没有收到消息则返回-1.
        0:msgrcv调用阻塞直到条件满足为止.
在成功地读取了一条消息以后,队列中的这条消息将被删除。

三、相关例子

例1:消息队列之简单收发测试
#include <sys/types.h>
#include <sys/msg.h>
#include <unistd.h>
#include <string.h>
#include <stdio.h>

struct msg_buf{
     int mtype;//消息类型
     char data[255];//数据
};

int main(int argc,  char *argv[])
{
      key_t key;
      int msgid;
      int ret;
      struct msg_buf msgbuf;
      //获取key值
      key = ftok(".",  'a');
      printf("key = [%x]\n",  key);

 //创建消息队列
      msgid = msgget(key,  IPC_CREAT|0666);/*通过文件对应*/
      if(msgid == -1)
      {
            printf("creat error\n");
            return -1;
      }

   //以当前进程类型,非阻塞方式发送"test data"到消息队列
      msgbuf.mtype = getpid();
      strcpy(msgbuf.data,  "test data");
      ret = msgsnd(msgid,  &msgbuf,  sizeof(msgbuf.data),  IPC_NOWAIT);
      if(ret == -1)
      {
            printf("send message err\n");
            return -1;
      }

  //以非阻塞方式接收数据
      memset(&msgbuf,  0,  sizeof(msgbuf));
      ret = msgrcv(msgid,  &msgbuf,  sizeof(msgbuf.data), getpid(),  IPC_NOWAIT);
      if(ret == -1)
      {
            printf("receive message err\n");
            return -1;
      }
      printf("receive msg = [%s]\n",  msgbuf.data);
      return 0;
}
例2:进程间消息队列通信
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/ipc.h>
#include <string.h>
#include <signal.h>

struct msgbuf{
      int mtype;
      char mtext[100];
};

int main(void)
{
      key_t key;
      pid_t pid;
      int msgid;
      struct msgbuf msg;

 key=ftok(".", 0x01);
      if ( (msgid = msgget(key,  IPC_CREAT|0666)) <0 )
      {
            perror("msgget error");
            exit(1);
      }

 //创建一个进程
      if ( (pid = fork()) < 0 )
      {
            perror("fork error");
            exit(1);
      }
      //子进程收信息
      else if (pid==0)
      {
            while(1)
            {
                  memset(msg.mtext, 0, 100);
                  msgrcv(msgid, &msg, 100, 2, 0);  //receive the msg from 2
                  printf("\receive:%s\n:", msg.mtext);
                  fflush(stdout);
            }
            exit(0);
      }
      //父进程发信息
      else
      {
            while(1)
            {
                  memset(msg.mtext, 0, 100);
                  printf("father:");
                  fgets(msg.mtext, 100, stdin);
                  if (strncmp("bye", msg.mtext, 3)==0)//如果前3个字符为bye,则退出
                  {
                        kill(pid, SIGSTOP);
                        exit(1);
                  }
                  msg.mtype=1;//send to 1
                  msg.mtext[strlen(msg.mtext)-1]='\0';
                  msgsnd(msgid, &msg, strlen(msg.mtext)+1, 0);
            }
      }

return 0;
}

 这个程序的缺点为:有发无收,有收无发。可在这2个进程中分别创建2个线程,分别负责收和发,就完成了进程间的通信。

(0)

相关推荐

  • Python复制文件操作实例详解

    本文实例讲述了Python复制文件操作用法.分享给大家供大家参考,具体如下: 这里用python实现了一个小型的自动发版本的工具.这个"自动发版本"有点虚, 只是简单地把debug 目录下的配置文件复制到指定目录,把Release下的生成文件复制到同一指定,过滤掉不需要的文件夹(.svn),然后再往这个指定目录添加几个特定的文件. 这个是我的第一个python小程序. 下面就来看其代码的实现. 首先插入必要的库: import os import os.path import shut

  • php Memcache 中实现消息队列

    对于一个很大的消息队列,频繁进行进行大数据库的序列化 和 反序列化,有太耗费.下面是我用PHP 实现的一个消息队列,只需要在尾部插入一个数据,就操作尾部,不用操作整个消息队列进行读取,与操作.但是,这个消息队列不是线程安全的,我只是尽量的避免了冲突的可能性.如果消息不是非常的密集,比如几秒钟才一个,还是可以考虑这样使用的. 如果你要实现线程安全的,一个建议是通过文件进行锁定,然后进行操作.下面是代码: 复制代码 代码如下: class Memcache_Queue { private $memc

  • 利用Python学习RabbitMQ消息队列

    RabbitMQ可以当做一个消息代理,它的核心原理非常简单:即接收和发送消息,可以把它想象成一个邮局:我们把信件放入邮箱,邮递员就会把信件投递到你的收件人处,RabbitMQ就是一个邮箱.邮局.投递员功能综合体,整个过程就是:邮箱接收信件,邮局转发信件,投递员投递信件到达收件人处. RabbitMQ和邮局的主要区别就是RabbitMQ接收.存储和发送的是二进制数据----消息. rabbitmq基本管理命令: 一步启动Erlang node和Rabbit应用:sudo rabbitmq-serv

  • PHP+memcache实现消息队列案例分享

    memche消息队列的原理就是在key上做文章,用以做一个连续的数字加上前缀记录序列化以后消息或者日志.然后通过定时程序将内容落地到文件或者数据库. php实现消息队列的用处比如在做发送邮件时发送大量邮件很费时间的问题,那么可以采取队列.方便实现队列的轻量级队列服务器是:starling支持memcache协议的轻量级持久化服务器https://github.com/starling/starlingBeanstalkd轻量.高效,支持持久化,每秒可处理3000左右的队列http://kr.gi

  • android开发教程之使用looper处理消息队列

    复制代码 代码如下: package com.yanjun; import android.app.Activity; import android.os.Bundle; import android.os.Handler; import android.os.HandlerThread; import android.os.Looper; import android.os.Message; public class HandlerActivity extends Activity { @Ov

  • python使用rabbitmq实现网络爬虫示例

    编写tasks.py 复制代码 代码如下: from celery import Celeryfrom tornado.httpclient import HTTPClientapp = Celery('tasks')app.config_from_object('celeryconfig')@app.taskdef get_html(url):    http_client = HTTPClient()    try:        response = http_client.fetch(u

  • PHP下操作Linux消息队列完成进程间通信的方法

    关于Linux系统进程通信的概念及实现可查看:http://www.ibm.com/developerworks/cn/linux/l-ipc/ 关于Linux系统消息队列的概念及实现可查看:http://www.ibm.com/developerworks/cn/linux/l-ipc/part4/ PHP的sysvmsg模块是对Linux系统支持的System V IPC中的System V消息队列函数族的封装.我们需要利用sysvmsg模块提供的函数来进进程间通信.先来看一段示例代码_1:

  • Python基于pygame实现的弹力球效果(附源码)

    本文实例讲述了Python基于pygame实现的弹力球效果.分享给大家供大家参考,具体如下: 运行效果: 代码部分如下: #A bouncing ball import sys, pygame __author__ = {'name' : 'Hongten', 'mail' : 'hongtenzone@foxmail.com', 'QQ' : '648719819', 'Version' : '1.0'} pygame.init() size = width, height = 600, 50

  • 进程间通信之深入消息队列的详解

    最近在Hi3515上调试Qt与DVR程序,发现他们之间使用消息队列通信的,闲暇之余,就总结了一下消息队列,呵呵,自认为通俗易懂,同时,在应用中也发现了消息队列的强大之处. 关于线程的管理(互斥量和条件变量)见:Linux线程管理必备:解析互斥量与条件变量的详解 一.消息队列的特点 1.消息队列是消息的链表,具有特定的格式,存放在内存中并由消息队列标识符标识.    2.消息队列允许一个或多个进程向它写入与读取消息.    3.管道和命名管道都是通信数据都是先进先出的原则.    4.消息队列可以

  • Android 消息队列模型详解及实例

    Android 消息队列模型详解及实例 Android系统的消息队列和消息循环都是针对具体线程的,一个线程可以存在(当然也可以不存在)一个消息队列(Message Queue)和一个消息循环(Looper).Android中除了UI线程(主线程),创建的工作线程默认是没有消息循环和消息队列的.如果想让该线程具有消息队列和消息循环,并具有消息处理机制,就需要在线程中首先调用Looper.prepare()来创建消息队列,然后调用Looper.loop()进入消息循环.如以下代码所示: class

  • .net msmq消息队列实例详解

    本文为大家分享了.net msmq消息队列实例代码,供大家参考,具体内容如下 1.msmq消息队列windows环境安装 控制面板---->程序和功能---->启用或关闭Windows程序---->Microsoft Message Queue(MSMQ)服务器 选中如图所示功能点击"确认"进行安装,安装好后可在 "计算机管理"中进行查看 2.创建消息队列实体对象 /// <summary> /// 消息实体 /// </summ

  • python分布式爬虫中消息队列知识点详解

    当排队等待人数过多的时候,我们需要设置一个等待区防止秩序混乱,同时再有新来的想要排队也可以呆在这个地方.那么在python分布式爬虫中,消息队列就相当于这样的一个区域,爬虫要进入这个区域找寻自己想要的资源,当然这个是一定的次序的,不然数据获取就会出现重复.就下来我们就python分布式爬虫中的消息队列进行详细解释,小伙伴们可以进一步了解一下. 实现分布式爬取的关键是消息队列,这个问题以消费端为视角更容易理解.你的爬虫程序部署到很多台机器上,那么他们怎么知道自己要爬什么呢?总要有一个地方存储了他们

  • RabbitMQ .NET消息队列使用详解

    本文实例为大家分享了RabbitMQ .NET消息队列使用方法,供大家参考,具体内容如下 首先下载安装包,我都环境是win7 64位: 去官网下载 otp_win64_19.0.exe  和rabbitmq-server-3.6.3.exe安装好 然后开始编程了: (1)创建生产者类: class Program { private static void Main() { //建立RabbitMQ连接和通道 var connectionFactory = new ConnectionFacto

  • windows消息和消息队列实例详解

    本文详细讲述了windows消息和消息队列的原理与应用方法.分享给大家供大家参考.具体分析如下: 与基于MS - DOS的应用程序不同,Windows的应用程序是事件(消息)驱动的.它们不会显式地调用函数(如C运行时库调用)来获取输入,而是等待windows向它们传递输入. windows系统把应用程序的输入事件传递给各个窗口,每个窗口有一个函数,称为窗口消息处理函数.窗口消息处理函数处理各种用户输入,处理完成后再将控制权交还给系统.窗口消息处理函数一般是在注册一个窗口的时候指定的.你可以从典型

  • iOS开发探索多线程GCD队列示例详解

    目录 引言 进程与线程 1.进程的定义 2.线程的定义 3. 进程和线程的关系 4. 多线程 5. 时间片 6. 线程池 GCD 1.任务 2.队列 3.死锁 总结 引言 在iOS开发过程中,绕不开网络请求.下载图片之类的耗时操作,这些操作放在主线程中处理会造成卡顿现象,所以我们都是放在子线程进行处理,处理完成后再返回到主线程进行展示. 多线程贯穿了我们整个的开发过程,iOS的多线程操作有NSThread.GCD.NSOperation,其中我们最常用的就是GCD. 进程与线程 在了解GCD之前

  • python 多进程队列数据处理详解

    我就废话不多说了,直接上代码吧! # -*- coding:utf8 -*- import paho.mqtt.client as mqtt from multiprocessing import Process, Queue import time, random, os import camera_person_num MQTTHOST = "172.19.4.4" MQTTPORT = 1883 mqttClient = mqtt.Client() q = Queue() # 连

  • Python并发编程线程消息通信机制详解

    目录 1 Event事件 2 Condition 3 Queue队列 4 总结一下 前面我已经向大家介绍了,如何使用创建线程,启动线程.相信大家都会有这样一个想法,线程无非就是创建一下,然后再start()下,实在是太简单了. 可是要知道,在真实的项目中,实际场景可要我们举的例子要复杂的多得多,不同线程的执行可能是有顺序的,或者说他们的执行是有条件的,是要受控制的.如果仅仅依靠前面学的那点浅薄的知识,是远远不够的. 那今天,我们就来探讨一下如何控制线程的触发执行. 要实现对多个线程进行控制,其实

  • Java Spring Boot消息服务万字详解分析

    目录 消息服务概述 为什么要使用消息服务 异步处理 应用解耦 流量削峰 分布式事务管理 常用消息中间件介绍 ActiveMQ RabbitMQ RocketMQ RabbitMQ消息中间件 RabbitMQ简介 RabbitMQ工作模式介绍 Work queues(工作队列模式) Public/Subscribe(发布订阅模式) Routing(路由模式) Topics(通配符模式) RPC Headers RabbitMQ安装以及整合环境搭建 安装RabbitMQ 下载RabbitMQ 安装R

随机推荐