基于条件变量的消息队列 说明介绍

条件变量是线程之前同步的另一种机制。条件变量给多线程提供了一种会和的场所。当条件变量和互斥锁一起使用时,允许线程以无竞争的方式等待特定的条件发生。这样大大减少了锁竞争引起的线程调度和线程等待。

消息队列是服务器端开发过程中绕不开的一道坎,前面,我已经实现了一个基于互斥锁和三队列的消息队列,性能很不错。博客园中的其他园主也实现了很多基于环形队列和lock-free的消息队列,很不错,今天我们将要实现一个基于双缓冲、互斥锁和条件变量的消息队列;这个大概也参考了一下java的blockingqueue,在前面一个博客中有简单介绍!!基于三缓冲的队列,虽然最大限度上解除了线程竞争,但是在玩家很少,消息很小的时候,需要添加一些buff去填充数据,这大概也是其一个缺陷吧!

消息队列在服务器开发过程中主要用于什么对象呢?

1: 我想大概就是通信层和逻辑层之间的交互,通信层接受到的网络数据,验证封包之后,通过消息队列传递给逻辑层,逻辑层将处理结果封包再传递给通信层!

2:逻辑线程和数据库IO线程的分离;数据库IO线程负责对数据库的读写更新,逻辑层对数据库的操作,封装成消息去请求数据库IO线程,数据库IO线程处理完之后,再交回给逻辑层。

3:日志;处理模式与方式2 类似。不过日志大概是不需要返回的!

给出源代码:

BlockingQueue.h文件

代码如下:

/*
 * BlockingQueue.h
 *
 *  Created on: Apr 19, 2013
 *      Author: archy_yu
 */

#ifndef BLOCKINGQUEUE_H_
#define BLOCKINGQUEUE_H_

#include <queue>
#include <pthread.h>

typedef void* CommonItem;

class BlockingQueue
{
public:
    BlockingQueue();

virtual ~BlockingQueue();

int peek(CommonItem &item);

int append(CommonItem item);

private:

pthread_mutex_t _mutex;

pthread_cond_t _cond;

std::queue<CommonItem> _read_queue;

std::queue<CommonItem> _write_queue;

};

#endif /* BLOCKINGQUEUE_H_ */

BlockingQueue.cpp 文件代码


代码如下:

/*
 * BlockingQueue.cpp
 *
 *  Created on: Apr 19, 2013
 *      Author: archy_yu
 */

#include "BlockingQueue.h"

BlockingQueue::BlockingQueue()
{
    pthread_mutex_init(&this->_mutex,NULL);
    pthread_cond_init(&this->_cond,NULL);
}

BlockingQueue::~BlockingQueue()
{
    pthread_mutex_destroy(&this->_mutex);
    pthread_cond_destroy(&this->_cond);
}

int BlockingQueue::peek(CommonItem &item)
{

if( !this->_read_queue.empty() )
    {
        item = this->_read_queue.front();
        this->_read_queue.pop();
    }
    else
    {
        pthread_mutex_lock(&this->_mutex);

while(this->_write_queue.empty())
        {
            pthread_cond_wait(&this->_cond,&this->_mutex);
        }

while(!this->_write_queue.empty())
        {
            this->_read_queue.push(this->_write_queue.front());
            this->_write_queue.pop();
        }

pthread_mutex_unlock(&this->_mutex);
    }

return 0;
}

int BlockingQueue::append(CommonItem item)
{
    pthread_mutex_lock(&this->_mutex);
    this->_write_queue.push(item);
    pthread_cond_signal(&this->_cond);
    pthread_mutex_unlock(&this->_mutex);
    return 0;
}

测试代码:


代码如下:

BlockingQueue _queue;

void* process(void* arg)
{

int i=0;
    while(true)
    {
        int *j = new int();
        *j = i;
        _queue.append((void *)j);
        i ++;
    }
    return NULL;
}

int main(int argc,char** argv)
{
    pthread_t pid;
    pthread_create(&pid,0,process,0);

long long int start = get_os_system_time();
    int i = 0;
    while(true)
    {
        int* j = NULL;
        _queue.peek((void* &)j);

i ++;

if(j != NULL && (*j) == 100000)
        {
            long long int end = get_os_system_time();
            printf("consume %d\n",end - start);
            break;
        }
    }

return 0;
}

(0)

相关推荐

  • 基于条件变量的消息队列 说明介绍

    条件变量是线程之前同步的另一种机制.条件变量给多线程提供了一种会和的场所.当条件变量和互斥锁一起使用时,允许线程以无竞争的方式等待特定的条件发生.这样大大减少了锁竞争引起的线程调度和线程等待. 消息队列是服务器端开发过程中绕不开的一道坎,前面,我已经实现了一个基于互斥锁和三队列的消息队列,性能很不错.博客园中的其他园主也实现了很多基于环形队列和lock-free的消息队列,很不错,今天我们将要实现一个基于双缓冲.互斥锁和条件变量的消息队列:这个大概也参考了一下java的blockingqueue

  • 浅谈互斥锁为什么还要和条件变量配合使用

    mutex体现的是一种竞争,我离开了,通知你进来. cond体现的是一种协作,我准备好了,通知你开始吧. 互斥锁一个明显的缺点是它只有两种状态:锁定和非锁定.而条件变量通过允许线程阻塞和等待另一个线程发送信号的方法弥补了互斥锁的不足,它常和互斥锁一起配合使用.使用时,条件变量被用来阻塞一个线程,当条件不满足时,线程往往解开相应的互斥锁并等待条件发生变化.一旦其他的某个线程改变了条件变量,他将通知相应的条件变量唤醒一个或多个正被此条件变量阻塞的线程.这些线程将重新锁定互斥锁并重新测试条件是否满足.

  • 详解C++11中的线程锁和条件变量

    线程 std::thread类, 位于<thread>头文件,实现了线程操作.std::thread可以和普通函数和 lambda 表达式搭配使用.它还允许向线程的执行函数传递任意多参数. #include <thread> void func() { // do some work } int main() { std::thread t(func); t.join(); return 0; } 上面的例子中,t是一个线程实例,函数func()在该线程运行.调用join()函数是

  • C++基于消息队列的多线程实现示例代码

    前言 实现消息队列的关键因素是考量不同线程访问消息队列的同步问题.本实现涉及到几个知识点 std::lock_guard 介绍 std::lock_gurad 是 C++11 中定义的模板类.定义如下: template <class Mutex> class lock_guard; lock_guard 对象通常用于管理某个锁(Lock)对象,因此与 Mutex RAII 相关,方便线程对互斥量上锁,即在某个 lock_guard 对象的声明周期内,它所管理的锁对象会一直保持上锁状态:而 l

  • 大数据Kafka:消息队列和Kafka基本介绍

    目录 一.什么是消息队列 二.消息队列的应用场景 异步处理 应用耦合 限流削峰 消息驱动系统 三.消息队列的两种方式 点对点模式 发布/订阅模式 四.常见的消息队列的产品 1) RabbitMQ 2) activeMQ: 3) RocketMQ 4) kafka 五.Kafka的基本介绍 一.什么是消息队列 消息队列,英文名:Message Queue,经常缩写为MQ.从字面上来理解,消息队列是一种用来存储消息的队列 .来看一下下面的代码 上述代码,创建了一个队列,先往队列中添加了一个消息,然后

  • 消息队列应用场景介绍

    一.什么是队列 队列(Queue)是一种常见的数据结构,其最大的特点就是先进先出(First In First Out),作为最基础的数据结构,队列应用很广泛.比如火车站排队买票等等.可以用下图表示队列: 其中a1.a2.an表示队列中的数据.数据从队尾入队列,然后从队头出队列. 二.什么是消息队列 消息队列(Message Queue)是一种使用队列(Queue)作为底层存储数据结构,可以用于解决不同进程与应用程序之间通讯的分布式消息容器,也可以称为消息中间件. 目前比较常用的消息队列有Act

  • PHP基于Redis消息队列实现发布微博的方法

    本文实例讲述了PHP基于Redis消息队列实现发布微博的方法.分享给大家供大家参考,具体如下: phpRedisAdmin :github地址  图形化管理界面 git clone [url]https://github.com/ErikDubbelboer/phpRedisAdmin.git[/url] cd phpRedisAdmin git clone [url]https://github.com/nrk/predis.git[/url] vendor 首先安装上述的Redis图形化管理

  • php基于Redis消息队列实现的消息推送的方法

    基本知识点 重点用到了以下命令实现我们的消息推送 brpop 阻塞模式 从队列右边获取值之后删除 brpoplpush 从队列A的右边取值之后删除,从左侧放置到队列B中 逻辑分析 在普通的任务脚本中写入push_queue队列要发送消息的目标,并为目标设置一个要推送的内容,永不过期 RedisPushQueue中brpoplpush处理,处理后的值放到temp_queue,主要防止程序崩溃造成推送失败 RedisAutoDeleteTempqueueItems处理temp_queue,这里用到了

  • 基于python实现操作redis及消息队列

    操作 redis import redis redisPool = redis.ConnectionPool(host='192.168.100.50', port=6379, db=8) redis= redis.Redis(connection_pool=redisPool) redis.set('key','values') redis.get('com') redis.append('keys','values') redis.delete('keys') print(redis.get

  • SpringBoot利用redis集成消息队列的方法

    一.pom文件依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> 二.创建消息接收者 变量.方法及构造函数进行标注,完成自动装配的工作. 通过 @Autowired的使用来消除 set ,get方法. @Autowired pub

随机推荐