Spring Boot整合FTPClient线程池的实现示例

最近在写一个FTP上传工具,用到了Apache的FTPClient,但是每个线程频繁的创建和销毁FTPClient对象对服务器的压力很大,因此,此处最好使用一个FTPClient连接池。仔细翻了一下Apache的api,发现它并没有一个FTPClientPool的实现,所以,不得不自己写一个FTPClientPool。下面就大体介绍一下开发连接池的整个过程,供大家参考。

我们可以利用Apache提供的common-pool包来协助我们开发连接池。而开发一个简单的对象池,仅需要实现common-pool 包中的ObjectPool和PoolableObjectFactory两个接口即可。

线程池的意义

为了减少频繁创建、销毁对象带来的性能消耗,我们可以利用对象池的技术来实现对象的复用。对象池提供了一种机制,它可以管理对象池中对象的生命周期,提供了获取和释放对象的方法,可以让客户端很方便的使用对象池中的对象。

pom引入依赖

 <!-- FtpClient依赖包-->
    <dependency>
      <groupId>commons-net</groupId>
      <artifactId>commons-net</artifactId>
      <version>3.5</version>
    </dependency>

    <!-- 线程池-->
    <dependency>
      <groupId>commons-pool</groupId>
      <artifactId>commons-pool</artifactId>
      <version>1.6</version>
    </dependency>

    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-pool2</artifactId>
      <version>2.0</version>
    </dependency>

创建ftp配置信息

在resources目录下创建ftp.properties配置文件,目录结构如下:

添加如下的配置信息:

########### FTP用户名称 ###########
ftp.userName=hrabbit
########### FTP用户密码 ###########
ftp.passWord=123456
########### FTP主机IP ###########
ftp.host=127.0.0.1
########### FTP主机端口号 ###########
ftp.port=21
########### 保存根路径 ###########
ftp.baseUrl=/

创建FTPProperties.java配置文件

加载配置内容到Spring中,配置信息基本延用我的就可以。

/**
 * FTP的配置信息
 * @Auther: hrabbit
 * @Date: 2018-12-03 2:06 PM
 * @Description:
 */
@Data
@Component
@PropertySource("classpath:ftp.properties")
@ConfigurationProperties(prefix = "ftp")
public class FTPProperties {

  private String username;

  private String password;

  private String host;

  private Integer port;

  private String baseUrl;

  private Integer passiveMode = FTP.BINARY_FILE_TYPE;

  private String encoding="UTF-8";

  private int clientTimeout=120000;

  private int bufferSize;

  private int transferFileType=FTP.BINARY_FILE_TYPE;

  private boolean renameUploaded;

  private int retryTime;
}

创建FTPClientPool线程池

/**
 * 自定义实现ftp连接池
 * @Auther: hrabbit
 * @Date: 2018-12-03 3:40 PM
 * @Description:
 */
@Slf4j
@SuppressWarnings("all")
public class FTPClientPool implements ObjectPool<FTPClient> {

  private static final int DEFAULT_POOL_SIZE = 10;

  public BlockingQueue<FTPClient> blockingQueue;

  private FTPClientFactory factory;

  public FTPClientPool(FTPClientFactory factory) throws Exception {
    this(DEFAULT_POOL_SIZE, factory);
  }

  public FTPClientPool(int poolSize, FTPClientFactory factory) throws Exception {
    this.factory = factory;
    this.blockingQueue = new ArrayBlockingQueue<FTPClient>(poolSize);
    initPool(poolSize);
  }

  /**
   * 初始化连接池
   * @param maxPoolSize
   *         最大连接数
   * @throws Exception
   */
  private void initPool(int maxPoolSize) throws Exception {
    int count = 0;
    while(count < maxPoolSize) {
      this.addObject();
      count++;
    }
  }

  /**
   * 从连接池中获取对象
   */
  @Override
  public FTPClient borrowObject() throws Exception {
    FTPClient client = blockingQueue.take();
    if(client == null) {
      client = factory.makeObject();
    } else if(!factory.validateObject(client)) {
      invalidateObject(client);
      client = factory.makeObject();
    }
    return client;
  }

  /**
   * 返还一个对象(链接)
   */
  @Override
  public void returnObject(FTPClient client) throws Exception {
    if ((client != null) && !blockingQueue.offer(client,2,TimeUnit.MINUTES)) {
      try {
        factory.destroyObject(client);
      } catch (Exception e) {
        throw e;
      }
    }
  }

  /**
   * 移除无效的对象(FTP客户端)
   */
  @Override
  public void invalidateObject(FTPClient client) throws Exception {
    blockingQueue.remove(client);
  }

  /**
   * 增加一个新的链接,超时失效
   */
  @Override
  public void addObject() throws Exception {
    blockingQueue.offer(factory.makeObject(), 2, TimeUnit.MINUTES);
  }

  /**
   * 重新连接
   */
  public FTPClient reconnect() throws Exception {
    return factory.makeObject();
  }

  /**
   * 获取空闲链接数(这里暂不实现)
   */
  @Override
  public int getNumIdle() {
    return blockingQueue.size();
  }

  /**
   * 获取正在被使用的链接数
   */
  @Override
  public int getNumActive() {
    return DEFAULT_POOL_SIZE - getNumIdle();
  }

  @Override
  public void clear() throws Exception {

  }

  /**
   * 关闭连接池
   */
  @Override
  public void close() {
    try {
      while(blockingQueue.iterator().hasNext()) {
        FTPClient client = blockingQueue.take();
        factory.destroyObject(client);
      }
    } catch(Exception e) {
      log.error("close ftp client pool failed...{}", e);
    }
  }

  /**
   * 增加一个新的链接,超时失效
   */
  public void addObject(FTPClient ftpClient) throws Exception {
    blockingQueue.put(ftpClient);
  }
}

创建一个FTPClientFactory工厂类

创建FTPClientFactory实现PoolableObjectFactory的接口,FTPClient工厂类,通过FTPClient工厂提供FTPClient实例的创建和销毁

/**
 * FTPClient 工厂
 * @Auther: hrabbit
 * @Date: 2018-12-03 3:41 PM
 * @Description:
 */
@Slf4j
@SuppressWarnings("all")
public class FTPClientFactory implements PoolableObjectFactory<FTPClient> {

  private FTPProperties ftpProperties;

  public FTPClientFactory(FTPProperties ftpProperties) {
    this.ftpProperties = ftpProperties;
  }

  @Override
  public FTPClient makeObject() throws Exception {
    FTPClient ftpClient = new FTPClient();
    ftpClient.setControlEncoding(ftpProperties.getEncoding());
    ftpClient.setConnectTimeout(ftpProperties.getClientTimeout());
    try {
      ftpClient.connect(ftpProperties.getHost(), ftpProperties.getPort());
      int reply = ftpClient.getReplyCode();
      if (!FTPReply.isPositiveCompletion(reply)) {
        ftpClient.disconnect();
        log.warn("FTPServer refused connection");
        return null;
      }
      boolean result = ftpClient.login(ftpProperties.getUsername(), ftpProperties.getPassword());
      ftpClient.setFileType(ftpProperties.getTransferFileType());
      if (!result) {
        log.warn("ftpClient login failed... username is {}", ftpProperties.getUsername());
      }
    } catch (Exception e) {
      log.error("create ftp connection failed...{}", e);
      throw e;
    }

    return ftpClient;
  }

  @Override
  public void destroyObject(FTPClient ftpClient) throws Exception {
    try {
      if(ftpClient != null && ftpClient.isConnected()) {
        ftpClient.logout();
      }
    } catch (Exception e) {
      log.error("ftp client logout failed...{}", e);
      throw e;
    } finally {
      if(ftpClient != null) {
        ftpClient.disconnect();
      }
    }

  }

  @Override
  public boolean validateObject(FTPClient ftpClient) {
    try {
      return ftpClient.sendNoOp();
    } catch (Exception e) {
      log.error("Failed to validate client: {}");
    }
    return false;
  }

  @Override
  public void activateObject(FTPClient obj) throws Exception {
    //Do nothing

  }

  @Override
  public void passivateObject(FTPClient obj) throws Exception {
    //Do nothing

  }
}

创建FTPUtils.java的工具类

FTPUtils.java中封装了上传、下载等方法,在项目启动的时候,在@PostConstruct注解的作用下通过执行init()的方法,创建FTPClientFactory工厂中,并初始化了FTPClientPool线程池,这样每次调用方法的时候,都直接从FTPClientPool中取出一个FTPClient对象

/**
 * @Auther: hrabbit
 * @Date: 2018-12-03 3:47 PM
 * @Description:
 */
@Slf4j
@Component
public class FTPUtils {

  /**
   * FTP的连接池
   */
  @Autowired
  public static FTPClientPool ftpClientPool;
  /**
   * FTPClient对象
   */
  public static FTPClient ftpClient;

  private static FTPUtils ftpUtils;

  @Autowired
  private FTPProperties ftpProperties;

  /**
   * 初始化设置
   * @return
   */
  @PostConstruct
  public boolean init() {
    FTPClientFactory factory = new FTPClientFactory(ftpProperties);
    ftpUtils = this;
    try {
      ftpClientPool = new FTPClientPool(factory);
    } catch (Exception e) {
      e.printStackTrace();
      return false;
    }
    return true;
  }

  /**
   * 获取连接对象
   * @return
   * @throws Exception
   */
  public static FTPClient getFTPClient() throws Exception {
    //初始化的时候从队列中取出一个连接
    if (ftpClient==null) {
      synchronized (ftpClientPool) {
        ftpClient = ftpClientPool.borrowObject();
      }
    }
    return ftpClient;
  }

  /**
   * 当前命令执行完成命令完成
   * @throws IOException
   */
  public void complete() throws IOException {
    ftpClient.completePendingCommand();
  }

  /**
   * 当前线程任务处理完成,加入到队列的最后
   * @return
   */
  public void disconnect() throws Exception {
    ftpClientPool.addObject(ftpClient);
  }

  /**
   * Description: 向FTP服务器上传文件
   *
   * @Version1.0
   * @param remoteFile
   *      上传到FTP服务器上的文件名
   * @param input
   *      本地文件流
   * @return 成功返回true,否则返回false
   */
  public static boolean uploadFile(String remoteFile, InputStream input) {
    boolean result = false;
    try {
      getFTPClient();
      ftpClient.enterLocalPassiveMode();
      result = ftpClient.storeFile(remoteFile, input);
      input.close();
      ftpClient.disconnect();
    } catch (Exception e) {
      e.printStackTrace();
    }
    return result;
  }

  /**
   * Description: 向FTP服务器上传文件
   *
   * @Version1.0
   * @param remoteFile
   *      上传到FTP服务器上的文件名
   * @param localFile
   *      本地文件
   * @return 成功返回true,否则返回false
   */
  public static boolean uploadFile(String remoteFile, String localFile){
    FileInputStream input = null;
    try {
      input = new FileInputStream(new File(localFile));
    } catch (FileNotFoundException e) {
      e.printStackTrace();
    }
    return uploadFile(remoteFile, input);
  }

  /**
   * 拷贝文件
   * @param fromFile
   * @param toFile
   * @return
   * @throws Exception
   */
  public boolean copyFile(String fromFile, String toFile) throws Exception {
    InputStream in=getFileInputStream(fromFile);
    getFTPClient();
    boolean flag = ftpClient.storeFile(toFile, in);
    in.close();
    return flag;
  }

  /**
   * 获取文件输入流
   * @param fileName
   * @return
   * @throws IOException
   */
  public static InputStream getFileInputStream(String fileName) throws Exception {
    ByteArrayOutputStream fos=new ByteArrayOutputStream();
    getFTPClient();
    ftpClient.retrieveFile(fileName, fos);
    ByteArrayInputStream in=new ByteArrayInputStream(fos.toByteArray());
    fos.close();
    return in;
  }

  /**
   * Description: 从FTP服务器下载文件
   *
   * @Version1.0
   * @return
   */
  public static boolean downFile(String remoteFile, String localFile){
    boolean result = false;
    try {
      getFTPClient();
      OutputStream os = new FileOutputStream(localFile);
      ftpClient.retrieveFile(remoteFile, os);
      ftpClient.logout();
      ftpClient.disconnect();
      result = true;
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      try {
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
    return result;
  }

  /**
   * 从ftp中获取文件流
   * @param filePath
   * @return
   * @throws Exception
   */
  public static InputStream getInputStream(String filePath) throws Exception {
    getFTPClient();
    InputStream inputStream = ftpClient.retrieveFileStream(filePath);
    return inputStream;
  }

  /**
   * ftp中文件重命名
   * @param fromFile
   * @param toFile
   * @return
   * @throws Exception
   */
  public boolean rename(String fromFile,String toFile) throws Exception {
    getFTPClient();
    boolean result = ftpClient.rename(fromFile,toFile);
    return result;
  }

  /**
   * 获取ftp目录下的所有文件
   * @param dir
   * @return
   */
  public FTPFile[] getFiles(String dir) throws Exception {
    getFTPClient();
    FTPFile[] files = new FTPFile[0];
    try {
      files = ftpClient.listFiles(dir);
    }catch (Throwable thr){
      thr.printStackTrace();
    }
    return files;
  }

  /**
   * 获取ftp目录下的某种类型的文件
   * @param dir
   * @param filter
   * @return
   */
  public FTPFile[] getFiles(String dir, FTPFileFilter filter) throws Exception {
    getFTPClient();
    FTPFile[] files = new FTPFile[0];
    try {
      files = ftpClient.listFiles(dir, filter);
    }catch (Throwable thr){
      thr.printStackTrace();
    }
    return files;
  }

  /**
   * 创建文件夹
   * @param remoteDir
   * @return 如果已经有这个文件夹返回false
   */
  public boolean makeDirectory(String remoteDir) throws Exception {
    getFTPClient();
    boolean result = false;
    try {
      result = ftpClient.makeDirectory(remoteDir);
    } catch (IOException e) {
      e.printStackTrace();
    }
    return result;
  }

  public boolean mkdirs(String dir) throws Exception {
    boolean result = false;
    if (null == dir) {
      return result;
    }
    getFTPClient();
    ftpClient.changeWorkingDirectory("/");
    StringTokenizer dirs = new StringTokenizer(dir, "/");
    String temp = null;
    while (dirs.hasMoreElements()) {
      temp = dirs.nextElement().toString();
      //创建目录
      ftpClient.makeDirectory(temp);
      //进入目录
      ftpClient.changeWorkingDirectory(temp);
      result = true;
    }
    ftpClient.changeWorkingDirectory("/");
    return result;
  }
}

创建FtpClientTest.java测试类

上传一张图片到FTP服务器,并将文件重新命名为hrabbit.jpg,代码如下:

/**
 * FtpClient测试
 * @Auther: hrabbit
 * @Date: 2018-12-21 9:14 PM
 * @Description:
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class FtpClientTest {

  /**
   * 测试上传
   */
  @Test
  public void uploadFile(){
    boolean flag = FTPUtils.uploadFile("hrabbit.jpg", "/Users/mrotaku/Downloads/klklklkl_4x.jpg");
    Assert.assertEquals(true, flag);
  }

}

程序完美运行,这时候我们查看我们的FTP服务器,http://localhost:8866/hrabbit.jpg

码云地址:https://gitee.com/hrabbit/hrabbit-admin

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • SpringBoot使用WebJars统一管理静态资源的方法

    传统管理静态资源主要依赖于复制粘贴,不利于后期维护,为了让大家往后更舒心,让WebJars给静态资源来一次搬家革命吧!! 学习目标 简单两步!快速学会使用WebJars统一管理前端依赖. 快速查阅 源码下载:SpringBoot Webjars Learning 使用教程 一.引入相关依赖 在 WebJars官网找到项目中需要的依赖,例如在项目中引入jQuery.BootStrap前端组件等.例如: 版本定位工具:webjars-locator-core 前端组件:jquery .bootstr

  • 详解SpringBoot实现JPA的save方法不更新null属性

    序言:直接调用原生Save方法会导致null属性覆盖到数据库,使用起来十分不方便.本文提供便捷方法解决此问题. 核心思路 如果现在保存某User对象,首先根据主键查询这个User的最新对象,然后将此User对象的非空属性覆盖到最新对象. 核心代码 直接修改通用JpaRepository的实现类,然后在启动类标记此实现类即可. 一.通用CRUD实现类 public class SimpleJpaRepositoryImpl<T, ID> extends SimpleJpaRepository&l

  • Intellij IDEA 2017新特性之Spring Boot相关特征介绍

    前言 Intellij IDEA 2017.2.2版本针对Springboot设置了一些特性,本篇文章给大家简单介绍一下如何使用这些特性. Run Dashboard 针对Spring boot提供了Run Dashboard方式的来代替传统的run方法.下面看一下官网提供的面板结构图: 是不是很炫,直接可以通过Dashboard看到Springboot的启动项目,并显示相应的端口等信息,同时还能在这里进行相应的操作.下面我们来看看如何调用出Dashboard. 首先,你的项目应该是一个spri

  • Spring Boot集成netty实现客户端服务端交互示例详解

    前言 Netty 是一个高性能的 NIO 网络框架,本文主要给大家介绍了关于SpringBoot集成netty实现客户端服务端交互的相关内容,下面来一起看看详细的介绍吧 看了好几天的netty实战,慢慢摸索,虽然还没有摸着很多门道,但今天还是把之前想加入到项目里的 一些想法实现了,算是有点信心了吧(讲真netty对初学者还真的不是很友好......) 首先,当然是在SpringBoot项目里添加netty的依赖了,注意不要用netty5的依赖,因为已经废弃了 <!--netty--> <

  • SpringBoot基于HttpMessageConverter实现全局日期格式化

    还在为日期格式化的问题头痛?赶紧阅览文章寻找答案吧! 学习目标 快速学会使用Jackson消息转换器并实现日期的全局格式化. 快速查阅 源码下载:SpringBoot-Date-Format 开始教程 一.全局日期格式化(基于自动配置) 关于日期格式化,很多人会想到使用Jackson的自动配置: spring.jackson.date-format=yyyy-MM-dd HH:mm:ss spring.jackson.timeZone: GMT+8 这种全局日期格式化固然方便,但在消息传递时只能

  • spring boot使用sonarqube来检查技术债务

    作为代码质量检查的流行工具,比如Sonarqube能够检查代码的"七宗罪",跟代码结合起来能够更好地提高代码的质量,让我们来看一下,刚刚写的Springboot2的HelloWorld的代码有什么"罪". Sonarqube Sonarqube可以使用docker版本快速搭建,可以参看一下Easypack整理的镜像,具体使用可以参看如下链接,这里不再赘述: https://hub.docker.com/r/liumiaocn/sonarqube/ 环境假定 本文使用

  • Spring Boot配置Swagger的实现代码

    由于Spring Boot能够快速开发.便捷部署等特性,相信有很大一部分Spring Boot的用户会用来构建RESTful API.而我们构建RESTful API的目的通常都是由于多终端的原因,这些终端会共用很多底层业务逻辑,因此我们会抽象出这样一层来同时服务于多个移动端或者Web前端. Swagger Inspector:测试API和生成OpenAPI的开发工具.Swagger Inspector的建立是为了解决开发者的三个主要目标. 执行简单的API测试 生成OpenAPI文档 探索新的

  • 详解SpringBoot注入数据的方式

    关于注入数据说明 1.不通过配置文件注入数据 通过@Value将外部的值动态注入到Bean中,使用的情况有: 注入普通字符串 注入操作系统属性 注入表达式结果 注入其他Bean属性:注入Student对象的属性name 注入文件资源 注入URL资源 辅助代码 package com.hannpang.model; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereo

  • 在Spring boot的项目中使用Junit进行单体测试

    使用Junit或者TestNG可以进行单体测试,这篇文章简单说明一下如何在Spring boot的项目中使用Junit进行单体测试. pom设定 pom中需要添加spring-boot-starter-test <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>

  • spring boot2.0总结介绍

    从这篇文章开始以spring boot2为主要版本进行使用介绍. Spring boot 2特性 spring boot2在如下的部分有所变化和增强,相关特性在后续逐步展开. 特性增强 基础组件升级: JDK1.8+ tomcat 8+ Thymeleaf 3 Hibernate 5.2 spring framework 5 Reactive Spring Functional API Kotlin支持 Metrics Security 使用变化 配置属性变化 Gradle插件 Actuator

随机推荐