PHP实现基于Redis的MessageQueue队列封装操作示例

本文实例讲述了PHP实现基于Redis的MessageQueue队列封装操作。分享给大家供大家参考,具体如下:

Redis的链表List可以用来做链表,高并发的特性非常适合做分布式的并行消息传递。

项目地址:https://github.com/huyanping/Zebra-PHP-Framework

左进右出

$redis->lPush($key, $value);
$redis->rPop($key);

以下程序已在生产环境中正式使用。

基于Redis的PHP消息队列封装

<?php
/**
 * Created by PhpStorm.
 * User: huyanping
 * Date: 14-8-19
 * Time: 下午12:10
 *
 * 基于Redis的消息队列封装
 */
namespace Zebra\MessageQueue;
class RedisMessageQueue implements IMessageQueue
{
  protected $redis_server;
  protected $server;
  protected $port;
  /**
   * @var 消息队列标志
   */
  protected $key;
  /**
   * 构造队列,创建redis链接
   * @param $server_config
   * @param $key
   * @param bool $p_connect
   */
  public function __construct($server_config = array('IP' => '127.0.0.1', 'PORT' => '6379'), $key = 'redis_message_queue', $p_connect = false)
  {
    if (empty($key))
      throw new \Exception('message queue key can not be empty');
    $this->server = $server_config['IP'];
    $this->port = $server_config['PORT'];
    $this->key = $key;
    $this->check_environment();
    if ($p_connect) {
      $this->pconnect();
    } else {
      $this->connect();
    }
  }
  /**
   * 析构函数,关闭redis链接,使用长连接时,最好主动调用关闭
   */
  public function __destruct()
  {
    $this->close();
  }
  /**
   * 短连接
   */
  private function connect()
  {
    $this->redis_server = new \Redis();
    $this->redis_server->connect($this->server, $this->port);
  }
  /**
   * 长连接
   */
  public function pconnect()
  {
    $this->redis_server = new \Redis();
    $this->redis_server->pconnect($this->server, $this->port);
  }
  /**
   * 关闭链接
   */
  public function close()
  {
    $this->redis_server->close();
  }
  /**
   * 向队列插入一条信息
   * @param $message
   * @return mixed
   */
  public function put($message)
  {
    return $this->redis_server->lPush($this->key, $message);
  }
  /**
   * 向队列中插入一串信息
   * @param $message
   * @return mixed
   */
  public function puts(){
    $params = func_get_args();
    $message_array = array_merge(array($this->key), $params);
    return call_user_func_array(array($this->redis_server, 'lPush'), $message_array);
  }
  /**
   * 从队列顶部获取一条记录
   * @return mixed
   */
  public function get()
  {
    return $this->redis_server->lPop($this->key);
  }
  /**
   * 选择数据库,可以用于区分不同队列
   * @param $database
   */
  public function select($database)
  {
    $this->redis_server->select($database);
  }
  /**
   * 获得队列状态,即目前队列中的消息数量
   * @return mixed
   */
  public function size()
  {
    return $this->redis_server->lSize($this->key);
  }
  /**
   * 获取某一位置的值,不会删除该位置的值
   * @param $pos
   * @return mixed
   */
  public function view($pos)
  {
    return $this->redis_server->lGet($this->key, $pos);
  }
  /**
   * 检查Redis扩展
   * @throws Exception
   */
  protected function check_environment()
  {
    if (!\extension_loaded('redis')) {
      throw new \Exception('Redis extension not loaded');
    }
  }
}

如果需要一次写入多个队列,可以使用如下调用方式:

<?php
$redis = new RedisMessageQueue();
$redis->puts(1, 2, 3, 4);
$redis->puts(5, 6, 7, 8, 9);

模仿HTTPSQS输出结果的封装如下,提供了写入位置和读取位置记录的功能:

<?php
/**
 * Created by PhpStorm.
 * User: huyanping
 * Date: 14-9-5
 * Time: 下午2:16
 *
 * 附加了队列状态信息的RedisMessageQueue
 */
namespace Zebra\MessageQueue;
class RedisMessageQueueStatus extends RedisMessageQueue {
  protected $record_status;
  protected $put_position;
  protected $get_position;
  public function __construct(
    $server_config = array('IP' => '127.0.0.1', 'PORT' => '6379'),
    $key = 'redis_message_queue',
    $p_connect = false,
    $record_status=true
  ){
    parent::__construct($server_config, $key, $p_connect);
    $this->record_status = $record_status;
    $this->put_position = $this->key . '_put_position';
    $this->get_position = $this->key . '_get_position';
  }
  public function get(){
    if($queue = parent::get()){
      $incr_result = $this->redis_server->incr($this->get_position);
      if(!$incr_result) throw new \Exception('can not mark get position,please check the redis server');
      return $queue;
    }else{
      return false;
    }
  }
  public function put($message){
    if(parent::put($message)){
      $incr_result = $this->redis_server->incr($this->put_position);
      if(!$incr_result) throw new \Exception('can not mark put position,please check the redis server');
      return true;
    }else{
      return false;
    }
  }
  public function puts_status(){
    $message_array = func_get_args();
    $result = call_user_func_array(array($this, 'puts'), $message_array);
    if($result){
      $this->redis_server->incrBy($this->put_position, count($message_array));
      return true;
    }
    return false;
  }
  public function size(){
    return $this->redis_server->lSize($this->key);
  }
  public function status(){
    $status['put_position'] = ($put_position = $this->redis_server->get($this->put_position)) ? $put_position : 0;
    $status['get_position'] = ($get_position = $this->redis_server->get($this->get_position)) ? $get_position : 0;
    $status['unread_queue'] = $this->size();
    $status['queue_name'] = $this->key;
    $status['server'] = $this->server;
    $status['port'] = $this->port;
    return $status;
  }
  public function status_normal(){
    $status = $this->status();
    $message = 'Redis Message Queue' . PHP_EOL;
    $message .= '-------------------' . PHP_EOL;
    $message .= 'Message queue name:' . $status['queue_name'] . PHP_EOL;
    $message .= 'Put position of queue:' . $status['put_position'] . PHP_EOL;
    $message .= 'Get position of queue:' . $status['get_position'] . PHP_EOL;
    $message .= 'Number of unread queue:' . $status['unread_queue'] . PHP_EOL;
    return $message;
  }
  public function status_json(){
    return \json_encode($this->status());
  }
}

更多关于PHP相关内容感兴趣的读者可查看本站专题:《php+redis数据库程序设计技巧总结》、《php面向对象程序设计入门教程》、《PHP基本语法入门教程》、《PHP数组(Array)操作技巧大全》、《php字符串(string)用法总结》、《php+mysql数据库操作入门教程》及《php常见数据库操作技巧汇总》

希望本文所述对大家PHP程序设计有所帮助。

(0)

相关推荐

  • phpredis提高消息队列的实时性方法(推荐)

    数据库存贮都用list形式 要存2个队列 1个用作消息队列保存到数据 还有个 就是用来实时读取数据在redis $redis->lpush($queenkey, json_encode($array)); $redis->lpush($listkey, json_encode($array)); /*消息队列实例*/ public function insertinfo() { $infos = array('info1' => mt_rand(10,100), 'info2' =>

  • PHP+Redis 消息队列 实现高并发下注册人数统计的实例

    前言 现在越来越多的网站开始注重统计和用户行为分析,作为网站经常使用的功能,如何让统计性能更加高,这也是我们需要考虑的事情.本篇通过Redis来优化统计功能(以注册人数统计为例). 传统的统计功能都是直接操作数据库把数据插入表中.这样做,对数据库的性能消耗就会比较大. 思路: 这里我们用到了redis的队列,注册的时候先添加到队列,然后在处理的时候出队,并且把人数添加redis里. 代码: <?php //register.php $redis = new Redis(); $redis->c

  • php中使用redis队列操作实例代码

    例1,入队操作: 复制代码 代码如下: <?php$redis = new Redis();$redis->connect('127.0.0.1',6379);while(True){  try{    $value = 'value_'.date('Y-m-d H:i:s');    $redis->LPUSH('key1',$value);    sleep(rand()%3);    echo $value."\n";  }catch(Exception $e)

  • PHP实现电商订单自动确认收货redis队列

    一.场景 之前做的电商平台,用户在收到货之后,大部分都不会主动的点击确认收货,导致给商家结款的时候,商家各种投诉,于是就根据需求,要做一个订单在发货之后的x天自动确认收货.所谓的订单自动确认收货,就是在在特定的时间,执行一条update语句,改变订单的状态. 二.思路 最笨重的做法,通过linux后台定时任务,查询符合条件的订单,然后update.最理想情况下,如果每分钟都有需要update的订单,这种方式也还行.奈何平台太小,以及卖家发货时间大部分也是密集的,不会分散在24小时的每分钟.那么,

  • php+redis实现消息队列功能示例

    本文实例讲述了php+redis实现消息队列功能.分享给大家供大家参考,具体如下: 个人理解在项目中使用消息队列一般是有如下几个原因: 把瞬间服务器的请求处理换成异步处理,缓解服务器的压力 实现数据顺序排列获取 redis实现消息队列步骤如下: 1).redis函数rpush,lpop 2).建议定时任务入队列 3)创建定时任务出队列 文件:demo.php插入数据到redis队列 <?php $redis = new Redis(); $redis->connect('127.0.0.1',

  • PHP使用php-resque库配合Redis实现MQ消息队列的教程

    消息队列处理后台任务带来的问题 项目中经常会有后台运行任务的需求,比如发送邮件时,因为要连接邮件服务器,往往需要5-10秒甚至更长时间,如果能先给用户一个成功的提示信息,然后在后台慢慢处理发送邮件的操作,显然会有更好的用户体验. 为了实现类似的需求,Web项目中一般的实现方法是使用消息队列(Message Queue),比如MemcacheQ,RabbitMQ等等,都是很著名的产品. 消息队列说白了就是一个最简单的先进先出队列,队列的一个成员就是一段文本.正是因为消息队列实在太简单了,当拿着消息

  • PHP基于Redis消息队列实现发布微博的方法

    本文实例讲述了PHP基于Redis消息队列实现发布微博的方法.分享给大家供大家参考,具体如下: phpRedisAdmin :github地址  图形化管理界面 git clone [url]https://github.com/ErikDubbelboer/phpRedisAdmin.git[/url] cd phpRedisAdmin git clone [url]https://github.com/nrk/predis.git[/url] vendor 首先安装上述的Redis图形化管理

  • php基于Redis消息队列实现的消息推送的方法

    基本知识点 重点用到了以下命令实现我们的消息推送 brpop 阻塞模式 从队列右边获取值之后删除 brpoplpush 从队列A的右边取值之后删除,从左侧放置到队列B中 逻辑分析 在普通的任务脚本中写入push_queue队列要发送消息的目标,并为目标设置一个要推送的内容,永不过期 RedisPushQueue中brpoplpush处理,处理后的值放到temp_queue,主要防止程序崩溃造成推送失败 RedisAutoDeleteTempqueueItems处理temp_queue,这里用到了

  • redis 队列操作的例子(php)

    入队操作 复制代码 代码如下: <?php $redis = new Redis(); $redis->connect('127.0.0.1',6379); while(True){ try{ $value = 'value_'.date('Y-m-d H:i:s'); $redis->LPUSH('key1',$value); sleep(rand()%3); echo $value."\n"; }catch(Exception $e){ echo $e->g

  • PHP使用redis消息队列发布微博的方法示例

    本文实例讲述了PHP使用redis消息队列发布微博的方法.分享给大家供大家参考,具体如下: 在一些用户发布内容应用中,可能出现1秒上万个用户同时发布消息的情况,此时使用mysql可能会出现" too many connections"错误,当然把Mysql的max_connections参数设置为更大数,不过这是一个治标不治本的方法.而使用redis的消息队列,把用户发布的消息暂时存储在消息队列中,然后使用多个cron程序把消息队列中的数据插入到Mysql.这样就有效的降低了Mysql

  • php+redis消息队列实现抢购功能

    本文实例为大家分享了php+redis消息队列实现抢购的具体代码,供大家参考,具体内容如下 实现功能: 1. 基于redis队列,防止高并发的超卖 2. 基于mysql的事务加排它锁,防止高并发的超卖 基于redis队列工作流程: 1. 管理员根据goods表中的库存,创建redis商品库存队列 2. 客户端访问秒杀API 3. web服务器先从redis的商品库存队列中查询剩余库存重点内容 4. redis队列中有剩余,则在mysql中创建订单,去库存,抢购成功 5. redis队列中没有剩余

  • 详解thinkphp+redis+队列的实现代码

    1,安装Redis,根据自己的PHP版本安装对应的redis扩展(此步骤简单的描述一下) 1.1,安装 php_igbinary.dll,php_redis.dll扩展此处需要注意你的php版本如图: 1.2,php.ini文件新增 extension=php_igbinary.dll;extension=php_redis.dll两处扩展 ok此处已经完成第一步redis环境搭建完成看看phpinfo 项目中实际使用redis 2.1,第一步配置redis参数如下,redis安装的默认端口为6

随机推荐