Spring boot项目redisTemplate实现轻量级消息队列的方法
背景
公司项目有个需求: 前端上传excel文件, 后端读取数据、处理数据并返回错误数据。最简单的方式是同步处理, 即客户端上传文件后一直阻塞等待响应; 但处理数据可能十分耗时, 没人愿意傻等, 用户体验无疑很差。由于项目暂未使用ActiveMQ等消息队列中间件, 而redis的lpush和rpop很适合用来实现一种轻量级的消息队列, 所以用它完成此次功能开发。
一、本文涉及知识点
- excel文件读写--阿里easyexcel sdk
- 文件上传、下载--腾讯云对象存储
- 远程服务调用--restTemplate
- 生产者、消费者--redisTemplate leftPush和rightPop操作
- 异步处理数据--Executors线程池
- 读取网络文件流--HttpClient
- 自定义注解实现用户身份认证--JWT token认证, 拦截器拦截标注有@LoginRequired注解的请求入口
当然, Java实现咯
涉及的知识点比较多, 每一个知识点都可以作为专题进行学习分析, 本文将完整实现呈现出来, 后期拆分与小伙伴分享学习
二、项目目录结构
说明: 数据库DAO层放到另一个模块了, 不是本文重点
三、主要maven依赖
1、easyexcel
<easyexcel-latestVersion>1.1.2-beta4</easyexcel-latestVersion> <dependency> <groupId>com.alibaba</groupId> <artifactId>easyexcel</artifactId> <version>${easyexcel-latestVersion}</version> </dependency>
2、JWT
<dependency> <groupId>io.jsonwebtoken</groupId> <artifactId>jjwt</artifactId> <version>0.7.0</version> </dependency>
3、redis
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-redis</artifactId> <version>1.3.5.RELEASE</version> </dependency>
4、腾讯cos
<dependency> <groupId>com.qcloud</groupId> <artifactId>cos_api</artifactId> <version>5.4.5</version> </dependency>
四、流程
- 用户上传文件
- 将文件存储到腾讯cos
- 将上传后的文件id及上传记录保存到数据库
- redis生产一条导入消息, 即保存文件id到redis
- 请求结束, 返回"处理中"状态
- redis消费消息
- 读取cos文件, 异步处理数据
- 将错误数据以excel形式上传至cos, 以供用户下载, 并更新处理状态为"处理完成"
- 客户端轮询查询处理状态, 并可以下载错误文件
- 结束
五、实现效果
上传文件
数据库导入记录
导入的数据
下载错误文件
错误数据提示
查询导入记录
六、代码实现
1、导入excel控制层
/**
 * Entry point for importing an Excel file on behalf of the logged-in user.
 *
 * @param file    multipart upload bound to the "file" request parameter
 * @param request current HTTP request; the user is resolved from it through getUser
 * @return the response produced by the import service
 */
@LoginRequired
@RequestMapping(value = "doImport", method = RequestMethod.POST)
public JsonResponse doImport(@RequestParam("file") MultipartFile file, HttpServletRequest request) {
    // @LoginRequired marks this handler as requiring login before it runs.
    return orderImportService.doImport(file, getUser(request).getId());
}
2、service层
@Override public JsonResponse doImport(MultipartFile file, Integer userId) { if (null == file || file.isEmpty()) { throw new ServiceException("文件不能为空"); } String filename = file.getOriginalFilename(); if (!checkFileSuffix(filename)) { throw new ServiceException("当前仅支持xlsx格式的excel"); } // 存储文件 String fileId = saveToOss(file); if (StringUtils.isBlank(fileId)) { throw new ServiceException("文件上传失败, 请稍后重试"); } // 保存记录到数据库 saveRecordToDB(userId, fileId, filename); // 生产一条订单导入消息 redisProducer.produce(RedisKey.orderImportKey, fileId); return JsonResponse.ok("导入成功, 处理中..."); } /** * 校验文件格式 * @param fileName * @return */ private static boolean checkFileSuffix(String fileName) { if (StringUtils.isBlank(fileName) || fileName.lastIndexOf(".") <= 0) { return false; } int pointIndex = fileName.lastIndexOf("."); String suffix = fileName.substring(pointIndex, fileName.length()).toLowerCase(); if (".xlsx".equals(suffix)) { return true; } return false; } /** * 将文件存储到腾讯OSS * @param file * @return */ private String saveToOss(MultipartFile file) { InputStream ins = null; try { ins = file.getInputStream(); } catch (IOException e) { e.printStackTrace(); } String fileId; try { String originalFilename = file.getOriginalFilename(); File f = new File(originalFilename); inputStreamToFile(ins, f); FileSystemResource resource = new FileSystemResource(f); MultiValueMap<String, Object> param = new LinkedMultiValueMap<>(); param.add("file", resource); ResponseResult responseResult = restTemplate.postForObject(txOssUploadUrl, param, ResponseResult.class); fileId = (String) responseResult.getData(); } catch (Exception e) { fileId = null; } return fileId; }
3、redis生产者
/**
 * Producer side of the Redis list used as a message queue: each message is
 * left-pushed onto the list stored under the given key.
 */
@Service
public class RedisProducerImpl implements RedisProducer {

    @Autowired
    private RedisTemplate redisTemplate;

    /**
     * Wraps the message in a single-entry map under "fileId" and pushes it onto the list.
     *
     * @param key Redis key of the list
     * @param msg value stored under the "fileId" entry
     * @return an OK response
     */
    @Override
    public JsonResponse produce(String key, String msg) {
        Map<String, String> payload = Maps.newHashMap();
        payload.put("fileId", msg);

        redisTemplate.opsForList().leftPush(key, payload);

        return JsonResponse.ok();
    }
}
4、redis消费者
@Service public class RedisConsumer { @Autowired public RedisTemplate redisTemplate; @Value("${txOssFileUrl}") private String txOssFileUrl; @Value("${txOssUploadUrl}") private String txOssUploadUrl; @PostConstruct public void init() { processOrderImport(); } /** * 处理订单导入 */ private void processOrderImport() { ExecutorService executorService = Executors.newCachedThreadPool(); executorService.execute(() -> { while (true) { Object object = redisTemplate.opsForList().rightPop(RedisKey.orderImportKey, 1, TimeUnit.SECONDS); if (null == object) { continue; } String msg = JSON.toJSONString(object); executorService.execute(new OrderImportTask(msg, txOssFileUrl, txOssUploadUrl)); } }); } }
5、处理任务线程类
public class OrderImportTask implements Runnable { public OrderImportTask(String msg, String txOssFileUrl, String txOssUploadUrl) { this.msg = msg; this.txOssFileUrl = txOssFileUrl; this.txOssUploadUrl = txOssUploadUrl; } } /** * 注入bean */ private void autowireBean() { this.restTemplate = BeanContext.getApplicationContext().getBean(RestTemplate.class); this.transactionTemplate = BeanContext.getApplicationContext().getBean(TransactionTemplate.class); this.orderImportService = BeanContext.getApplicationContext().getBean(OrderImportService.class); } @Override public void run() { // 注入bean autowireBean(); JSONObject jsonObject = JSON.parseObject(msg); String fileId = jsonObject.getString("fileId"); MultiValueMap<String, Object> param = new LinkedMultiValueMap<>(); param.add("id", fileId); ResponseResult responseResult = restTemplate.postForObject(txOssFileUrl, param, ResponseResult.class); String fileUrl = (String) responseResult.getData(); if (StringUtils.isBlank(fileUrl)) { return; } InputStream inputStream = HttpClientUtil.readFileFromURL(fileUrl); List<Object> list = ExcelUtil.read(inputStream); process(list, fileId); } /** * 将文件上传至oss * @param file * @return */ private String saveToOss(File file) { String fileId; try { FileSystemResource resource = new FileSystemResource(file); MultiValueMap<String, Object> param = new LinkedMultiValueMap<>(); param.add("file", resource); ResponseResult responseResult = restTemplate.postForObject(txOssUploadUrl, param, ResponseResult.class); fileId = (String) responseResult.getData(); } catch (Exception e) { fileId = null; } return fileId; }
说明: 处理数据的业务逻辑代码就不用贴了
6、上传文件到cos
@RequestMapping("/txOssUpload") @ResponseBody public ResponseResult txOssUpload(@RequestParam("file") MultipartFile file) throws UnsupportedEncodingException { if (null == file || file.isEmpty()) { return ResponseResult.fail("文件不能为空"); } String originalFilename = file.getOriginalFilename(); originalFilename = MimeUtility.decodeText(originalFilename);// 解决中文乱码问题 String contentType = getContentType(originalFilename); String key; InputStream ins = null; File f = null; try { ins = file.getInputStream(); f = new File(originalFilename); inputStreamToFile(ins, f); key = iFileStorageClient.txOssUpload(new FileInputStream(f), originalFilename, contentType); } catch (Exception e) { return ResponseResult.fail(e.getMessage()); } finally { if (null != ins) { try { ins.close(); } catch (IOException e) { e.printStackTrace(); } } if (f.exists()) {// 删除临时文件 f.delete(); } } return ResponseResult.ok(key); } public static void inputStreamToFile(InputStream ins,File file) { try { OutputStream os = new FileOutputStream(file); int bytesRead = 0; byte[] buffer = new byte[8192]; while ((bytesRead = ins.read(buffer, 0, 8192)) != -1) { os.write(buffer, 0, bytesRead); } os.close(); ins.close(); } catch (Exception e) { e.printStackTrace(); } } public String txOssUpload(FileInputStream inputStream, String key, String contentType) { key = Uuid.getUuid() + "-" + key; OSSUtil.txOssUpload(inputStream, key, contentType); try { if (null != inputStream) { inputStream.close(); } } catch (IOException e) { e.printStackTrace(); } return key; } public static void txOssUpload(FileInputStream inputStream, String key, String contentType) { ObjectMetadata objectMetadata = new ObjectMetadata(); try{ int length = inputStream.available(); objectMetadata.setContentLength(length); }catch (Exception e){ logger.info(e.getMessage()); } objectMetadata.setContentType(contentType); cosclient.putObject(txbucketName, key, inputStream, objectMetadata); }
7、下载文件
/**
 * Tencent Cloud file download endpoint.
 *
 * @param response servlet response the file is streamed into
 * @param id       id of the stored object; also used as the download file name
 * @return always null, because the body is written directly to the response
 */
@RequestMapping("/txOssDownload")
public Object txOssDownload(HttpServletResponse response, String id) {
    COSObjectInputStream cosObjectInputStream = iFileStorageClient.txOssDownload(id, response);
    String contentType = getContentType(id);
    FileUtil.txOssDownload(response, contentType, cosObjectInputStream, id);
    return null;
}

/**
 * Streams a file to the servlet response.
 *
 * <p>Every content type except the image type is sent as an attachment with the given file name.
 * Both the response output stream and the source stream are closed before returning.
 *
 * @param response    servlet response to write to; it is reset first
 * @param contentType content type to send
 * @param fileStream  source of the file content
 * @param fileName    name offered to the client for the downloaded file
 */
public static void txOssDownload(HttpServletResponse response, String contentType, InputStream fileStream, String fileName) {
    response.reset();
    OutputStream os = null;
    try {
        response.setContentType(contentType + "; charset=utf-8");
        if (!contentType.equals(PlConstans.FileContentType.image)) {
            try {
                // Re-encode the UTF-8 bytes as ISO8859-1 so they pass through the header unchanged.
                response.setHeader("Content-Disposition", "attachment; filename=" + new String(fileName.getBytes("UTF-8"), "ISO8859-1"));
            } catch (UnsupportedEncodingException e) {
                response.setHeader("Content-Disposition", "attachment; filename=" + fileName);
                logger.error("encoding file name failed", e);
            }
        }
        os = response.getOutputStream();
        byte[] b = new byte[1024 * 1024];
        int len;
        while ((len = fileStream.read(b)) > 0) {
            os.write(b, 0, len);
            os.flush();
        }
    } catch (IOException e) {
        // Typically the client went away mid-download; record it instead of failing silently.
        logger.error("streaming file to response failed", e);
    } finally {
        IOUtils.closeQuietly(os);
        IOUtils.closeQuietly(fileStream);
    }
}
8、读取网络文件流
/**
 * Opens a remote file over HTTP GET and returns its content stream.
 *
 * <p>On success the caller owns the returned stream and must close it. On any
 * failure (blank URL, non-200 status, missing body, exception) the request is
 * aborted, the client's connections are released and null is returned.
 *
 * @param url address of the file
 * @return the response body stream, or null if the file could not be fetched
 */
public static InputStream readFileFromURL(String url) {
    if (StringUtils.isBlank(url)) {
        return null;
    }
    HttpClient httpClient = new DefaultHttpClient();
    HttpGet methodGet = new HttpGet(url);
    try {
        HttpResponse response = httpClient.execute(methodGet);
        if (response.getStatusLine().getStatusCode() == 200) {
            HttpEntity entity = response.getEntity();
            if (null != entity) {
                return entity.getContent();
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    // Failure path only: nothing will read the response, so free the connection here.
    methodGet.abort();
    httpClient.getConnectionManager().shutdown();
    return null;
}
9、ExcelUtil
/**
 * Reads an Excel sheet.
 *
 * @param inputStream stream of the Excel file
 * @return the rows returned by EasyExcel for {@code new Sheet(1, 1)}
 */
public static List<Object> read(InputStream inputStream) {
    return EasyExcelFactory.read(inputStream, new Sheet(1, 1));
}

/**
 * Writes rows to an Excel file with a single sheet named "Sheet1".
 *
 * @param data         rows to write
 * @param clazz        row model class describing the columns
 * @param saveFilePath path of the file to create
 * @throws IOException if the file cannot be opened or closed
 */
public static void write(List<? extends BaseRowModel> data, Class<? extends BaseRowModel> clazz, String saveFilePath) throws IOException {
    // try-with-resources closes the file even when writing fails part-way.
    try (OutputStream out = new FileOutputStream(new File(saveFilePath))) {
        ExcelWriter writer = EasyExcelFactory.getWriter(out);
        Sheet sheet = new Sheet(1, 3, clazz, "Sheet1", null);
        writer.write(data, sheet);
        writer.finish();
    }
}
说明: 至此, 整个流程算是完整了, 下面将其他知识点代码也贴出来参考
七、其他
1、@LoginRequired注解
/**
 * Marker annotation for controller methods that require a logged-in user.
 * It is retained at runtime so it can be looked up reflectively.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface LoginRequired {
}
2、MyControllerAdvice
/**
 * Central exception-to-response mapping for controllers. Each handler
 * converts one exception type into a {@code JsonResponse} body.
 */
@ControllerAdvice
public class MyControllerAdvice {

    /** Token validation failures are reported as an invalid login. */
    @ExceptionHandler(TokenValidationException.class)
    @ResponseBody
    public JsonResponse tokenValidationExceptionHandler() {
        return JsonResponse.loginInvalid();
    }

    /** Business exceptions are reported with their own message. */
    @ExceptionHandler(ServiceException.class)
    @ResponseBody
    public JsonResponse serviceExceptionHandler(ServiceException se) {
        return JsonResponse.fail(se.getMsg());
    }

    /** Fallback for every other exception: print the stack trace and report the message. */
    @ExceptionHandler(Exception.class)
    @ResponseBody
    public JsonResponse exceptionHandler(Exception e) {
        e.printStackTrace();
        return JsonResponse.fail(e.getMessage());
    }
}
3、AuthenticationInterceptor
public class AuthenticationInterceptor implements HandlerInterceptor { private static final String CURRENT_USER = "user"; @Autowired private UserService userService; @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) { // 如果不是映射到方法直接通过 if (!(handler instanceof HandlerMethod)) { return true; } HandlerMethod handlerMethod = (HandlerMethod) handler; Method method = handlerMethod.getMethod(); // 判断接口是否有@LoginRequired注解, 有则需要登录 LoginRequired methodAnnotation = method.getAnnotation(LoginRequired.class); if (methodAnnotation != null) { // 验证token Integer userId = JwtUtil.verifyToken(request); PLUser plUser = userService.selectByPrimaryKey(userId); if (null == plUser) { throw new RuntimeException("用户不存在,请重新登录"); } request.setAttribute(CURRENT_USER, plUser); return true; } return true; } @Override public void postHandle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, ModelAndView modelAndView) throws Exception { } @Override public void afterCompletion(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, Exception e) throws Exception { } }
4、JwtUtil
/** Token lifetime in milliseconds (30 days). */
public static final long EXPIRATION_TIME = 2592_000_000L;

// NOTE(review): the signing secret is hard-coded in source; consider loading it from configuration.
public static final String SECRET = "pl_token_secret";

/** Name of the request header that carries the token. */
public static final String HEADER = "token";

/** Name of the claim that carries the user id. */
public static final String USER_ID = "userId";

/**
 * Generates a signed token for the given user id.
 *
 * @param userId user id stored in the {@code userId} claim
 * @return the compact JWT string, expiring EXPIRATION_TIME milliseconds from now
 */
public static String generateToken(String userId) {
    HashMap<String, Object> map = new HashMap<>();
    map.put(USER_ID, userId);
    return Jwts.builder()
            .setClaims(map)
            .setExpiration(new Date(System.currentTimeMillis() + EXPIRATION_TIME))
            .signWith(SignatureAlgorithm.HS512, SECRET)
            .compact();
}

/**
 * Verifies the token carried in the request header and extracts the user id.
 *
 * @param request current request
 * @return the user id stored in the token
 * @throws TokenValidationException if the header is missing, the token cannot be
 *         parsed or verified, or it does not contain a numeric {@code userId} claim
 */
public static Integer verifyToken(HttpServletRequest request) {
    String token = request.getHeader(HEADER);
    if (token == null) {
        throw new TokenValidationException("missing token");
    }
    try {
        Map<String, Object> body = Jwts.parser()
                .setSigningKey(SECRET)
                .parseClaimsJws(token)
                .getBody();
        Object userId = body.get(USER_ID);
        if (userId == null) {
            // A token without a user id identifies nobody; reject it rather than return null.
            throw new IllegalStateException("token has no " + USER_ID + " claim");
        }
        return Integer.valueOf(userId.toString());
    } catch (Exception e) {
        logger.error(e.getMessage());
        throw new TokenValidationException("unauthorized");
    }
}
结语: OK, 搞定,睡了, 好困
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对我们的支持。
赞 (0)