当前位置:主页 > java教程 > Spring boot项目redisTemplate实现轻量级消息队列的方法

学习Spring boot项目redisTemplate实现轻量级消息队列

发布:2020-03-21 09:39:55 144


本站收集了一篇Java相关的编程文章,网友许雨晴根据主题投稿了本篇教程内容,涉及到Spring、boot、redisTemplate、消息队列、Spring boot项目redisTemplate实现轻量级消息队列的方法相关内容,已被469网友关注,涉猎到的知识点内容可以在下方电子书获得。

Spring boot项目redisTemplate实现轻量级消息队列的方法

背景

公司项目有个需求, 前端上传excel文件, 后端读取数据、处理数据、返回错误数据, 最简单的方式同步处理, 客户端上传文件后一直阻塞等待响应, 但用户体验无疑很差, 处理数据可能十分耗时, 没人愿意傻等, 由于项目暂未使用ActiveMQ等消息队列中间件, 而redis的lpush和rpop很适合作为一种轻量级的消息队列实现, 所以用它完成此次功能开发

一、本文涉及知识点

  • excel文件读写--阿里easyexcel sdk
  • 文件上传、下载--腾讯云对象存储
  • 远程服务调用--restTemplate
  • 生产者、消费者--redisTemplate leftPush和rightPop操作
  • 异步处理数据--Executors线程池
  • 读取网络文件流--HttpClient
  • 自定义注解实现用户身份认证--JWT token认证, 拦截器拦截标注有@LoginRequired注解的请求入口

当然, Java实现咯

涉及的知识点比较多, 每一个知识点都可以作为专题进行学习分析, 本文将完整实现呈现出来, 后期拆分与小伙伴分享学习

二、项目目录结构

Spring boot项目redisTemplate实现轻量级消息队列的方法

说明: 数据库DAO层放到另一个模块了, 不是本文重点

三、主要maven依赖

1、easyexcel

<easyexcel-latestVersion>1.1.2-beta4</easyexcel-latestVersion>

  <dependency>
   <groupId>com.alibaba</groupId>
   <artifactId>easyexcel</artifactId>
   <version>${easyexcel-latestVersion}</version>
  </dependency>

JWT

  <dependency>
   <groupId>io.jsonwebtoken</groupId>
   <artifactId>jjwt</artifactId>
   <version>0.7.0</version>
  </dependency>

redis

  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-redis</artifactId>
   <version>1.3.5.RELEASE</version>
  </dependency>

腾讯cos

  <dependency>
   <groupId>com.qcloud</groupId>
   <artifactId>cos_api</artifactId>
   <version>5.4.5</version>
  </dependency>

四、流程

  1. 用户上传文件
  2. 将文件存储到腾讯cos
  3. 将上传后的文件id及上传记录保存到数据库
  4. redis生产一条导入消息, 即保存文件id到redis
  5. 请求结束, 返回"处理中"状态
  6. redis消费消息
  7. 读取cos文件, 异步处理数据
  8. 将错误数据以excel形式上传至cos, 以供用户下载, 并更新处理状态为"处理完成"
  9. 客户端轮询查询处理状态, 并可以下载错误文件
  10. 结束

五、实现效果

上传文件

Spring boot项目redisTemplate实现轻量级消息队列的方法

数据库导入记录

Spring boot项目redisTemplate实现轻量级消息队列的方法

导入的数据

Spring boot项目redisTemplate实现轻量级消息队列的方法

下载错误文件

Spring boot项目redisTemplate实现轻量级消息队列的方法

错误数据提示

Spring boot项目redisTemplate实现轻量级消息队列的方法

查询导入记录

Spring boot项目redisTemplate实现轻量级消息队列的方法

六、代码实现

1、导入excel控制层

  @LoginRequired
  @RequestMapping(value = "doImport", method = RequestMethod.POST)
  public JsonResponse doImport(@RequestParam("file") MultipartFile file, HttpServletRequest request) {
    PLUser user = getUser(request);
    return orderImportService.doImport(file, user.getId());
  }

2、service层

  @Override
  public JsonResponse doImport(MultipartFile file, Integer userId) {
    if (null == file || file.isEmpty()) {
      throw new ServiceException("文件不能为空");
    }

    String filename = file.getOriginalFilename();
    if (!checkFileSuffix(filename)) {
      throw new ServiceException("当前仅支持xlsx格式的excel");
    }

    // 存储文件
    String fileId = saveToOss(file);
    if (StringUtils.isBlank(fileId)) {
      throw new ServiceException("文件上传失败, 请稍后重试");
    }

    // 保存记录到数据库
    saveRecordToDB(userId, fileId, filename);

    // 生产一条订单导入消息
    redisProducer.produce(RedisKey.orderImportKey, fileId);

    return JsonResponse.ok("导入成功, 处理中...");
  }

  /**
   * 校验文件格式
   * @param fileName
   * @return
   */
  private static boolean checkFileSuffix(String fileName) {
    if (StringUtils.isBlank(fileName) || fileName.lastIndexOf(".") <= 0) {
      return false;
    }

    int pointIndex = fileName.lastIndexOf(".");
    String suffix = fileName.substring(pointIndex, fileName.length()).toLowerCase();
    if (".xlsx".equals(suffix)) {
      return true;
    }

    return false;
  }

  /**
   * 将文件存储到腾讯OSS
   * @param file
   * @return
   */
  private String saveToOss(MultipartFile file) {
    InputStream ins = null;
    try {
      ins = file.getInputStream();
    } catch (IOException e) {
      e.printStackTrace();
    }

    String fileId;
    try {
      String originalFilename = file.getOriginalFilename();
      File f = new File(originalFilename);
      inputStreamToFile(ins, f);
      FileSystemResource resource = new FileSystemResource(f);

      MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
      param.add("file", resource);

      ResponseResult responseResult = restTemplate.postForObject(txOssUploadUrl, param, ResponseResult.class);
      fileId = (String) responseResult.getData();
    } catch (Exception e) {
      fileId = null;
    }

    return fileId;
  }

3、redis生产者

@Service
public class RedisProducerImpl implements RedisProducer {

  @Autowired
  private RedisTemplate redisTemplate;

  @Override
  public JsonResponse produce(String key, String msg) {
    Map<String, String> map = Maps.newHashMap();
    map.put("fileId", msg);
    redisTemplate.opsForList().leftPush(key, map);
    return JsonResponse.ok();
  }

}

4、redis消费者

@Service
public class RedisConsumer {

  @Autowired
  public RedisTemplate redisTemplate;

  @Value("${txOssFileUrl}")
  private String txOssFileUrl;

  @Value("${txOssUploadUrl}")
  private String txOssUploadUrl;

  @PostConstruct
  public void init() {
    processOrderImport();
  }

  /**
   * 处理订单导入
   */
  private void processOrderImport() {
    ExecutorService executorService = Executors.newCachedThreadPool();
    executorService.execute(() -> {
      while (true) {
        Object object = redisTemplate.opsForList().rightPop(RedisKey.orderImportKey, 1, TimeUnit.SECONDS);
        if (null == object) {
          continue;
        }
        String msg = JSON.toJSONString(object);
        executorService.execute(new OrderImportTask(msg, txOssFileUrl, txOssUploadUrl));
      }
    });
  }

}

5、处理任务线程类

public class OrderImportTask implements Runnable {
  public OrderImportTask(String msg, String txOssFileUrl, String txOssUploadUrl) {
    this.msg = msg;
    this.txOssFileUrl = txOssFileUrl;
    this.txOssUploadUrl = txOssUploadUrl;
  }
}

  /**
   * 注入bean
   */
  private void autowireBean() {
    this.restTemplate = BeanContext.getApplicationContext().getBean(RestTemplate.class);
    this.transactionTemplate = BeanContext.getApplicationContext().getBean(TransactionTemplate.class);
    this.orderImportService = BeanContext.getApplicationContext().getBean(OrderImportService.class);
  }

  @Override
  public void run() {
    // 注入bean
    autowireBean();

    JSONObject jsonObject = JSON.parseObject(msg);
    String fileId = jsonObject.getString("fileId");

    MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
    param.add("id", fileId);

    ResponseResult responseResult = restTemplate.postForObject(txOssFileUrl, param, ResponseResult.class);
    String fileUrl = (String) responseResult.getData();
    if (StringUtils.isBlank(fileUrl)) {
      return;
    }

    InputStream inputStream = HttpClientUtil.readFileFromURL(fileUrl);
    List<Object> list = ExcelUtil.read(inputStream);
    process(list, fileId);
  }

  /**
   * 将文件上传至oss
   * @param file
   * @return
   */
  private String saveToOss(File file) {
    String fileId;
    try {
      FileSystemResource resource = new FileSystemResource(file);
      MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
      param.add("file", resource);

      ResponseResult responseResult = restTemplate.postForObject(txOssUploadUrl, param, ResponseResult.class);
      fileId = (String) responseResult.getData();
    } catch (Exception e) {
      fileId = null;
    }
    return fileId;
  }

说明: 处理数据的业务逻辑代码就不用贴了

6、上传文件到cos

  @RequestMapping("/txOssUpload")
  @ResponseBody
  public ResponseResult txOssUpload(@RequestParam("file") MultipartFile file) throws UnsupportedEncodingException {
    if (null == file || file.isEmpty()) {
      return ResponseResult.fail("文件不能为空");
    }

    String originalFilename = file.getOriginalFilename();
    originalFilename = MimeUtility.decodeText(originalFilename);// 解决中文乱码问题
    String contentType = getContentType(originalFilename);
    String key;

    InputStream ins = null;
    File f = null;

    try {
      ins = file.getInputStream();
      f = new File(originalFilename);
      inputStreamToFile(ins, f);
      key = iFileStorageClient.txOssUpload(new FileInputStream(f), originalFilename, contentType);
    } catch (Exception e) {
      return ResponseResult.fail(e.getMessage());
    } finally {
      if (null != ins) {
        try {
          ins.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
      if (f.exists()) {// 删除临时文件
        f.delete();
      }
    }

    return ResponseResult.ok(key);
  }

  public static void inputStreamToFile(InputStream ins,File file) {
    try {
      OutputStream os = new FileOutputStream(file);
      int bytesRead = 0;
      byte[] buffer = new byte[8192];
      while ((bytesRead = ins.read(buffer, 0, 8192)) != -1) {
        os.write(buffer, 0, bytesRead);
      }
      os.close();
      ins.close();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  public String txOssUpload(FileInputStream inputStream, String key, String contentType) {
    key = Uuid.getUuid() + "-" + key;
    OSSUtil.txOssUpload(inputStream, key, contentType);
    try {
      if (null != inputStream) {
        inputStream.close();
      }
    } catch (IOException e) {
      e.printStackTrace();
    }
    return key;
  }

  public static void txOssUpload(FileInputStream inputStream, String key, String contentType) {
    ObjectMetadata objectMetadata = new ObjectMetadata();
    try{
      int length = inputStream.available();
      objectMetadata.setContentLength(length);
    }catch (Exception e){
      logger.info(e.getMessage());
    }
    objectMetadata.setContentType(contentType);
    cosclient.putObject(txbucketName, key, inputStream, objectMetadata);
  }

7、下载文件

  /**
   * 腾讯云文件下载
   * @param response
   * @param id
   * @return
   */
  @RequestMapping("/txOssDownload")
  public Object txOssDownload(HttpServletResponse response, String id) {
    COSObjectInputStream cosObjectInputStream = iFileStorageClient.txOssDownload(id, response);
    String contentType = getContentType(id);
    FileUtil.txOssDownload(response, contentType, cosObjectInputStream, id);
    return null;
  }

  public static void txOssDownload(HttpServletResponse response, String contentType, InputStream fileStream, String fileName) {
    FileOutputStream fos = null;
    response.reset();
    OutputStream os = null;
    try {
      response.setContentType(contentType + "; charset=utf-8");
      if(!contentType.equals(PlConstans.FileContentType.image)){
        try {
          response.setHeader("Content-Disposition", "attachment; filename=" + new String(fileName.getBytes("UTF-8"), "ISO8859-1"));
        } catch (UnsupportedEncodingException e) {
          response.setHeader("Content-Disposition", "attachment; filename=" + fileName);
          logger.error("encoding file name failed", e);
        }
      }

      os = response.getOutputStream();

      byte[] b = new byte[1024 * 1024];
      int len;
      while ((len = fileStream.read(b)) > 0) {
        os.write(b, 0, len);
        os.flush();
        try {
          if(fos != null) {
            fos.write(b, 0, len);
            fos.flush();
          }
        } catch (Exception e) {
          logger.error(e.getMessage());
        }
      }
    } catch (IOException e) {
      IOUtils.closeQuietly(fos);
      fos = null;
    } finally {
      IOUtils.closeQuietly(os);
      IOUtils.closeQuietly(fileStream);
      if(fos != null) {
        IOUtils.closeQuietly(fos);
      }
    }
  }

8、读取网络文件流

  /**
   * 读取网络文件流
   * @param url
   * @return
   */
  public static InputStream readFileFromURL(String url) {
    if (StringUtils.isBlank(url)) {
      return null;
    }

    HttpClient httpClient = new DefaultHttpClient();
    HttpGet methodGet = new HttpGet(url);
    try {
      HttpResponse response = httpClient.execute(methodGet);
      if (response.getStatusLine().getStatusCode() == 200) {
        HttpEntity entity = response.getEntity();
        return entity.getContent();
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
    return null;
  }

9、ExcelUtil

  /**
   * 读excel
   * @param inputStream 文件输入流
   * @return list集合
   */
  public static List<Object> read(InputStream inputStream) {
    return EasyExcelFactory.read(inputStream, new Sheet(1, 1));
  }

  /**
   * 写excel
   * @param data list数据
   * @param clazz
   * @param saveFilePath 文件保存路径
   * @throws IOException
   */
  public static void write(List<? extends BaseRowModel> data, Class<? extends BaseRowModel> clazz, String saveFilePath) throws IOException {
    File tempFile = new File(saveFilePath);
    OutputStream out = new FileOutputStream(tempFile);
    ExcelWriter writer = EasyExcelFactory.getWriter(out);
    Sheet sheet = new Sheet(1, 3, clazz, "Sheet1", null);
    writer.write(data, sheet);
    writer.finish();
    out.close();
  }

说明: 至此, 整个流程算是完整了, 下面将其他知识点代码也贴出来参考

七、其他

1、@LoginRequired注解

/**
 * 在需要登录验证的Controller的方法上使用此注解
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface LoginRequired {
}

2、MyControllerAdvice

@ControllerAdvice
public class MyControllerAdvice {

  @ResponseBody
  @ExceptionHandler(TokenValidationException.class)
  public JsonResponse tokenValidationExceptionHandler() {
    return JsonResponse.loginInvalid();
  }

  @ResponseBody
  @ExceptionHandler(ServiceException.class)
  public JsonResponse serviceExceptionHandler(ServiceException se) {
    return JsonResponse.fail(se.getMsg());
  }

  @ResponseBody
  @ExceptionHandler(Exception.class)
  public JsonResponse exceptionHandler(Exception e) {
    e.printStackTrace();
    return JsonResponse.fail(e.getMessage());
  }

}

3、AuthenticationInterceptor

public class AuthenticationInterceptor implements HandlerInterceptor {

  private static final String CURRENT_USER = "user";

  @Autowired
  private UserService userService;

  @Override
  public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
    // 如果不是映射到方法直接通过
    if (!(handler instanceof HandlerMethod)) {
      return true;
    }
    HandlerMethod handlerMethod = (HandlerMethod) handler;
    Method method = handlerMethod.getMethod();

    // 判断接口是否有@LoginRequired注解, 有则需要登录
    LoginRequired methodAnnotation = method.getAnnotation(LoginRequired.class);
    if (methodAnnotation != null) {
      // 验证token
      Integer userId = JwtUtil.verifyToken(request);
      PLUser plUser = userService.selectByPrimaryKey(userId);
      if (null == plUser) {
        throw new RuntimeException("用户不存在,请重新登录");
      }
      request.setAttribute(CURRENT_USER, plUser);
      return true;
    }
    return true;
  }

  @Override
  public void postHandle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, ModelAndView modelAndView) throws Exception {
  }

  @Override
  public void afterCompletion(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, Exception e) throws Exception {
  }
}

4、JwtUtil

  public static final long EXPIRATION_TIME = 2592_000_000L; // 有效期30天
  public static final String SECRET = "pl_token_secret";
  public static final String HEADER = "token";
  public static final String USER_ID = "userId";

  /**
   * 根据userId生成token
   * @param userId
   * @return
   */
  public static String generateToken(String userId) {
    HashMap<String, Object> map = new HashMap<>();
    map.put(USER_ID, userId);
    String jwt = Jwts.builder()
        .setClaims(map)
        .setExpiration(new Date(System.currentTimeMillis() + EXPIRATION_TIME))
        .signWith(SignatureAlgorithm.HS512, SECRET)
        .compact();
    return jwt;
  }

  /**
   * 验证token
   * @param request
   * @return 验证通过返回userId
   */
  public static Integer verifyToken(HttpServletRequest request) {
    String token = request.getHeader(HEADER);
    if (token != null) {
      try {
        Map<String, Object> body = Jwts.parser()
            .setSigningKey(SECRET)
            .parseClaimsJws(token)
            .getBody();

        for (Map.Entry entry : body.entrySet()) {
          Object key = entry.getKey();
          Object value = entry.getValue();
          if (key.toString().equals(USER_ID)) {
            return Integer.valueOf(value.toString());// userId
          }
        }
        return null;
      } catch (Exception e) {
        logger.error(e.getMessage());
        throw new TokenValidationException("unauthorized");
      }
    } else {
      throw new TokenValidationException("missing token");
    }
  }

结语: OK, 搞定,睡了, 好困

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对码农之家的支持。


参考资料

相关文章

  • SpringBoot添加单元测试实例

    发布:2021-04-27

    本篇文章主要介绍了详解SpringBoot之添加单元测试,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧


  • Bootstrap禁用响应式布局的实现方法

    发布:2023-01-11

    给大家整理了关于Bootstrap的教程,这篇文章主要介绍了Bootstrap禁用响应式布局的实现方法,非常不错,具有参考借鉴价值,需要的朋友可以参考下


  • spring与disruptor集成的实例内容

    发布:2021-06-16

    本篇文章主要介绍了spring与disruptor集成的简单示例,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧


  • BootStrap Fileinput上传插件实例详解

    发布:2019-11-18

    这篇文章主要介绍了BootStrap Fileinput上传插件使用实例代码,,通过引入js和css文件,具体实现代码大家参考下本文


  • Bootstrap可折叠侧边导航菜单实现代码

    发布:2020-04-24

    这篇文章主要介绍了Bootstrap实现可折叠分组侧边导航菜单的相关资料,需要的朋友可以参考下


  • SpringBoot使用SensitiveWord实现敏感词过滤

    发布:2023-03-06

    这篇文章主要为大家详细介绍了SpringBoot如何使用SensitiveWord实现敏感词过滤功能,文中示例代码讲解详细,感兴趣的小伙伴可以了解一下


  • 深入理解Spring中事务传播行为

    发布:2020-02-11

    今天小编就为大家分享一篇关于Spring中事务传播行为的介绍,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧


  • 浅谈SpringBoot在使用测试的时候是否需要@RunWith

    发布:2023-03-11

    本文主要介绍了浅谈SpringBoot在使用测试的时候是否需要@RunWith,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧


网友讨论