php的memcache类分享(memcache队列)

memcacheQueue.class.php


代码如下:

<?php
/**
 * PHP memcache 队列类
 * @author LKK/lianq.net
 * @version 0.3
 * @修改说明:
 * 1.放弃了之前的AB面轮值思路,使用类似数组的构造,重写了此类.
 * 2.队列默认先进先出,但增加了反向读取功能.
 * 3.感谢网友FoxHunter提出的宝贵意见.
 * @example:
 * $obj = new memcacheQueue('duilie');
 * $obj->add('1asdf');
 * $obj->getQueueLength();
 * $obj->read(10);
 * $obj->get(8);
 */
class memcacheQueue{
 public static $client;   //memcache客户端连接
 public   $access;   //队列是否可更新
 private   $expire;   //过期时间,秒,1~2592000,即30天内
 private   $sleepTime;   //等待解锁时间,微秒
 private   $queueName;   //队列名称,唯一值
 private   $retryNum;   //重试次数,= 10 * 理论并发数
 public   $currentHead;  //当前队首值
 public   $currentTail;  //当前队尾值

const MAXNUM  = 20000;    //最大队列数,建议上限10K
 const HEAD_KEY = '_lkkQueueHead_';  //队列首kye
 const TAIL_KEY = '_lkkQueueTail_';  //队列尾key
 const VALU_KEY = '_lkkQueueValu_';  //队列值key
 const LOCK_KEY = '_lkkQueueLock_';  //队列锁key

/**
     * 构造函数
     * @param string $queueName  队列名称
     * @param int $expire 过期时间
     * @param array $config  memcache配置
     *
     * @return <type>
     */
 public function __construct($queueName ='',$expire=0,$config =''){
  if(empty($config)){
   self::$client = memcache_pconnect('127.0.0.1',11211);
  }elseif(is_array($config)){//array('host'=>'127.0.0.1','port'=>'11211')
   self::$client = memcache_pconnect($config['host'],$config['port']);
  }elseif(is_string($config)){//"127.0.0.1:11211"
   $tmp = explode(':',$config);
   $conf['host'] = isset($tmp[0]) ? $tmp[0] : '127.0.0.1';
   $conf['port'] = isset($tmp[1]) ? $tmp[1] : '11211';
   self::$client = memcache_pconnect($conf['host'],$conf['port']);
  }
  if(!self::$client) return false;

ignore_user_abort(true);//当客户断开连接,允许继续执行
  set_time_limit(0);//取消脚本执行延时上限

$this->access = false;
  $this->sleepTime = 1000;
  $expire = empty($expire) ? 3600 : intval($expire)+1;
  $this->expire = $expire;
  $this->queueName = $queueName;
  $this->retryNum = 1000;

$this->head_key = $this->queueName . self::HEAD_KEY;
  $this->tail_key = $this->queueName . self::TAIL_KEY;
  $this->lock_key = $this->queueName . self::LOCK_KEY;

$this->_initSetHeadNTail();
 }

/**
     * 初始化设置队列首尾值
     */
 private function _initSetHeadNTail(){
  //当前队列首的数值
  $this->currentHead = memcache_get(self::$client, $this->head_key);
  if($this->currentHead === false) $this->currentHead =0;

//当前队列尾的数值
  $this->currentTail = memcache_get(self::$client, $this->tail_key);
  if($this->currentTail === false) $this->currentTail =0;
 }

/**
     * 当取出元素时,改变队列首的数值
  * @param int $step 步长值
     */
 private function _changeHead($step=1){
  $this->currentHead += $step;
  memcache_set(self::$client, $this->head_key,$this->currentHead,false,$this->expire);
 }

/**
     * 当添加元素时,改变队列尾的数值
  * @param int $step 步长值
     * @param bool $reverse 是否反向
     * @return null
     */
 private function _changeTail($step=1, $reverse =false){
  if(!$reverse){
   $this->currentTail += $step;
  }else{
   $this->currentTail -= $step;
  }

memcache_set(self::$client, $this->tail_key,$this->currentTail,false,$this->expire);
 }

/**
     * 队列是否为空
     * @return bool
     */
 private function _isEmpty(){
  return (bool)($this->currentHead === $this->currentTail);
 }

/**
     * 队列是否已满
     * @return bool
     */
 private function _isFull(){
  $len = $this->currentTail - $this->currentHead;
  return (bool)($len === self::MAXNUM);
 }

/**
     * 队列加锁
     */
 private function _getLock(){
  if($this->access === false){
   while(!memcache_add(self::$client, $this->lock_key, 1, false, $this->expire) ){
    usleep($this->sleepTime);
    @$i++;
    if($i > $this->retryNum){//尝试等待N次
     return false;
     break;
    }
   }

$this->_initSetHeadNTail();
   return $this->access = true;
  }

return $this->access;
 }

/**
     * 队列解锁
     */
 private function _unLock(){
  memcache_delete(self::$client, $this->lock_key, 0);
  $this->access = false;
 }

/**
     * 获取当前队列的长度
     * 该长度为理论长度,某些元素由于过期失效而丢失,真实长度<=该长度
     * @return int
     */
 public function getQueueLength(){
  $this->_initSetHeadNTail();
  return intval($this->currentTail - $this->currentHead);
 }

/**
     * 添加队列数据
     * @param void $data 要添加的数据
     * @return bool
     */
 public function add($data){
  if(!$this->_getLock()) return false;

if($this->_isFull()){
   $this->_unLock();
   return false;
  }

$value_key = $this->queueName . self::VALU_KEY . strval($this->currentTail +1);
  $result = memcache_set(self::$client, $value_key, $data, MEMCACHE_COMPRESSED, $this->expire);
  if($result){
   $this->_changeTail();
  }

$this->_unLock();
  return $result;
 }

/**
     * 读取队列数据
     * @param int $length 要读取的长度(反向读取使用负数)
     * @return array
     */
 public function read($length=0){
  if(!is_numeric($length)) return false;
  $this->_initSetHeadNTail();

if($this->_isEmpty()){
   return false;
  }

if(empty($length)) $length = self::MAXNUM;//默认所有
  $keyArr = array();
  if($length >0){//正向读取(从队列首向队列尾)
   $tmpMin = $this->currentHead;
   $tmpMax = $tmpMin + $length;
   for($i= $tmpMin; $i<=$tmpMax; $i++){
    $keyArr[] = $this->queueName . self::VALU_KEY . $i;
   }
  }else{//反向读取(从队列尾向队列首)
   $tmpMax = $this->currentTail;
   $tmpMin = $tmpMax + $length;
   for($i= $tmpMax; $i >$tmpMin; $i--){
    $keyArr[] = $this->queueName . self::VALU_KEY . $i;
   }
  }

$result = @memcache_get(self::$client, $keyArr);

return $result;
 }

/**
     * 取出队列数据
     * @param int $length 要取出的长度(反向读取使用负数)
     * @return array
     */
 public function get($length=0){
  if(!is_numeric($length)) return false;
  if(!$this->_getLock()) return false;

if($this->_isEmpty()){
   $this->_unLock();
   return false;
  }

if(empty($length)) $length = self::MAXNUM;//默认所有
  $length = intval($length);
  $keyArr = array();
  if($length >0){//正向读取(从队列首向队列尾)
   $tmpMin = $this->currentHead;
   $tmpMax = $tmpMin + $length;
   for($i= $tmpMin; $i<=$tmpMax; $i++){
    $keyArr[] = $this->queueName . self::VALU_KEY . $i;
   }
   $this->_changeHead($length);
  }else{//反向读取(从队列尾向队列首)
   $tmpMax = $this->currentTail;
   $tmpMin = $tmpMax + $length;
   for($i= $tmpMax; $i >$tmpMin; $i--){
    $keyArr[] = $this->queueName . self::VALU_KEY . $i;
   }
   $this->_changeTail(abs($length), true);
  }
  $result = @memcache_get(self::$client, $keyArr);

foreach($keyArr as $v){//取出之后删除
   @memcache_delete(self::$client, $v, 0);
  }

$this->_unLock();

return $result;
 }

/**
     * 清空队列
     */
 public function clear(){
  if(!$this->_getLock()) return false;

if($this->_isEmpty()){
   $this->_unLock();
   return false;
  }

$tmpMin = $this->currentHead--;
  $tmpMax = $this->currentTail++;

for($i= $tmpMin; $i<=$tmpMax; $i++){
   $tmpKey = $this->queueName . self::VALU_KEY . $i;
   @memcache_delete(self::$client, $tmpKey, 0);
  }

$this->currentTail = $this->currentHead = 0;
  memcache_set(self::$client, $this->head_key,$this->currentHead,false,$this->expire);
  memcache_set(self::$client, $this->tail_key,$this->currentTail,false,$this->expire);

$this->_unLock();
 }

/*
  * 清除所有memcache缓存数据
  */
 public function memFlush(){
  memcache_flush(self::$client);
 }

}//end class

(0)

相关推荐

  • 队列在编程中的实际应用(php)

    一:队列的概念.数据结构 队列(Queue)是运算受到限制的一种线性表.只允许在表的一端进行插入,而在另一端进行删除元素的线性表.队尾(rear)是允许插入的一端.队头(front)是允许删除的一端.空队列是不含元素的空表. 假设有个队列Q=(a1,a2,-,an),则a1为队头元素,an为队尾元素.元素入队的次序为a1,a2,-,an,而出队的次序为a1,a2,-,an.可见队列的操作是按照先进先出的原则进行的. 其他详细的介绍请在网上搜索很多资料. 二:PHP的队列 在PHP中队列以数组的形

  • PHP下操作Linux消息队列完成进程间通信的方法

    关于Linux系统进程通信的概念及实现可查看:http://www.ibm.com/developerworks/cn/linux/l-ipc/ 关于Linux系统消息队列的概念及实现可查看:http://www.ibm.com/developerworks/cn/linux/l-ipc/part4/ PHP的sysvmsg模块是对Linux系统支持的System V IPC中的System V消息队列函数族的封装.我们需要利用sysvmsg模块提供的函数来进进程间通信.先来看一段示例代码_1:

  • php Memcache 中实现消息队列

    对于一个很大的消息队列,频繁进行进行大数据库的序列化 和 反序列化,有太耗费.下面是我用PHP 实现的一个消息队列,只需要在尾部插入一个数据,就操作尾部,不用操作整个消息队列进行读取,与操作.但是,这个消息队列不是线程安全的,我只是尽量的避免了冲突的可能性.如果消息不是非常的密集,比如几秒钟才一个,还是可以考虑这样使用的. 如果你要实现线程安全的,一个建议是通过文件进行锁定,然后进行操作.下面是代码: 复制代码 代码如下: class Memcache_Queue { private $memc

  • PHP队列用法实例

    本文实例讲述了PHP队列用法.分享给大家供大家参考.具体分析如下: 什么是队列,是先进先出的线性表,在具体应用中通常用链表或者数组来实现,队列只允许在后端进行插入操作,在前端进行删除操作. 什么情况下会用了队列呢,并发请求又要保证事务的完整性的时候就会用到队列,当然不排除使用其它更好的方法,知道的不仿说说看. 队列还可以用于减轻数据库服务器压力,我们可以将不是即时数据放入到队列中,在数据库空闲的时候或者间隔一段时间后执行.比如访问计数器,没有必要即时的执行访问增加的Sql,在没有使用队列的时候s

  • php实现的双向队列类实例

    本文实例讲述了php实现的双向队列类及其用法,对于PHP数据结构与算法的学习有不错的参考价值.分享给大家供大家参考.具体分析如下: (deque,全名double-ended queue)是一种具有队列和栈的性质的数据结构.双向队列中的元素可以从两端弹出,其限定插入和删除操作在表的两端进行. 在实际使用中,还可以有输出受限的双向队列(即一个端点允许插入和删除,另一个端点只允许插入的双向队列)和输入受限的双向队列(即一个端点允许插入和删除,另一个端点只允许删除的双向队列).而如果限定双向队列从某个

  • PHP+memcache实现消息队列案例分享

    memche消息队列的原理就是在key上做文章,用以做一个连续的数字加上前缀记录序列化以后消息或者日志.然后通过定时程序将内容落地到文件或者数据库. php实现消息队列的用处比如在做发送邮件时发送大量邮件很费时间的问题,那么可以采取队列.方便实现队列的轻量级队列服务器是:starling支持memcache协议的轻量级持久化服务器https://github.com/starling/starlingBeanstalkd轻量.高效,支持持久化,每秒可处理3000左右的队列http://kr.gi

  • PHP实现的memcache环形队列类实例

    本文实例讲述了PHP实现的memcache环形队列类.分享给大家供大家参考.具体如下: 这里介绍了PHP实现的memcache环形队列类.没咋学过数据结构,因为业务需要,所以只是硬着头皮模拟的! 参考PHP memcache 队列代码.为使队列随时可入可出,且不受int长度越界危险(单链采取Head自增的话不作处理有越界可能),所以索性改写成环形队列.可能还有BUG,忘见谅! <?php /** * PHP memcache 环形队列类 * 原作者 LKK/lianq.net * 修改 FoxH

  • php中使用redis队列操作实例代码

    例1,入队操作: 复制代码 代码如下: <?php$redis = new Redis();$redis->connect('127.0.0.1',6379);while(True){  try{    $value = 'value_'.date('Y-m-d H:i:s');    $redis->LPUSH('key1',$value);    sleep(rand()%3);    echo $value."\n";  }catch(Exception $e)

  • PHP消息队列用法实例分析

    本文实例讲述了PHP消息队列用法.分享给大家供大家参考,具体如下: 该消息队列用于linux下,进程通信 #根据路径和后缀创建一个id $key = ftok(__DIR__, 'R'); #获取队列中的消息 $q = msg_get_queue($key); #删除队列 msg_remove_queue($q); #获取队列的状态信息 $status = msg_stat_queue($q); print_r($status); echo "\n"; for($i=0;$i<1

  • php的memcache类分享(memcache队列)

    memcacheQueue.class.php 复制代码 代码如下: <?php/** * PHP memcache 队列类 * @author LKK/lianq.net * @version 0.3 * @修改说明: * 1.放弃了之前的AB面轮值思路,使用类似数组的构造,重写了此类. * 2.队列默认先进先出,但增加了反向读取功能. * 3.感谢网友FoxHunter提出的宝贵意见. * @example: * $obj = new memcacheQueue('duilie'); * $

  • winform开发使用通用多线程基类分享(以队列形式)

    复制代码 代码如下: /// <summary>    /// 队列多线程,T 代表处理的单个类型~    /// </summary>    /// <typeparam name="T"></typeparam>    public abstract class QueueThreadBase<T>    {        #region 变量&属性        /// <summary>      

  • C#通过Semaphore类控制线程队列的方法

    本文实例讲述了C#通过Semaphore类控制线程队列的方法.分享给大家供大家参考.具体实现方法如下: using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.IO; using System.Diagnostics; using System.Threading; using System.ComponentModel; using System.Col

  • C++利用链表模板类实现简易队列

    本文实例为大家分享了C++利用链表模板类实现一个队列的具体代码,供大家参考,具体内容如下 设计思想:MyQueue.h中对模板类进行声明和实现.首先定义结点的结构体,包含数据和指针域两部分.队列类定义中声明和实现了元素入队,出队,打印队首元素和队列等方法. 注意: 1)模板类的声明和定义不能分开(即不能分别放在.h和.cpp文件里). 2)声明新节点时,如果声明的节点是辅助操作的,可以不用new关键字,例如在析构函数中,直接用:Node<T>* temp:定义即可.如果声明一个新节点加入队列,

  • Java_int、double型数组常用操作工具类(分享)

    学了数组之后,感觉有好多操作需要经常去写,很不方便,因此自己做了一个工具类,方便调用,方法可能不全,希望大家可以添加,让我使用也方便一点儿. public class ArrayUtils { //求数组的最大值(int) public static int getMax(int[] arr){ int max = arr[0]; for(int i = 0;i<arr.length;i++){ if(max<arr[i]){ max = arr[i]; } } return max; } /

  • php备份数据库类分享

    php备份数据库类分享 <?php /** * * @name php备份数据库 * @param string $DbHost 连接主机 * @param string $DbUser 用户名 * @param string $DbPwd 连接密码 * @param string $DbName 要备份的数据库 * @param string $saveFileName 要保存的文件名, 默认文件保存在当前文件夹中,以日期作区分 * @return Null * @example backup

  • 史上最全Java8日期时间工具类(分享)

    这是我总结的Java8日期工具类,应该是比较全面的,满足日常开发绝大部分需求,分享给大家,有错误之处,还望大神指教. /** * Java8日期时间工具类 * * @author JourWon * @date 2020/12/13 */ public class LocalDateUtils { /** * 显示年月日时分秒,例如 2015-08-11 09:51:53. */ public static final String DATETIME_PATTERN = "yyyy-MM-dd

  • java并发编程工具类PriorityBlockingQueue优先级队列

    目录 前言 1.PriorityBlockingQueue特性 2.PriorityBlockingQueue应用实例 3.使用Java8Comparator做优先级排序的实例 前言 在之前的文章中已经为大家介绍了java并发编程的工具:BlockingQueue接口.ArrayBlockingQueue.DelayQueue.LinkedBlockingQueue,本文为系列文章第五篇. Java PriorityBlockingQueue队列是BlockingQueue接口的实现类,它根据p

  • nodejs读取memcache示例分享

    复制代码 代码如下: var memcache = require('memcache')    , http = require('http')    , url = require('url')    , qs = require('querystring')    , memsettings = { port: 2000, host: '10.6.0.6' }    , httpsettings = { port: 3000 }    , cacheObject = {}    , htt

随机推荐