Python调用Redis的示例代码

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# *************************************
# @Time  : 2019/8/12
# @Author : Zhang Fan
# @Desc  : Library
# @File  : MyRedis.py
# @Update : 2019/8/23
# *************************************
import redis

class MyRedis(object):
  """
  ===================================================================
  =====================    MyRedis    ========================
  ===================================================================
  """
  def __init__(self):
    self.redis_conn = None
    self.redis_db = None

  def connect_to_redis(self, redis_host, redis_port=6379, db=0, password=None):
    """
    连接到Redis服务器
    """
    self.redis_db = db
    print('Executing : Connect To Redis | host={0}, port={1}, db={2}, password={3}'
           .format(redis_host, redis_port, self.redis_db, password))
    try:
      self.redis_conn = redis.StrictRedis(
        host=redis_host, port=redis_port, db=self.redis_db, password=password)
    except Exception as ex:
      logger.error(str(ex))
      raise Exception(str(ex))

  def redis_key_should_be_exist(self, name):
    """
    验证redis存在指定键
    """
    if not self.redis_conn.exists(name):
      logger.error(("Redis of db%s doesn't exist in key [ %s ]." % (self.redis_db, name)))
      raise AssertionError

  def redis_key_should_not_be_exist(self, name):
    """
    验证redis不存在指定键
    """
    if self.redis_conn.exists(name):
      logger.error(("Redis of db%s exist in key [ %s ]." % (self.redis_db, name)))
      raise AssertionError

  def getkeys_from_redis_bypattern(self, pattern, field=None):
    """
    获取redis所有键值
    """
    keys_list = list()
    print('Executing : Getall Key | %s' % pattern)
    if field is None:
      return self.redis_conn.keys(pattern)
    else:
      keys = self.redis_conn.keys(pattern)
      for key in keys:
        if not self.redis_conn.hget(key, field) is None:
          keys_list.append(key)
      return keys_list

  # ========================== String Type =============================
  def get_from_redis(self, name):
    """
    获取redis数据
    """
    print('Executing : Get Key | %s' % name)
    return self.redis_conn.get(name)

  def del_from_redis(self, name):
    """
    删除redis中的任意数据类型
    """
    return self.redis_conn.delete(name)

  def set_to_redis(self, name, data, expire_time=0):
    """
    设置redis执行key的值
    """
    return self.redis_conn.set(name, data, expire_time)

  def append_to_redis(self, name, value):
    """
    添加数据到redis
    """
    return self.redis_conn.append(name, value)

    # ========================== Hash Type ==========================
  def hgetall_from_redis(self, name):
    """
    获取redis hash所有数据
    """
    print('Executing : Hgetall Key | %s' % name)
    return self.redis_conn.hgetall(name)

  def hget_from_redis(self, name, key):
    """
    获取redis hash指定key数据
    """
    print('Executing : Hget Key | %s' % name)
    return self.redis_conn.hget(name, key)

  def hset_to_redis(self, name, key, data):
    """
    设置redis指定key的值
    """
    print(('Executing : Hset Redis | name={0}, key={1}, data={2}'
           .format(name, key, data)))
    return self.redis_conn.hset(name, key, data)

  def hdel_to_redis(self, name, *keys):
    """
    删除redis指定key的值
    """
    print('Executing : Hdel Key | ', *keys)
    self.redis_conn.hdel(name, *keys)

  # ========================= ZSet Type ================================
  def get_from_redis_zscore(self, name, values):
    """
    获取name对应有序集合中 value 对应的分数
    """
    try:
      return int(self.redis_conn.zscore(name, values))
    except:
      return self.redis_conn.zscore(name, values)

  def get_from_redis_zrange(self, name, start=0, end=10):
    """
    按照索引范围获取name对应的有序集合的元素
    """
    return self.redis_conn.zrange(name, start, end, desc=False, withscores=True, score_cast_func=int)

  def del_from_redis_zrem(self, name, values):
    """
    删除name对应的有序集合中值是values的成员
    """
    return self.redis_conn.zrem(name, values)

  def add_from_redis_zadd(self, name, value, score):
    """
    在name对应的有序集合中添加一条。若值存在,则修改对应分数。
    """
    return self.redis_conn.zadd(name, {value: score})

  def count_from_redis_zcard(self, name):
    """
    获取name对应的有序集合元素的数量
    """
    return self.redis_conn.zcard(name)

if __name__ == '__main__':
  print('This is test.')
  mr = MyRedis()

以上就是Python调用Redis的示例代码的详细内容,更多关于Python调用Redis的资料请关注我们其它相关文章!

(0)

相关推荐

  • Python redis操作实例分析【连接、管道、发布和订阅等】

    本文实例讲述了Python redis操作.分享给大家供大家参考,具体如下: 一.redis redis是一个key-value存储系统.和Memcached类似,它支持存储的value类型相对更多,包括string(字符串).list(链表).set(集合).zset(sorted set --有序集合)和hash(哈希类型).这些数据类型都支持push/pop.add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的.在此基础上,redis支持各种不同方式的排序.与me

  • Python安装并操作redis实现流程详解

    Redis redis是一个key-value存储系统.和Memcached类似,它支持存储的value类型相对更多,包括string(字符串).list(链表).set(集合).zset(sorted set --有序集合)和hash(哈希类型).这些数据类型都支持push/pop.add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的.在此基础上,redis支持各种不同方式的排序.与memcached一样,为了保证效率,数据都是缓存在内存中.区别的是redis会周期性

  • Python操作redis实例小结【String、Hash、List、Set等】

    本文实例总结了Python操作redis方法.分享给大家供大家参考,具体如下: 这里介绍详细使用 1.String 操作 redis中的String在在内存中按照一个name对应一个value来存储 set() #在Redis中设置值,默认不存在则创建,存在则修改 r.set('name', 'zhangsan') '''参数: set(name, value, ex=None, px=None, nx=False, xx=False) ex,过期时间(秒) px,过期时间(毫秒) nx,如果设

  • Python获取Redis所有Key以及内容的方法

    一.获取所有Key # -*- encoding: UTF-8 -*- __author__ = "Sky" import redis pool=redis.ConnectionPool(host='127.0.0.1',port=6379,db=0) r = redis.StrictRedis(connection_pool=pool) keys = r.keys() print type(keys) print keys 运行结果: <type 'list'> ['fa

  • python redis连接 有序集合去重的代码

    python redis连接 有序集合去重的代码如下所述: # -*- coding: utf-8 -*- import redis from constant import redis_ip, redis_db, redis_pw, redis_zset_name pool = redis.ConnectionPool(host=redis_ip, db=redis_db, password=redis_pw) # pool = redis.ConnectionPool(db=6, passw

  • python利用跳板机ssh远程连接redis的方法

    公司服务器的mysql和redis连接都需要有跳板机,网上有很多python ssh远程连接mysql的,那天我研究了下,利用sshtunnel模块连接上了redis,具体如下: from sshtunnel import SSHTunnelForwarder # ssh连接库 import redis # redis模块 server = SSHTunnelForwarder( ssh_address_or_host= , # ssh地址 ssh_username= , # ssh连接的用户名

  • python 通过SSHTunnelForwarder隧道连接redis的方法

    背景:我司Redis服务器使用的亚马逊服务,本地需要通过跳板机,然后才有权限访问Redis服务. 连接原理:使用SSHTunnelForwarder模块,通过本地22端口ssh到跳板机,然后本地开启一个转发端口给跳板机远程Redis服务使用. 两种思路: 1.通过SSHTunnelForwarder,paramiko模块,先ssh到跳板机,然后在跳板机上(或者内部服务器上),获取到权限,然后远程Redis. 2.使用SSHTunnelForwarder模块,通过本地22端口ssh到跳板机,然后本

  • Python+Redis实现布隆过滤器

    布隆过滤器是什么 布隆过滤器(Bloom Filter)是1970年由布隆提出的.它实际上是一个很长的二进制向量和一系列随机映射函数.布隆过滤器可以用于检索一个元素是否在一个集合中.它的优点是空间效率和查询时间都比一般的算法要好的多,缺点是有一定的误识别率和删除困难. 布隆过滤器的基本思想 通过一种叫作散列表(又叫哈希表,Hash table)的数据结构.它可以通过一个Hash函数将一个元素映射成一个位阵列(Bit array)中的一个点.这样一来,我们只要看看这个点是不是1就可以知道集合中有没

  • Python定时从Mysql提取数据存入Redis的实现

    设计思路: 1.程序一旦run起来,python会把mysql中最近一段时间的数据全部提取出来 2.然后实例化redis类,将数据简单解析后逐条传入redis队列 3.定时器设计每天凌晨12点开始跑 ps:redis是个内存数据库,做后台消息队列的缓存时有很大的用处,有兴趣的小伙伴可以去查看相关的文档. # -*- coding:utf-8 -*- import MySQLdb import schedule import time import datetime import random i

  • python操作redis数据库的三种方法

    安装依赖 pip3 install redis 使用的三种方式 直接使用 import redis r = redis.Redis(host='127.0.0.1', port=6379, db=1, password=None, decode_responses=True) 连接池使用 import redis pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=1, max_connections=100, password

  • Python+redis通过限流保护高并发系统

    保护高并发系统的三大利器:缓存.降级和限流.那什么是限流呢?用我没读过太多书的话来讲,限流就是限制流量.我们都知道服务器的处理能力是有上限的,如果超过了上限继续放任请求进来的话,可能会发生不可控的后果.而通过限流,在请求数量超出阈值的时候就排队等待甚至拒绝服务,就可以使系统在扛不住过高并发的情况下做到有损服务而不是不服务. 举个例子,如各地都出现口罩紧缺的情况,广州政府为了缓解市民买不到口罩的状况,上线了预约服务,只有预约到的市民才能到指定的药店购买少量口罩.这就是生活中限流的情况,说这个也是希

  • Python的Flask框架应用调用Redis队列数据的方法

    任务异步化 打开浏览器,输入地址,按下回车,打开了页面.于是一个HTTP请求(request)就由客户端发送到服务器,服务器处理请求,返回响应(response)内容. 我们每天都在浏览网页,发送大大小小的请求给服务器.有时候,服务器接到了请求,会发现他也需要给另外的服务器发送请求,或者服务器也需要做另外一些事情,于是最初们发送的请求就被阻塞了,也就是要等待服务器完成其他的事情. 更多的时候,服务器做的额外事情,并不需要客户端等待,这时候就可以把这些额外的事情异步去做.从事异步任务的工具有很多.

  • python redis 批量设置过期key过程解析

    这篇文章主要介绍了python redis 批量设置过期key过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 在使用 Redis.Codis 时,我们经常需要做一些批量操作,通过连接数据库批量对 key 进行操作: 关于未过期: 1.常有大批量的key未设置过期,导致内存一直暴增 2.rd需求 扫描出这些key,rd自己处理过期(一般dba不介入数据的修改) 3.dba 批量设置过期时间,(一般测试可以直接批量设置,线上谨慎操作) 通过

  • 基于python实现操作redis及消息队列

    操作 redis import redis redisPool = redis.ConnectionPool(host='192.168.100.50', port=6379, db=8) redis= redis.Redis(connection_pool=redisPool) redis.set('key','values') redis.get('com') redis.append('keys','values') redis.delete('keys') print(redis.get

随机推荐