基于python实现操作redis及消息队列

操作 redis

import redis
redisPool = redis.ConnectionPool(host='192.168.100.50', port=6379, db=8)
redis= redis.Redis(connection_pool=redisPool)

redis.set('key','values')
redis.get('com')
redis.append('keys','values')
redis.delete('keys')

print(redis.getset('name','Mike')) #赋值name为Mike并返回上一次的value
print(redis.mget(['name','age']))  #输出name键和age键的value
print(redis.setnx('newname','james')) #如果键值不存在,则赋值
print(redis.mset({'name1':'smith','name2':'curry'})) #批量赋值
print(redis.msetnx({'name3':'ltf','name4':'lsq'}))  #不存在才批量赋值
print(redis.incr('age',1))  #age对应的value 加1
print(redis.decr('age',5))  #age对应的value 减5
print(redis.append('name4','is a sb'))  #在name4的value后追加 is a sb 返回字符串长度
print(redis.substr('name',1,4))  #截取键 name

print(redis.sadd('tags','Book','Tea','Coffee')) #返回集合长度 3
print(redis.srem('tags','Book')) #返回删除的数据个数
print(redis.spop('tags'))  #随机删除并返回该元素
print(redis.smove('tags','tags1','Coffee'))
print(redis.scard('tags')) # 获取tags集合的元素个数
print(redis.sismember('tags', 'Book')) # 判断Book是否在tags的集合中
print(redis.sinter('tags', 'tags1')) # 返回集合tags和集合tags1的交集
print(redis.sunion('tags', 'tags1')) # 返回集合tags和集合tags1的并集
print(redis.sdiff('tags', 'tags1')) # 返回集合tags和集合tags1的差集
print(redis.smembers('tags')) # 返回集合tags的所有元素

print(redis.hset('price','cake',5)) # 向键名为price的散列表添加映射关系,返回1 即添加的映射个数
print(redis.hsetnx('price','book',6)) # 向键名为price的散列表添加映射关系,返回1 即添加的映射个数
print(redis.hget('price', 'cake')) # 获取键名为cake的值 返回5
print(redis.hmset('price',{'banana':2,'apple':3,'pear':6,'orange':7}))  #批量添加映射
print(redis.hmget('price', ['apple', 'orange'])) # 查询apple和orange的值 输出 b'3',b'7'
print(redis.hincrby('price','apple',3))  #apple映射加3 为6
print(redis.hexists('price', 'banana')) # 在price中banana是否存在 返回True
print(redis.hdel('price','banana'))  #从price中删除banana 返回1
print(redis.hlen('price')) # 输出price的长度
print(redis.hkeys('price')) # 输出所有的映射键名
print(redis.hvals('price')) # 输出所有的映射键值
print(redis.hgetall('price')) # 输出所有的映射键对

print(redis.rpush('list',1,2,3)) #向键名为list的列表尾部添加1,2,3 返回长度
print(redis.lpush('list',0))  #向键名为list的列表头部添加0 返回长度
print(redis.llen('list'))  #返回列表的长度
print(redis.lrange('list',1,3)) #返回起始索引为1 终止索引为3的索引范围对应的列表
print(redis.lindex('list',1))  #返回索引为1的元素-value
print(redis.lset('list',1,5)) #将list的列表索引为1的重新赋值为5
print(redis.lpop('list')) #删除list第一个元素
print(redis.rpop('list'))  #删除list最后一个元素
print(redis.blpop('list'))  #删除list第一个元素
print(redis.brpop('list'))  #删除最后一个元素
print(redis.rpoplpush('list','list1'))  #删除list的尾元素并将其添加到list1的头部

消息队列使用例子

import redis
import json
redisPool = redis.ConnectionPool(host='192.168.100.50', port=6379, db=8)
client = redis.Redis(connection_pool=redisPool)

# 顺序插入五条数据到redis队列,sort参数是用来验证弹出的顺序
while True:
  num = 0
  for i in range(0, 100):
    num = num + 1
    # params info
    params_dict = {"name": f"test {num}", "sort":num}

    client.rpush("test", json.dumps(params_dict))

  # 查看目标队列数据
  result = client.lrange("test", 0, 100)
  print(result)
  import time
  time.sleep(10)
import redis
import time
import multiprocessing
import time
import os
import random

redisPool = redis.ConnectionPool(host='192.168.100.50', port=6379, db=8)
client = redis.Redis(connection_pool=redisPool)

def test1(msg):
  t_start = time.time()
  print("%s开始执行,进程号为%d" % (msg, os.getpid()))
  time.sleep(random.random() * 2)
  t_stop = time.time()
  print("%s执行完成,耗时%.2f" % (msg, t_stop - t_start))

while True:
  number = client.llen('test')
  print("现在的队列任务 条数是 ", number)
  p = 100
  if number > p-1:
    print("-----start-----")
    a = []
    for i in range(p):
      result = client.lpop("test")
      a.append(result)
    print("每10条读取一次", a)
    po = multiprocessing.Pool(p)
    for i in range(0, p):
      # Pool().apply_async(要调用的目标,(传递给目标的参数元祖,))
      # 每次循环将会用空闲出来的子进程去调用目标
      po.apply_async(test1, (a[i],))
    po.close() # 关闭进程池,关闭后po不再接收新的请求
    po.join() # 等待po中所有子进程执行完成,必须放在close语句之后
    print("-----end-----")
    time.sleep(2)
  elif number < p and number > 0:
    print("-----start-----")
    a = []
    for i in range(number):
      a = []
      result = client.lpop("test")
      a.append(result)
    print("小于10条的 读取一次 ", a)
    po = multiprocessing.Pool(number)
    for i in a:
      # Pool().apply_async(要调用的目标,(传递给目标的参数元祖,))
      # 每次循环将会用空闲出来的子进程去调用目标
      po.apply_async(test1, (a,))

    po.close() # 关闭进程池,关闭后po不再接收新的请求
    po.join() # 等待po中所有子进程执行完成,必须放在close语句之后
    print("-----end-----")
    time.sleep(2)
  elif number == 0:
    print("没有任务需要处理")
    time.sleep(2)
  else:
    time.sleep(2)

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Python中线程的MQ消息队列实现以及消息队列的优点解析

    "消息队列"是在消息的传输过程中保存消息的容器.消息队列管理器在将消息从它的源中继到它的目标时充当中间人.队列的主要目的是提供路由并保证消息的传递:如果发送消息时接收者不可用,消息队列会保留消息,直到可以成功地传递它.相信对任何架构或应用来说,消息队列都是一个至关重要的组件,下面是十个理由: Python的消息队列示例: 1.threading+Queue实现线程队列 #!/usr/bin/env python import Queue import threading import

  • 利用Python学习RabbitMQ消息队列

    RabbitMQ可以当做一个消息代理,它的核心原理非常简单:即接收和发送消息,可以把它想象成一个邮局:我们把信件放入邮箱,邮递员就会把信件投递到你的收件人处,RabbitMQ就是一个邮箱.邮局.投递员功能综合体,整个过程就是:邮箱接收信件,邮局转发信件,投递员投递信件到达收件人处. RabbitMQ和邮局的主要区别就是RabbitMQ接收.存储和发送的是二进制数据----消息. rabbitmq基本管理命令: 一步启动Erlang node和Rabbit应用:sudo rabbitmq-serv

  • Python高级编程之消息队列(Queue)与进程池(Pool)实例详解

    本文实例讲述了Python高级编程之消息队列(Queue)与进程池(Pool).分享给大家供大家参考,具体如下: Queue消息队列 1.创建 import multiprocessing queue = multiprocessing.Queue(队列长度) 2.方法 方法 描述 put 变量名.put(数据),放入数据(如队列已满,则程序进入阻塞状态,等待队列取出后再放入) put_nowait 变量名.put_nowati(数据),放入数据(如队列已满,则不等待队列信息取出后再放入,直接报

  • Python的消息队列包SnakeMQ使用初探

    一.关于snakemq的官方介绍 SnakeMQ的GitHub项目页:https://github.com/dsiroky/snakemq 1.纯python实现,跨平台 2.自动重连接 3.可靠发送--可配置的消息方式与消息超时方式 4.持久化/临时 两种队列 5.支持异步 -- poll() 6.symmetrical -- 单个TCP连接可用于双工通讯 7.多数据库支持 -- SQLite.MongoDB-- 8.brokerless - 类似ZeroMQ的实现原理 9.扩展模块:RPC,

  • python实现RabbitMQ的消息队列的示例代码

    最近在研究redis做消息队列时,顺便看了一下RabbitMQ做消息队列的实现.以下是总结的RabbitMQ中三种exchange模式的实现,分别是fanout, direct和topic. base.py: import pika # 获取认证对象,参数是用户名.密码.远程连接时需要认证 credentials = pika.PlainCredentials("admin", "admin") # BlockingConnection(): 实例化连接对象 # C

  • 详解Python操作RabbitMQ服务器消息队列的远程结果返回

    先说一下笔者这里的测试环境:Ubuntu14.04 + Python 2.7.4 RabbitMQ服务器 sudo apt-get install rabbitmq-server Python使用RabbitMQ需要Pika库 sudo pip install pika 远程结果返回 消息发送端发送消息出去后没有结果返回.如果只是单纯发送消息,当然没有问题了,但是在实际中,常常会需要接收端将收到的消息进行处理之后,返回给发送端. 处理方法描述:发送端在发送信息前,产生一个接收消息的临时队列,该队

  • Python进程间通信Queue消息队列用法分析

    本文实例讲述了Python进程间通信Queue消息队列用法.分享给大家供大家参考,具体如下: 进程间通信-Queue Process之间有时需要通信,操作系统提供了很多机制来实现进程间的通信. 1. Queue的使用 可以使用multiprocessing模块的Queue实现多进程之间的数据传递,Queue本身是一个消息列队程序,首先用一个小实例来演示下Queue的工作原理: 代码如下: #coding=utf-8 from multiprocessing import Queue #初始化一个

  • Python RabbitMQ消息队列实现rpc

    上个项目中用到了ActiveMQ,只是简单应用,安装完成后直接是用就可以了.由于新项目中一些硬件的限制,需要把消息队列换成RabbitMQ. RabbitMQ中的几种模式和机制比ActiveMQ多多了,根据业务需要,使用RPC实现功能,其中踩过的一些坑,有必要记录一下了. 上代码,目录结构分为 c_server.c_client.c_hanlder: c_server: #!/usr/bin/env python # -*- coding:utf-8 -*- import pika import

  • 基于python实现操作redis及消息队列

    操作 redis import redis redisPool = redis.ConnectionPool(host='192.168.100.50', port=6379, db=8) redis= redis.Redis(connection_pool=redisPool) redis.set('key','values') redis.get('com') redis.append('keys','values') redis.delete('keys') print(redis.get

  • Java利用Redis实现消息队列的示例代码

    本文介绍了Java利用Redis实现消息队列的示例代码,分享给大家,具体如下: 应用场景 为什么要用redis? 二进制存储.java序列化传输.IO连接数高.连接频繁 一.序列化 这里编写了一个java序列化的工具,主要是将对象转化为byte数组,和根据byte数组反序列化成java对象; 主要是用到了ByteArrayOutputStream和ByteArrayInputStream; 注意:每个需要序列化的对象都要实现Serializable接口; 其代码如下: package Utils

  • php+redis实现消息队列功能示例

    本文实例讲述了php+redis实现消息队列功能.分享给大家供大家参考,具体如下: 个人理解在项目中使用消息队列一般是有如下几个原因: 把瞬间服务器的请求处理换成异步处理,缓解服务器的压力 实现数据顺序排列获取 redis实现消息队列步骤如下: 1).redis函数rpush,lpop 2).建议定时任务入队列 3)创建定时任务出队列 文件:demo.php插入数据到redis队列 <?php $redis = new Redis(); $redis->connect('127.0.0.1',

  • SpringBoot集成Redis实现消息队列的方法

    list 原理说明 Redis 的 list 是按照插入顺序排序的字符串链表. 如图所示,可以通过 lpush 和 rpop 或者 rpush 和 lpop 实现消息队列. 1 lpush 和 rpop 2 rpush 和 lpop 消息队列功能实现 引入 Redis 依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data

  • SpringBoot利用redis集成消息队列的方法

    一.pom文件依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> 二.创建消息接收者 变量.方法及构造函数进行标注,完成自动装配的工作. 通过 @Autowired的使用来消除 set ,get方法. @Autowired pub

  • Java实现Redis延时消息队列

    目录 什么是延时任务 延时任务的特点 实现思路: 代码实现 1.消息模型 2.RedisMq 消息队列实现类 3.消息生产者 4.消息消费者 5. 消息执接口 6. 任务类型的实现类:可以根据自己的情况去实现对应的队列需求 什么是延时任务 延时任务,顾名思义,就是延迟一段时间后才执行的任务.举个例子,假设我们有个发布资讯的功能,运营需要在每天早上7点准时发布资讯,但是早上7点大家都还没上班,这个时候就可以使用延时任务来实现资讯的延时发布了.只要在前一天下班前指定第二天要发送资讯的时间,到了第二天

  • springboot整合redis之消息队列

    目录 一.项目准备 二.配置类 三.redis中list数据类型 定时器监听队列 运行即监控队列 四.发布/订阅模式 五.ZSet实现延迟队列 一.项目准备 依赖 <!-- RedisTemplate --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> &

  • go+redis实现消息队列发布与订阅的详细过程

    在做项目过程中,实现websocket得时候,不知道哪里写的不太合适,客户端消息收到一定程度,剩下的消息收不到,修改了缓冲区大小,还是没有解决问题,后面因为项目结束期比较紧张,没有时间调试消息的时候,改用了redis队列去做了消息得暂存,客户端轮询去拿对应的消息. 1.生产者随机发布消息,用rpush发布.2.消费者用lpop订阅消费,一旦没有消息,随机休眠.redis做消息队列的缺点:没有持久化.一旦消息没有人消费,积累到一定程度后就会丢失 package main import ( "fmt

  • 基于Java代码操作Redis过程详解

    Jedis简介 实际开发中,我们需要用Redis的连接工具连接Redis然后操作Redis, 对于主流语言,Redis都提供了对应的客户端: 提供了很多客户端 官方推荐的是Jedis 托管地址:https://github.com/xetorthio/jedis 要使用redis首先得下载pom依赖 <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId&g

随机推荐