SpringBoot使用Redis的zset统计在线用户信息

统计在线用户的数量,是应用很常见的需求了。如果需要精准的统计到用户是在线,离线状态,我想只有客户端和服务器通过保持一个TCP长连接来实现。如果应用本身并非一个IM应用的话,这种方式成本极高。

现在的应用都趋向于使用心跳包来标识用户是否在线。用户登录后,每隔一段时间,往服务器推送一个消息,表示当前用户在线。服务器则可以定义一个时间差,例如:5分钟内收到过客户端心跳消息,视为在线用户。

在线用户统计的实现

基于数据库实现

最简单的办法,就是在用户表,添加一个最后心跳包的日期时间字段 last_active。服务器收到心跳后,每次都去更新这个字段为当前的最新时间。

如果要查询最近5分钟活跃的用户数量,就可以简单的通过一句SQL完成。

SELECT COUNT(1) AS `online_user_count` FROM `user` WHERE `last_active` BETWEEN  '2020-12-22 13:00:00' AND '020-12-22 13:05:00';

弊端也是显而易见,为了提高检索效率,不得不为last_active字段添加索引,而因为心跳的更新,会导致频繁的重新维护索引树,效率极其低下。

基于Redis实现

这是比较理想的一种实现方式了,Redis基于内存进行读写,性能自然比关系型数据库好得多,而且它所提供的Zset可以很方便的构建出一个在线用户的统计服务。

Redis的Zset

这里不会涉及太多redis的东西,简单说明以下zset。它是一个有序的set集合,集合中的每个元素由2个东西组成

  • member 既然是集合,那么它便是集合中的元素,并且不能重复
  • score  既然是有序的,它就是用于排序的权重字段

Zset的部分操作

添加元素

ZADD key score member [score member ...]

一次性添加一个或者多个元素到集合,如果member已经存在则会使用当前score进行覆盖

统计所有的元素数量

ZCARD key

统计score值在min和max之间元素数量

ZCOUNT key min max

删除score值在min和max之间的元素

ZREMRANGEBYSCORE key min max

一个示例

我打算,用一个zset存储我内心中编程语言的评分排名,这个key叫做lang

添加信息,返回新添加的元素个数

> zadd lang 999 php 10 java 9 go 8 python 7 javascript
"5"

查看添加的数量

> zcard lang
"5"

查看评分在8 - 10之间的元素个数,有3个

> zcount lang 8 10
"3"

删除评分在8 - 1000的元素,返回删除的个数

> ZREMRANGEBYSCORE lang 8 1000
"4"

在线用户服务的实现

知道了zset后,就可以实现一个在线用户的统计服务了。

实现思路

客户端每隔5分钟发送一个心跳到服务器,服务器根据会话获取到用户的ID,作为zset的member
存入zset,score便是当前收到心跳的时间戳,当同一个用户第二次发送心跳的时候,就会更新他对应的score值,由于更新是在内存,这个速度相当快。

zadd users 1608616915109 10000

需要统计出在线用户的数量,本质上就是需要统计出,最近5分钟有发送心跳的用户,通过zcount可以很轻松的统计出来。通过程序获取到当前的时间戳,作为maxScore,时间戳减去5分钟后作为minScore。

zcount users 1608616615109 1608616915109 

因为某些用户可能长时间没有登录过了,可以通过ZREMRANGEBYSCORE进行清理。通过程序获取到当前的时间戳,减去5分钟后作为maxScore,使用0, 作为minScore,表示清理所有超过5分钟没有发送过心跳包的用户。

ZREMRANGEBYSCORE users 0 1608616615109 

实现代码

import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

import javax.annotation.Resource;

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

/**
 * 
 * 
 * 在线用户统计
 * 
 * @author Administrator
 *
 */
@Component
public class OnlineUserStatsService {
    
    private static final String ONLINE_USERS = "onlie_users";

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    /**
     * 添加用户在线信息
     * @param userId
     * @return 
     */
    public Boolean online(Integer userId) {
        return this.stringRedisTemplate.opsForZSet().add(ONLINE_USERS, userId.toString(), Instant.now().toEpochMilli());
    }
    
    /**
     * 获取一定时间内,在线的用户数量
     * @param duration
     * @return
     */
    public Long count(Duration duration) {
        LocalDateTime now = LocalDateTime.now();
        return this.stringRedisTemplate.opsForZSet().count(ONLINE_USERS, 
                                    now.minus(duration).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(), 
                                    now.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
    }
    
    /**
     * 获取所有在线过的用户数量,不论时间
     * @return
     */
    public Long count() {
        return this.stringRedisTemplate.opsForZSet().zCard(ONLINE_USERS);
    }
    
    /**
     * 清除超过一定时间没在线的用户数据
     * @param duration
     * @return
     */
    public Long clear(Duration duration) {
        return this.stringRedisTemplate.opsForZSet().removeRangeByScore(ONLINE_USERS, 0, 
                LocalDateTime.now().minus(duration).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
    }
}

使用示例

@Resource
private OnlineUserStatsService onlineUserStatsService;

@Test
public void test() {
    
    // ID为1的用户发送了心跳包
    boolean result = this.onlineUserStatsService.online(1);
    System.out.println("online=" + result);
    
    // 获取5分钟内,发送过心跳包的用户数量,也就是在线用户的数量
    Long count = this.onlineUserStatsService.count(Duration.ofMinutes(5));
    System.out.println("oneline count=" + count);
    
    // 获取所有发送过心跳包的用户数量
    count = this.onlineUserStatsService.count();
    System.out.println("all count=" + count);
    
    // 清除超过1天都没发送过心跳包的用户
    Long clear = this.onlineUserStatsService.clear(Duration.ofDays(1));
    System.out.println("clear=" + clear);
}

内存消耗分析

可以通过 http://www.redis.cn/redis_memory/ 预算Redis的内存消耗

我对Redis的内存分配并不熟悉,只是按照自己的想法去填写了一些数据,所以我在这里理解的东西,可能是错误的。但是我想这并不耽误证明 - 在这种场景使用Zset对内存消耗极低的事实

设想onlie_users需要存储1亿个用户的状态信息,每个元素score和member需要10个字节存储,那么一共大约需要20G内存。20G的内存对于现在的服务器来说,并不是大问题。

最后

  • 心跳协议不一定非要HTTP,如果客户端支持的话UDP就很适合,可以节约一些系统开销。
  • zset的key,不一定非要用String,可以修改序列化方式,以固定的字节的形式存储用户ID,在用户ID过大的时候,可以节约一些存储空间。
String userId = "10010";
System.out.println(userId.getBytes().length); // 以字符串形式存储 => 需要5个字节

byte[] bin = ByteBuffer.allocate(4).putInt(Integer.valueOf(userId)).array();
System.out.println(bin.length);                    // 序列化为字节形式存储 => 需要4个字节

System.out.println(ByteBuffer.wrap(bin).getInt());    // 反序列化为ID => 10010

以上就是SpringBoot使用Redis的zset统计在线用户信息的详细内容,更多关于SpringBoot统计在线用户信息的资料请关注我们其它相关文章!

(0)

相关推荐

  • springboot基于Redis发布订阅集群下WebSocket的解决方案

    一.背景 单机节点下,WebSocket连接成功后,可以直接发送消息.而多节点下,连接时通过nginx会代理到不同节点. 假设一开始用户连接了node1的socket服务.触发消息发送的条件的时候也通过nginx进行代理,假如代理转到了node2节点上,那么node2节点的socket服务就发送不了消息,因为一开始用户注册的是node1节点.这就导致了消息发送失败. 为了解决这一方案,消息发送时,就需要一个中间件来记录,这样,三个节点都可以获取消息,然后在根据条件进行消息推送. 二.解决方案(s

  • 使用SpringBoot集成redis的方法

    今天,日月在这里教大家如何使用springBoot集成redis,说实话比较简单,网上也有大把的教程.先套用一下网上的简介. 定义 REmote DIctionary Server(Redis) 是一个由Salvatore Sanfilippo写的key-value存储系统. Redis是一个开源的使用ANSI C语言编写.遵守BSD协议.支持网络.可基于内存亦可持久化的日志型.Key-Value数据库,并提供多种语言的API. 它通常被称为数据结构服务器,因为值(value)可以是 字符串(S

  • 关于Springboot2.x集成lettuce连接redis集群报超时异常Command timed out after 6 second(s)

    背景:最近在对一新开发Springboot系统做压测,发现刚开始压测时,可以正常对redis集群进行数据存取,但是暂停几分钟后,接着继续用jmeter进行压测时,发现redis就开始突然疯狂爆出异常提示:Command timed out after 6 second(s)...... Caused by: io.lettuce.core.RedisCommandTimeoutException: Command timed out after 6 second(s) at io.lettuce

  • SpringBoot集成Redis实现消息队列的方法

    list 原理说明 Redis 的 list 是按照插入顺序排序的字符串链表. 如图所示,可以通过 lpush 和 rpop 或者 rpush 和 lpop 实现消息队列. 1 lpush 和 rpop 2 rpush 和 lpop 消息队列功能实现 引入 Redis 依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data

  • java、spring、springboot中整合Redis的详细讲解

    java整合Redis 1.引入依赖或者导入jar包 <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.9.0</version> </dependency> 2.代码实现 public class JedisTest { public static void main(String[]

  • 使用go操作redis的有序集合(zset)

    我就废话不多说了,大家还是直接看代码吧~ package main import ( "fmt" "github.com/garyburd/redigo/redis" ) func main() { // 连接redis数据库,指定数据库的IP和端口 conn, err := redis.Dial("tcp", "36.99.16.197:6379") if err != nil { fmt.Println("Con

  • springboot连接Redis的教程详解

    创建springboot项目 在NoSQL中选择Redis 项目目录 pom.xml中还需要加入下面的jar包 org.springframework.boot spring-boot-starter-json 在application.properties文件中添加Redis服务器信息 spring.redis.host=192.168.5.132 spring.redis.port=6379 剩下4个test类,我直接以源码的方式粘出来,里面有些代码是非必须的,我保留了测试的验证过程,所以里

  • springboot2整合redis使用lettuce连接池的方法(解决lettuce连接池无效问题)

    lettuce客户端 Lettuce 和 Jedis 的都是连接Redis Server的客户端程序.Jedis在实现上是直连redis server,多线程环境下非线程安全(即多个线程对一个连接实例操作,是线程不安全的),除非使用连接池,为每个Jedis实例增加物理连接.Lettuce基于Netty的连接实例(StatefulRedisConnection),可以在多个线程间并发访问,且线程安全,满足多线程环境下的并发访问(即多个线程公用一个连接实例,线程安全),同时它是可伸缩的设计,一个连接

  • springboot+websocket+redis搭建的实现

    在多负载环境下使用websocket. 一.原因 在某些业务场景,我们需要页面对于后台的操作进行实时的刷新,这时候就需要使用websocket. 通常在后台单机的情况下没有任何的问题,如果后台经过nginx等进行负载的话,则会导致前台不能准备的接收到后台给与的响应.socket属于长连接,其session只会保存在一台服务器上,其他负载及其不会持有这个session,此时,我们需要使用redis的发布订阅来实现,session的共享. 二.环境准备 在https://mvnrepository.

  • SpringBoot和Redis实现Token权限认证的实例讲解

    一.引言 登陆权限控制是每个系统都应必备的功能,实现方法也有好多种.下面使用Token认证来实现系统的权限访问. 功能描述: 用户登录成功后,后台返回一个token给调用者,同时自定义一个@AuthToken注解,被该注解标注的API请求都需要进行token效验,效验通过才可以正常访问,实现接口级的鉴权控制. 同时token具有生命周期,在用户持续一段时间不进行操作的话,token则会过期,用户一直操作的话,则不会过期. 二.环境 SpringBoot Redis(Docke中镜像) MySQL

  • 如何自定义redis工具jar包供其他SpringBoot项目直接使用

    注:(最终redis数据库连接信息由使用者项目模块配置提供) 一.Redis常用存储操作实现(redis-util模块,该module最后会打包成jar供其他服务使用) 1.引用相关依赖 <!-- 如果有继承父级spring-boot-starter-parent,可不用添加版本号 --> <!-- Redis缓存 [start] --> <dependency> <groupId>org.springframework.boot</groupId&g

  • php使用redis的有序集合zset实现延迟队列应用示例

    本文实例讲述了php使用redis的有序集合zset实现延迟队列.分享给大家供大家参考,具体如下: 延迟队列就是个带延迟功能的消息队列,相对于普通队列,它可以在指定时间消费掉消息. 延迟队列的应用场景: 1.新用户注册,10分钟后发送邮件或站内信. 2.用户下单后,30分钟未支付,订单自动作废. 我们通过redis的有序集合zset来实现简单的延迟队列,将消息数据序列化,作为zset的value,把消息处理时间作为score,每次通过zRangeByScore获取一条消息进行处理. <?php

  • SpringBoot集成Redisson实现延迟队列的场景分析

    使用场景 1.下单成功,30分钟未支付.支付超时,自动取消订单 2.订单签收,签收后7天未进行评价.订单超时未评价,系统默认好评 3.下单成功,商家5分钟未接单,订单取消 4.配送超时,推送短信提醒 ...... 对于延时比较长的场景.实时性不高的场景,我们可以采用任务调度的方式定时轮询处理.如:xxl-job 今天我们采用一种比较简单.轻量级的方式,使用 Redis 的延迟队列来进行处理.当然有更好的解决方案,可根据公司的技术选型和业务体系选择最优方案.如:使用消息中间件Kafka.Rabbi

随机推荐