实现posix消息队列示例分享

mqueue.h

代码如下:

//
//  mqueue.h
//  UNIX_C
//
//  Created by 周凯 on 14-2-9.
//  Copyright (c) 2014年 zk. All rights reserved.
//

#ifndef __PS_MQUEUE_H
#define __PS_MQUEUE_H

#include <unistd.h>
#include <sys/types.h>

typedef struct mq_info     *mqd_t;
typedef struct mq_attr    mq_attr;

#ifdef __cplusplus
extern "C" {
#endif

mqd_t   mq_open(const char *name, int flag, .../*mode_t mode, struct mq_attr *attr*/);
    int     mq_close(mqd_t mqdes);
    int     mq_unlink(const char *name);

int     mq_getattr(mqd_t mqdes,mq_attr *attr);
    int     mq_setattr(mqd_t mqdes,const mq_attr *attr,mq_attr *old);

int     mq_send(mqd_t mqdes,const char *ptr,size_t len,unsigned int prio);
    int     mq_receive(mqd_t mqdes,char *ptr,size_t len,unsigned int *priop);

//
    void    mq_info_test(mqd_t mqdes);

#ifdef __cplusplus
}
#endif
#endif

多进程,多线程创建同一个队列测试

代码如下:

#include <wrap_ext.h>
#include <mqueue.h>

void *create_mq(void *name){
    mqd_t mq;
    mq = mq_open("/tmp/mqfile", O_CREAT,FILE_MODE,0);

if (mq == (mqd_t) -1) {
        err_ret(errno, "mq_open() error");
        return 0;
    }

mq_info_test(mq);

mq_close(mq);

return 0;
}

int main(){

mq_unlink("/tmp/mqfile");

if (Fork() == 0) {
        create_mq("/tmp/mqfile");
        exit(0);
    }

Create_detach_thread(create_mq, "/tmp/mqfile");
    Create_detach_thread(create_mq, "/tmp/mqfile");

sleep(50);

//mq_unlink("/tmp/mqfile");

return 0;
}

测试结果

代码如下:

create,start create...
create,start init...
exists,wait get...
exists,wait get...
create,end init...
mq_hdr.mqh_free:116 bytes
msghdr size:268 bytesmap file size:3332 bytes
next msg offset and msg length:
[384,0];[652,0];[920,0];[1188,0];[1456,0];
[1724,0];[1992,0];[2260,0];[2528,0];exists,start get...
[2796,0];
[3064,0];[0,0];
end,start get...
exists,start get...
mq_hdr.mqh_free:116 bytes
msghdr size:268 bytesmap file size:3332 bytes
next msg offset and msg length:
[384,0];[652,0];[920,0];[1188,0];[1456,0];
[1724,0];[1992,0];[2260,0];[2528,0];[2796,0];
[3064,0];[0,0];
end,start get...
mq_hdr.mqh_free:116 bytes
msghdr size:268 bytesmap file size:3332 bytes
next msg offset and msg length:
[384,0];[652,0];[920,0];[1188,0];[1456,0];
[1724,0];[1992,0];[2260,0];[2528,0];[2796,0];
[3064,0];[0,0];
Program ended with exit code: 0

属性设置、获取测试

代码如下:

#include <wrap_ext.h>
#include <mqueue.h>

void print_attr(mq_attr *attr){
    assert(attr);

err_msg(" mq_attr mq_flag:0x%0x"
            " mq_curmsgs:%d"
            " mq_msgsize:%d"
            " mq_maxmsg:%d"
            ,attr->mq_flags
            ,attr->mq_curmsgs
            ,attr->mq_msgsize
            ,attr->mq_maxmsg);
}

void *create_mq(void *name){
    pthread_t tid;
    mq_attr attr,old;
    mqd_t mq;
    int flag;

flag = O_CREAT;

tid = pthread_self();

if ((long)tid % 2 != 0) {
        flag = O_NONBLOCK;
    }

mq = mq_open("/tmp/mqfile", flag | O_CREAT,FILE_MODE,0);

if (mq == (mqd_t) -1) {
        err_ret(errno, "mq_open() error");
        return 0;
    }

if ((long)tid % 2 == 0) {
        attr.mq_flags = O_NONBLOCK;
        mq_setattr(mq, &attr, &old);
    }
    else
        mq_getattr(mq, &old);

print_attr(&old);

//mq_info_test(mq);

mq_close(mq);

return 0;
}

int main(){
    pid_t pid;

mq_unlink("/tmp/mqfile");

if ((pid=Fork()) == 0) {
        create_mq("/tmp/mqfile3");
        Create_detach_thread(create_mq, "/tmp/mqfile1");
        Create_detach_thread(create_mq, "/tmp/mqfile2");
        sleep(1);
        exit(0);
    }

Create_detach_thread(create_mq, "/tmp/mqfile1");
    Create_detach_thread(create_mq, "/tmp/mqfile2");
    create_mq("/tmp/mqfile3");

wait(0);

sleep(5);

//mq_unlink("/tmp/mqfile");

return 0;
}

测试注册通知规则

代码如下:

#include <wrap_ext.h>
#include <mqueue.h>

int main(){
    pid_t pid;
    Init_wait();
    mqd_t mq;

sigevent_t sige;

mq_unlink("/tmp/mqfile");
    mq = mq_open("/tmp/mqfile", O_CREAT,FILE_MODE,0);

Signal(SIGCHLD, SIG_DFL);

if (mq == (mqd_t) -1) {
        err_sys(errno, "mq_open() error");
    }
    if ((pid=Fork()) == 0) {

if (mq_notify(mq, &sige) == -1)
            err_ret(errno, "mq_notify() error");
        Tell_parent();

Wait_parent();

End_wait();
        sleep(1);
        exit(0);
    }

Wait_child();
    /*子进程已注册,测试是否能注册、取消通知*/
    if (mq_notify(mq, 0) == -1)
        err_ret(errno, "mq_notify() error");
    if (mq_notify(mq, &sige) == -1)
        err_ret(errno, "mq_notify() error");
    Tell_child(pid);
    End_wait();

wait(0);

sleep(1);
    /*子进程已结束,测试是否能注册通知*/
    if (mq_notify(mq, &sige) == -1)
        err_ret(errno, "mq_notify() error");

//mq_unlink("/tmp/mqfile");

return 0;
}

mqueue.c

代码如下:

//
//  File.c
//  UNIX_C
//
//  Created by 周凯 on 14-2-9.
//  Copyright (c) 2014年 zk. All rights reserved.
//

#include "mqueue.h"
#include <wrap_ext.h>

#if !defined(_LINUX_)
#define va_mode_t   int
#else
#define va_mode_t   mode_t
#endif

typedef struct mq_info  mq_info;
typedef struct mq_hdr   mq_hdr;
//typedef struct mq_attr  mq_attr;
typedef struct mq_msg   mq_msg;

struct mq_hdr{
    mq_attr mqh_attr;
    long    mqh_head;
    long    mqh_free;

pthread_cond_t  mqh_conn;
    pthread_mutex_t mqh_mutex;
    sigevent_t      mqh_sigevent;
    pid_t   mqh_pid;
};

struct mq_msg{
    long    msg_next;/*从映射内存的地址起,到下一个消息的偏移值*/
    ssize_t msg_len;
    int     msg_prio;
};

struct mq_info{
    mq_hdr *mqi_hdr;
    long long   mqi_magic;
    int     mqi_flag;
};

#define MQ_MAXMSG   12
#define MQ_MSGSIZE  256
#define MQ_MAGIC    0x9235167840
/*
 防止以下情况:
    一个进程或线程以创建模式打开一个队列,
    随后CPU切换当前进程或线程到另一个正
    在打开此前创建的队列,但是该队列并未
    初始化完毕,故使用一个记录锁加一个线
    程锁,进行同步。
 注:
    该实现不是异步调用安全,即不能在信号处理函数中调用队列打开(创建)函数
 */
#define MQ_LOCK_FILE    "/tmp/mq_lock_file"
static struct mq_attr def_attr = {0,MQ_MAXMSG,MQ_MSGSIZE,0};
static pthread_once_t __mq_once = PTHREAD_ONCE_INIT;
static pthread_mutex_t __mq_lock;
static pthread_key_t __mq_key;

static void __mq_once_init();
static int  __mq_get_filelock();
static void *__mq_mmap_file(int fd,mq_attr *attr);
static int  __mq_init_mmap(void *ptr,mq_attr *attr);
static void __mq_unmap(const char *name,void *ptr);

static void __mq_once_init(){
    pthread_mutexattr_t mattr;

Pthread_mutexattr_init(&mattr);
    Pthread_mutexattr_settype(&mattr, PTHREAD_MUTEX_RECURSIVE);
    Pthread_mutex_init(&__mq_lock, &mattr);
    Pthread_mutexattr_destroy(&mattr);

Pthread_key_create(&__mq_key, 0);
}

static int  __mq_get_filelock(){
    int fd,tmp;

Pthread_mutex_lock(&__mq_lock);
    if ((fd = (int)Pthread_getspecific(__mq_key)) == 0) {
        fd = open(MQ_LOCK_FILE, O_CREAT | O_EXCL | O_WRONLY, FILE_MODE);
        if (fd == -1 && errno != EEXIST)
            err_sys(errno, "mq_open(),__mq_get_filelock() error");
        else
            fd =Open(MQ_LOCK_FILE, O_WRONLY, 0);
        if (fd == 0) {
            tmp = Open(MQ_LOCK_FILE, O_WRONLY, 0);
            close(fd);
            fd = tmp;
        }
        Pthread_setspecific(__mq_key, (void*)fd);
    }
    Pthread_mutex_unlock(&__mq_lock);

return fd;
}

static void *__mq_mmap_file(int fd,mq_attr *attr){
    size_t filesize;
    void *ptr;

if (attr == 0) {
        attr = &def_attr;
    }

if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0) {
        errno = EINVAL;
        return MAP_FAILED;
    }

filesize = sizeof(mq_hdr)+(sizeof(mq_msg)+ALIGN_VAL(attr->mq_msgsize, sizeof(long)))*attr->mq_maxmsg;

if(lseek(fd, filesize - 1, SEEK_SET)<0)
        return MAP_FAILED;
    if(write(fd,"",1)!=1)
        return MAP_FAILED;

ptr = mmap(NULL, filesize, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);

return ptr;
}

static void __mq_unmap(const char *name,void *ptr){
    size_t filesize;
    stat_t fstat;

assert(name);

if (stat(name, &fstat) == -1) {
        return;
    }

filesize = (size_t)fstat.st_size;
    unlink(name);
    if (ptr == MAP_FAILED) {
        return;
    }
    munmap(ptr, filesize);

return;
}

static int  __mq_init_mmap(void *ptr,mq_attr *attr){
    char *tmp;
    size_t index,i;
    int flag;
    mq_hdr *mqhdr;
    mq_msg *mqmsg;
    pthread_condattr_t cattr;
    pthread_mutexattr_t mattr;

assert(ptr);
    if (attr == 0) {
        attr = &def_attr;
    }

if (attr->mq_maxmsg <= 0 || attr->mq_msgsize <= 0) {
        errno = EINVAL;
        return -1;
    }

tmp = ptr;
    mqhdr = (mq_hdr*)tmp;
    mqhdr->mqh_attr.mq_flags = 0;
    mqhdr->mqh_attr.mq_curmsgs = 0;
    mqhdr->mqh_attr.mq_maxmsg = attr->mq_maxmsg;
    mqhdr->mqh_attr.mq_msgsize = attr->mq_msgsize;

flag = pthread_condattr_init(&cattr);
    if (flag) {
        errno = flag;
        return -1;
    }

flag = pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
    if (flag) {
        errno = flag;
        return -1;
    }

flag = pthread_cond_init(&mqhdr->mqh_conn, &cattr);
    if (flag) {
        errno = flag;
        return -1;
    }

flag = pthread_condattr_destroy(&cattr);
    if (flag) {
        errno = flag;
        return -1;
    }

flag = pthread_mutexattr_init(&mattr);
    if (flag) {
        errno = flag;
        return -1;
    }

flag = pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
    if (flag) {
        errno = flag;
        return -1;
    }

flag = pthread_mutex_init(&mqhdr->mqh_mutex, &mattr);
    if (flag) {
        errno = flag;
        return -1;
    }

flag = pthread_mutexattr_destroy(&mattr);
    if (flag) {
        errno = flag;
        return -1;
    }

index = mqhdr->mqh_free = sizeof(mq_hdr);
    mqmsg = (mq_msg*)(tmp+index);

for (i = 0; i < attr->mq_maxmsg - 1; i++) {
        mqmsg->msg_next = sizeof(mq_msg) + ALIGN_VAL(attr->mq_msgsize, sizeof(long)) + index;
        index = mqmsg->msg_next;
        mqmsg ++;
        //mqmsg = (mq_msg*)(tmp+index);
    }
    mqmsg->msg_next = 0;

return 0;
}

mqd_t   mq_open(const char *name,int flag,...){
    int fd, nonblock, lockfile_fd, err;
    void *ptr;
    mq_attr *mqattr;
    mqd_t mqdesc;
    stat_t filestat;

debug_assert("Invalid pointer", "mq_open()", name);

Pthread_once(&__mq_once, __mq_once_init);

nonblock = flag & O_NONBLOCK;
    mqattr = NULL;
    mqdesc = NULL;
    ptr = MAP_FAILED;
__again:
    if (flag & O_CREAT) {
        va_list vp;
        mode_t mode;

/*分析可变参数*/
        va_start(vp, flag);
        mode = va_arg(vp, va_mode_t);
        mqattr = va_arg(vp, mq_attr *);
        va_end(vp);

Pthread_mutex_lock(&__mq_lock);
        lockfile_fd = __mq_get_filelock();
        write_lock_wait(lockfile_fd, SEEK_SET, 0, 0);

fd = open(name, flag | O_CREAT | O_EXCL | O_RDWR, mode);
        if (fd < 0) {
            /*如果指定了O_EXCL,并且文件已存在,则等待其他进程或线程完成初始化*/
            if (errno == EEXIST && (flag & O_EXCL) == 1) {
                return (mqd_t)-1;
            }
            goto __exists_wait_init;
        }
        /*初始化内存映射文件*/

err_msg("create,start init...");
        /*初始化映射文件大小(注意必须使文件长度达到映射的大小),且映射文件到内存*/
        ptr = __mq_mmap_file(fd, mqattr);
        //sleep(1);
        if (ptr == MAP_FAILED) {
            goto __err;
        }

/*初始化映射内存的内容*/
        if (__mq_init_mmap(ptr, mqattr) < 0) {
            goto __err;
        }

mqdesc = (mqd_t)calloc(1, sizeof(mq_hdr));
        if (mqdesc == 0) {
            goto __err;
        }

mqdesc->mqi_hdr = (mq_hdr*)ptr;
        mqdesc->mqi_flag = nonblock;
        mqdesc->mqi_magic = MQ_MAGIC;

err_msg("create,end init...");

file_unlock(lockfile_fd, SEEK_SET, 0, 0);
        Pthread_mutex_unlock(&__mq_lock);

return mqdesc;
    }
__exists_wait_init:
    fd = open(name, O_RDWR, 0);
    if (fd < 0 ) {
        if (errno == ENOENT && (flag & O_CREAT)) {
            goto __again;
        }
        goto __err;
    }

err_msg("exists,start get...");

if (stat(name, &filestat) == -1) {
        if (errno == ENOENT && (flag & O_CREAT)) {
            goto __again;
        }
        goto __err;
    }

ptr = mmap(0, (size_t)filestat.st_size, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);

if (ptr == MAP_FAILED) {
        goto __err;
    }

mqdesc = (mqd_t)calloc(1, sizeof(mq_hdr));
    if (mqdesc == 0) {
        goto __err;
    }

mqdesc->mqi_hdr = (mq_hdr*)ptr;
    mqdesc->mqi_flag = nonblock;
    mqdesc->mqi_magic = MQ_MAGIC;

close(fd);

file_unlock(lockfile_fd, SEEK_SET, 0, 0);
    Pthread_mutex_unlock(&__mq_lock);

err_msg("end,start get...");

return mqdesc;

__err:
    file_unlock(lockfile_fd, SEEK_SET, 0, 0);
    Pthread_mutex_unlock(&__mq_lock);

err = errno;
    __mq_unmap(name, ptr);
    close(fd);
    if (mqdesc)
        free(mqdesc);
    errno = err;
    return (mqd_t)-1;
}

int     mq_close(mqd_t mqdes){
    size_t filesize;
    mq_attr *mattr;
    int flag;

assert(mqdes);

if (mqdes->mqi_magic != MQ_MAGIC) {
        errno = EBADF;
        return -1;
    }

mattr = &mqdes->mqi_hdr->mqh_attr;
    filesize = mattr->mq_maxmsg * (sizeof(mq_msg)* ALIGN_VAL(mattr->mq_msgsize, sizeof(long))) + sizeof(mq_hdr);
    flag = munmap((void*)mqdes->mqi_hdr, filesize);

mqdes->mqi_magic = 0;
    free(mqdes);

return flag;
}

int     mq_unlink(char const *name){
    assert(name);
    return unlink(name);
}

int     mq_getattr(mqd_t mqdes,mq_attr *attr){
    int flag;
    mq_attr *tmp;

assert(mqdes);
    assert(attr);

if (mqdes->mqi_magic != MQ_MAGIC) {
        errno = EBADF;
        return -1;
    }

tmp = &mqdes->mqi_hdr->mqh_attr;

/*防止其他进程或线程在改变属性值*/
    flag = pthread_mutex_lock(&mqdes->mqi_hdr->mqh_mutex);
    if (flag > 0) {
        errno = flag;
        return -1;
    }

bcopy(&mqdes->mqi_hdr->mqh_attr, attr, sizeof(mq_attr));
    attr->mq_flags = mqdes->mqi_flag;

flag = pthread_mutex_unlock(&mqdes->mqi_hdr->mqh_mutex);
    if (flag > 0) {
        errno = flag;
        return -1;
    }

return 0;
}

int     mq_setattr(mqd_t mqdes,const mq_attr *attr,mq_attr *old){
    int flag;
    mq_attr *tmp;

assert(mqdes);
    assert(attr);

if (mqdes->mqi_magic != MQ_MAGIC) {
        errno = EBADF;
        return -1;
    }

tmp = &mqdes->mqi_hdr->mqh_attr;

/*防止其他进程或线程在读取属性值*/
    flag = pthread_mutex_lock(&mqdes->mqi_hdr->mqh_mutex);
    if (flag > 0) {
        errno = flag;
        return -1;
    }
    if (old != NULL) {
        bcopy(&mqdes->mqi_hdr->mqh_attr, old, sizeof(mq_attr));
        old->mq_flags = mqdes->mqi_flag;
    }
    /*创建后,只有文件标识可以改变*/
    //bcopy(attr, &mqdes->mqi_hdr->mqh_attr, sizeof(mq_attr));

/*只有O_NONBLOCK标志可以存储*/
    if (attr->mq_flags & O_NONBLOCK) {
        mqdes->mqi_flag |= O_NONBLOCK;
    }
    else {
        mqdes->mqi_flag &= ~O_NONBLOCK;
    }

flag = pthread_mutex_unlock(&mqdes->mqi_hdr->mqh_mutex);
    if (flag > 0) {
        errno = flag;
        return -1;
    }

return 0;
}

int     mq_notify(mqd_t mqdes,const struct sigevent *notification){
    sigevent_t *old;
    pid_t pid;
    int flag;

assert(mqdes);

if (mqdes->mqi_magic != MQ_MAGIC) {
        errno = EBADF;
        return -1;
    }

flag = pthread_mutex_lock(&mqdes->mqi_hdr->mqh_mutex);
    if (flag > 0) {
        errno = flag;
        return -1;
    }

pid = mqdes->mqi_hdr->mqh_pid;

/*已设置*/
    if (pid != 0) {
        /*发送一个0信号给注册的进程,如果能发送,或者不能发送但不是返回没有进程的错误(可能权限不够),则不能再次注册通知*/

/*有效进程*/
        if (kill(pid, 0) != -1 || errno != ESRCH) {

if (notification == 0) {
                if (pid != getpid()) {
                    errno = EPERM;
                    flag = -1;
                }
                else {
                    mqdes->mqi_hdr->mqh_pid = 0;
                    flag = 0;
                }
            }
            else {
                errno = EBUSY;
                flag = -1;
            }
            goto __return;
        }
        /*无效进程*/
    }
    /*未设置*/
    if (notification != 0) {
        mqdes->mqi_hdr->mqh_pid = getpid();
        old = &mqdes->mqi_hdr->mqh_sigevent;
        bcopy(notification, old, sizeof(sigevent_t));
    }

flag = 0;

__return:
    pthread_mutex_unlock(&mqdes->mqi_hdr->mqh_mutex);

return flag;
}

void    mq_info_test(mqd_t mqdes){
    size_t i,msgsize,index;
    mq_msg *msg;
    mq_attr *mattr;
    assert(mqdes);

mattr = &mqdes->mqi_hdr->mqh_attr;
    msgsize = sizeof(mq_msg) + ALIGN_VAL(mattr->mq_msgsize, sizeof(long));
    index = mqdes->mqi_hdr->mqh_free;
    err_msg("mq_hdr.mqh_free:%ld bytes\n"
            "msghdr size:%u bytes"
            "map file size:%u bytes"
            , index
            , msgsize
            , mattr->mq_maxmsg * msgsize + index);
    err_msg("next msg offset and msg length:");
    msg = (mq_msg*)&((char*)mqdes->mqi_hdr)[index];
    for (i = 0; i < mattr->mq_maxmsg; i++) {
        fprintf(stderr, "[%ld,%ld];", msg->msg_next, msg->msg_len);
        if ((i+1)%5 == 0) {
            fprintf(stderr,"\n");
        }
        msg ++ ;
    }
    if ((i+1)%5 != 0) {
        fprintf(stderr,"\n");
    }

return;
}

(0)

相关推荐

  • POSIX 风格和兼容 Perl 风格两种正则表达式主要函数的类比(preg_match, preg_replace, ereg, ereg_replace)

    首先来看看 POSIX 风格正则表达式的两个主要函数: ereg 函数:(正则表达式匹配) 格式:int ereg ( string pattern, string string [, array &regs] ) 注意:使用 Perl 兼容正则表达式语法的 preg_match() 函数通常是比 ereg() 更快的替代方案.(一般的话还是使用 preg_match() ,比较好勒~~) 以区分大小写的方式在 string 中寻找与给定的正则表达式 pattern 所匹配的子串.如果找到与 p

  • 解析posix与perl标准的正则表达式区别

    正则表达式(Regular Expression,缩写为regexp,regex或regxp),又称正规表达式.正规表示式或常规表达式或正规化表示法或正规表示法,是指一个用 来描述或者匹配一系列符合某个句法规则的字符串的单个字符串.在很多文本编辑器或其他工具里,正则表达式通常被用来检索和/或替换那些符合某个模式的文本 内容.许多程序设计语言都支持利用正则表达式进行字符串操作.例如,在Perl中就内建了一个功能强大的在正则表达式引擎.正则表达式这个概念最初是由 Unix中的工具软件(例如sed和g

  • php的POSIX 函数以及进程测试的深入分析

    复制代码 代码如下: <?phpecho posix_getpid(); //8805sleep(10);?> 再用 #ps -ax 这个时候如果多开开个浏览器请求,就会发现Apache自动增加了几个新的进程 我们发现并非一直请求同一个进程 重启apache# /usr/local/apache2/bin/apachectl restart我们发现Apache又恢复到默认进程数.

  • 实现posix消息队列示例分享

    mqueue.h 复制代码 代码如下: ////  mqueue.h//  UNIX_C////  Created by 周凯 on 14-2-9.//  Copyright (c) 2014年 zk. All rights reserved.// #ifndef __PS_MQUEUE_H#define __PS_MQUEUE_H #include <unistd.h>#include <sys/types.h> typedef struct mq_info     *mqd_t

  • PHP+memcache实现消息队列案例分享

    memche消息队列的原理就是在key上做文章,用以做一个连续的数字加上前缀记录序列化以后消息或者日志.然后通过定时程序将内容落地到文件或者数据库. php实现消息队列的用处比如在做发送邮件时发送大量邮件很费时间的问题,那么可以采取队列.方便实现队列的轻量级队列服务器是:starling支持memcache协议的轻量级持久化服务器https://github.com/starling/starlingBeanstalkd轻量.高效,支持持久化,每秒可处理3000左右的队列http://kr.gi

  • PHP+MySQL实现消息队列的方法分析

    本文实例讲述了PHP+MySQL实现消息队列的方法.分享给大家供大家参考,具体如下: 最近遇到一个批量发送短信的需求,短信接口是第三方提供的.刚开始想到,获取到手机号之后,循环调用接口发送不就可以了吗? 但很快发现问题:当短信数量很大时,不仅耗时,而且成功率很低. 于是想到,用PHP和MySQL实现一个消息队列,一条一条的发送短信.下面介绍具体的实现方法: 首先,建立一个数据表sms,包含以下字段: id, phone, //手机号 content //短信内容 将需要发送的短信和手机号存入sm

  • Python中线程的MQ消息队列实现以及消息队列的优点解析

    "消息队列"是在消息的传输过程中保存消息的容器.消息队列管理器在将消息从它的源中继到它的目标时充当中间人.队列的主要目的是提供路由并保证消息的传递:如果发送消息时接收者不可用,消息队列会保留消息,直到可以成功地传递它.相信对任何架构或应用来说,消息队列都是一个至关重要的组件,下面是十个理由: Python的消息队列示例: 1.threading+Queue实现线程队列 #!/usr/bin/env python import Queue import threading import

  • 进程间通信之深入消息队列的详解

    最近在Hi3515上调试Qt与DVR程序,发现他们之间使用消息队列通信的,闲暇之余,就总结了一下消息队列,呵呵,自认为通俗易懂,同时,在应用中也发现了消息队列的强大之处. 关于线程的管理(互斥量和条件变量)见:Linux线程管理必备:解析互斥量与条件变量的详解 一.消息队列的特点 1.消息队列是消息的链表,具有特定的格式,存放在内存中并由消息队列标识符标识.    2.消息队列允许一个或多个进程向它写入与读取消息.    3.管道和命名管道都是通信数据都是先进先出的原则.    4.消息队列可以

  • Java利用Redis实现消息队列的示例代码

    本文介绍了Java利用Redis实现消息队列的示例代码,分享给大家,具体如下: 应用场景 为什么要用redis? 二进制存储.java序列化传输.IO连接数高.连接频繁 一.序列化 这里编写了一个java序列化的工具,主要是将对象转化为byte数组,和根据byte数组反序列化成java对象; 主要是用到了ByteArrayOutputStream和ByteArrayInputStream; 注意:每个需要序列化的对象都要实现Serializable接口; 其代码如下: package Utils

  • php-beanstalkd消息队列类实例分享

    本文实例为大家分享了php beanstalkd消息队列类的具体代码,供大家参考,具体内容如下 <?php namespace Common\Business; /** * beanstalk: A minimalistic PHP beanstalk client. * * Copyright (c) 2009-2015 David Persson * * Distributed under the terms of the MIT License. * Redistributions of

  • PHP使用redis消息队列发布微博的方法示例

    本文实例讲述了PHP使用redis消息队列发布微博的方法.分享给大家供大家参考,具体如下: 在一些用户发布内容应用中,可能出现1秒上万个用户同时发布消息的情况,此时使用mysql可能会出现" too many connections"错误,当然把Mysql的max_connections参数设置为更大数,不过这是一个治标不治本的方法.而使用redis的消息队列,把用户发布的消息暂时存储在消息队列中,然后使用多个cron程序把消息队列中的数据插入到Mysql.这样就有效的降低了Mysql

  • Python tornado队列示例-一个并发web爬虫代码分享

    Queue Tornado的tornado.queue模块为基于协程的应用程序实现了一个异步生产者/消费者模式的队列.这与python标准库为多线程环境实现的queue模块类似. 一个协程执行到yieldqueue.get会暂停,直到队列中有条目.如果queue有上限,一个协程执行yieldqueue.put将会暂停,直到队列中有空闲的位置. 在一个queue内部维护了一个未完成任务的引用计数,每调用一次put操作便会增加引用计数,而调用task_done操作将会减少引用计数. 下面是一个简单的

  • php+redis实现消息队列功能示例

    本文实例讲述了php+redis实现消息队列功能.分享给大家供大家参考,具体如下: 个人理解在项目中使用消息队列一般是有如下几个原因: 把瞬间服务器的请求处理换成异步处理,缓解服务器的压力 实现数据顺序排列获取 redis实现消息队列步骤如下: 1).redis函数rpush,lpop 2).建议定时任务入队列 3)创建定时任务出队列 文件:demo.php插入数据到redis队列 <?php $redis = new Redis(); $redis->connect('127.0.0.1',

随机推荐