RxJava加Retrofit文件分段上传实现详解

目录
  • 前言
  • 问题拆解
  • 分块并发读取文件
  • 文件片段上传
  • 完整代码

前言

本文基于 RxJava 和 Retrofit 库,设计并实现了一种用于大文件分块上传的工具,并对其进行了全面的拆解分析。抛砖引玉,对同样有处理文件分块上传诉求的读者,可能会起到一定的启发作用。

文章主体由四部分构成:

  • 首先分析问题,问题拆解为:多线程分段读取文件、构建和发出文件片段上传请求
  • 基于 JDK 随机读取文件的类库,设计本地多线程分段读取文件的单元
  • 基于 Retrofit 设计由文件片段构建上传的网络请求
  • 从上述设计演变而来的完整代码实现

  另外,在文章提供的完整代码中,还附了一段由 PHP 编写,用来接收多线程分段数据的服务端接口实现,其中处理了因客户端都线程上传片段,导致服务端接收的文件片段无序,故需在适当时机合并分块构成目标文件。

受限于笔者的开发经验与理论理解,文章的思路和代码难免可能有偏颇,对于有改进和优化的部分,欢迎大家讨论区提出。

问题拆解

要完成文件分段上传到服务端,第一步是分段读取本地文件。通常分段是为了多线程同时执行上传,提高设备计算和网络资源利用率,减少上传时间优化体验,这样即需要一个支持多线程的文件分段读取工具。由于文件可能超过设备内存大小,在读取这类超大文件时需要控制最大读取量防止内存溢出。此时文件已从磁盘数据转换为内存中的字节数据,只需要将这些内存数据传给服务端即可。这样问题被分成 3 个子问题:

  • 分段读取文件到内存中
  • 控制多线程数量
  • 将文件片段传给服务端

问题 1 很好解决,利用 Java 的 RandomAccessFile 可对文件的随机读取的特性,即可按需读取文件片段到内存中。

问题 2 相对复杂一点,但如果有阅读过 JDK 中线程池源码的读者,就会发现这个问题的和控制线程池中线程数量其实是类似的。

问题 3 就不复杂了,Retrofit 基于 OKhttp ,OkHttp是很容易基于字节数组构建 multipart/form-data 请求的。

分块并发读取文件

根据上述对问题 1、2 的拆解,可将读取抽象为一个文件读取器,构建时传入文件对象和分段大小以及最大并发数,以及分段数据的回调。当外部启动读取时将根据文件大小和配置的分段大小构建若干个 Task 用于读取对应片段的数据。

public BlockReader(@NotNull File file, @NotNull BlockCallback callback, int poolSize, int blockSize) {
    mFile = file;
    mCallback = callback;
    mPoolSize = poolSize;
    mBlockSize = blockSize;
}
public void start(@Nullable BlockFilter filter) {
    Observable.empty().observeOn(Schedulers.computation()).doOnComplete(() -> {
        long length = mFile.length();
        for (long offset = 0; offset < length; offset += mBlockSize) {
            if (null != filter && filter.ignore(offset)) {
                continue;
            }
            mQueue.offer(new ReadTask(offset));
        }
        for (int i = 0; i < Math.min(mPoolSize, mQueue.size()); i++) {
            Observable.empty().observeOn(Schedulers.io()).doOnComplete(this::schedule).subscribe();
        }
    }).subscribe();
}

多线程调度部分,可通过加锁和记录状态变量统计当前正运行的线程数,则可控制字节数组数,这样就相当于控制住了最大内存占用。

private void schedule() {
    if (mRunning.get() >= mPoolSize) {
        return;
    }
    ReadTask task;
    synchronized (mQueue) {
        if (mRunning.get() >= mPoolSize) {
            return;
        }
        task = mQueue.poll();
        if (null != task) {
            mRunning.incrementAndGet();
        }
    }
    if (null != task) {
        task.run();
    }
}

最后是文件随机读取,直接调用 RandomAccessFile 的 API 即可:

private class ReadTask implements Action {
    @Override
    public void run() {
        try (RandomAccessFile raf = new RandomAccessFile(mFile, RAF_MODE);
                ByteArrayOutputStream out = new ByteArrayOutputStream(mBlockSize)) {
            raf.seek(mOffset);
            byte[] buf = new byte[DEF_BLOCK_SIZE];
            long cnt = 0;
            for (int bytes = raf.read(buf); bytes != -1 && cnt < mBlockSize; bytes = raf.read(buf)) {
                out.write(buf, 0, bytes);
                cnt += bytes;
            }
            out.flush();
            mCallback.onFinished(mOffset, out.toByteArray());
        } catch (IOException e) {
            mCallback.onFinished(mOffset, null);
        } finally {
            mRunning.decrementAndGet();
            schedule();
        }
    }
}

文件片段上传

上传部分则使用 Retrofit 提供的注解和 OKHttp 的类库构建请求。但值得一提的是需要在磁盘IO线程同步完成网络IO,这样可以避免网络IO速度落后磁盘IO太多而导致任务堆积造成内存溢出。

public interface BlockUploader {
    @POST("test/upload.php")
    @Multipart
    Single<Response<ResponseBody>> upload(@Header("filename") String filename,
                                          @Header("total") long total,
                                          @Header("offset") long offset,
                                          @Part List<MultipartBody.Part> body);
}
private static void syncUpload(String fileName, long fileLength, long offset, byte[] bytes) {
    RequestBody data = RequestBody.create(MediaType.parse("application/octet-stream"), bytes);
    MultipartBody body = new MultipartBody.Builder()
            .addFormDataPart("file", fileName, data)
            .setType(MultipartBody.FORM)
            .build();
    retrofit.create(BlockUploader.class).upload(fileName, fileLength, offset, body.parts()).subscribe(resp -> {
        if (resp.isSuccessful()) {
            System.out.println("✓ offset: " + offset + " upload succeed " + resp.code());
        } else {
            System.out.println("✗ offset: " + offset + " upload failed " + resp.code());
        }
    }, throwable -> {
        System.out.println("! offset: " + offset + " upload failed");
    });
}

完整代码

为控制篇幅,完整代码请移步 Github,服务端部分处理形如:

以上就是RxJava加Retrofit文件分段上传示例的详细内容,更多关于RxJava Retrofit文件上传的资料请关注我们其它相关文章!

(0)

相关推荐

  • 如何利用Retrofit+RxJava实现网络请求的异常处理

    通常情况下我们在与服务器进行通信的时候,不一定就不会出错,有时会出现其他的错误,这个时候我们只要和服务器约定好各种异常,在返回结果处进行判断,到底是执行错误,还是返回正常数据.具体的思路大致就是这样.这里我们定义ExceptionHandle,这里我参考网上的东西,然后稍微做了一些改动. ExceptionHandle public class ExceptionHandle { private static final int UNAUTHORIZED = 401; private stati

  • Retrofit+RxJava实现带进度条的文件下载

    项目中需要使用到更新版本,因此研究了一下Retrofit的下载文件,和进度条效果,其间也遇到了一些坑,写出来加深一下记忆,也为别的同学提供一下思路. 先说一下版本控制吧,通用做法基本上是通过接口获取服务器存储的app版本号,与应用的版本号进行比较,版本较低就去更新,先看一下如何获取应用版本号吧 PackageManager packageManager = mActivity.getPackageManager(); PackageInfo packageInfo = null; try { p

  • RxJava和Retrofit2的统一处理单个请求示例详解

    前言 RxJava和Retrofit2用了一段时间了,写个小例子,分享出来,有什么不对的地方还请大神在评论区指正. 什么是Retrofit2 官网是这么介绍的: Retrofit adapts a Java interface to HTTP calls by using annotations on the declared methods to define how requests are made. 我翻译的可能不准确,他的大概意思是说:Retrofit 是一个 java 接口类,以注解

  • 基于Ok+Rxjava+retrofit实现断点续传下载

    本文为大家分享了实现断点续传下载的具体代码,供大家参考,具体内容如下 1.基于Ok+Rxjava实现断点续传下载 2.基于Ok+Rxjava+Retrofit实现断点续传下载 上一篇博客中介绍了基于Ok+Rxjava实现断点续传下载,这一篇给大家介绍下基于Ok+Rxjava+Retrofit实现断点续传下载,demo下载地址,效果图跟上一篇图片一样,哈哈 说下我的大致思路吧(跟上一篇略有不同):根据文件下载url按照自己定义的规则生成文件名,判断本地同路径下是否存在此文件,如果存在,文件大小与服

  • RxJava+Retrofit实现网络请求封装的方法

    简要介绍 Retrofit是当前应用非常广泛的网络请求框架,通常结合RxJava来进行网络请求,本文将展示一个采用RxJava+Retrofit的网络请求demo. 集成步骤 1.app工程的build.gradle中添加依赖 //retrofit2 implementation 'com.google.code.gson:gson:2.8.5' implementation 'com.squareup.retrofit2:retrofit:2.5.0' implementation 'com.

  • RxJava2和Retrofit2封装教程(整洁、简单、实用)

    前言 RxJava2与Retrofit2是老搭档了,之前写了一篇<RxJava和Retrofit2的统一处理单个请求>,是用的Rxjava1.0,本次使用Rxjava2.0与Retrofit2进行封装,一样整洁.简单.实用.Rxjava2相比Rxjava1优化和改动不少了东西,网上有很多大神写的文章,这里就不粘贴复制了.封装的过程有什么问题.疑问,请在下方留言. 下面话不多说了,来一起看看详细的介绍吧 封装教程如下: 核心网络请求: package com.lin.netrequestdemo

  • RxJava加Retrofit文件分段上传实现详解

    目录 前言 问题拆解 分块并发读取文件 文件片段上传 完整代码 前言 本文基于 RxJava 和 Retrofit 库,设计并实现了一种用于大文件分块上传的工具,并对其进行了全面的拆解分析.抛砖引玉,对同样有处理文件分块上传诉求的读者,可能会起到一定的启发作用. 文章主体由四部分构成: 首先分析问题,问题拆解为:多线程分段读取文件.构建和发出文件片段上传请求 基于 JDK 随机读取文件的类库,设计本地多线程分段读取文件的单元 基于 Retrofit 设计由文件片段构建上传的网络请求 从上述设计演

  • 基于Struts文件上传(FormFile)详解

    Struts中FormFile用于文件进行上传 1.在jsp文件中进行定义 <form action="/StrutsFileUpAndDown/register.do" method="post" enctype="multipart/form-data"> 名字:<input type="text" name="name" /> 头像:<input type="f

  • python中Django文件上传方法详解

    Django上传文件最简单最官方的方法 1.配置media路径 在settings.py中添加如下代码: MEDIA_ROOT = os.path.join(BASE_DIR, 'media') 2.定义数据表 import os from django.db import models from django.utils.timezone import now as timezone_now def upload_to(instance, filename):     now = timezo

  • JavaScript 使用Ckeditor+Ckfinder文件上传案例详解

    目录 一.准备工作 二.解压 三.开始集成 一.准备工作 Ckeditor_4.5.7_full + Ckfinder_java_2.6.0 二.解压 1.解压ckeditor,和平常文件解压相同,正常解压即可 2.解压ckfinder,解压完成后进入ckfinder文件夹下,发现有CKFinderJava-2.6.0.war文件,继续解压. 3.注意看红框部分 三.开始集成 1.准备工作完成,将图1中的ckeditor,及图3中的ckfinder文件夹拷贝到我们自己的项目的WebContent

  • Docker创建自己的镜像与上传流程详解

    目录 引入 了解Docker的资源隔离和主机模式 创建 centos7 容器 部署mysql5.7 创建Docker Hub仓库 生成镜像 提交镜像 引入 在部署毕节服务器时遇到了个问题:   因为在三台服务器做好ovirt-engine超融合后,在主节点服务器上部署可视化平台需要安装mysql5.7,但是安装mysql时需要卸载一些依赖,但是由于怕影响原有的postgresql数据库和ovirt-engine的服务组件,选择了用docker容器部署mysql数据库.   在顺利创建好容器,配置

  • PHP文件上传实例详解!!!

    首先来看下上传部分的表单代码:   复制代码 代码如下: <form method="post" action="upload.php" enctype="multipart/form-data">        <table border=0 cellspacing=0 cellpadding=0 align=center width="100%">         <tr>       

  • jquery组件WebUploader文件上传用法详解

    WebUploader是由Baidu WebFE(FEX)团队开发的一个简单的以HTML5为主,FLASH为辅的现代文件上传组件,下文来为各位演示一下关于jquery WebUploader文件上传组件的用法. 使用WebUploader还可以批量上传文件.支持缩略图等等众多参数选项可设置,以及多个事件方法可调用,你可以随心所欲的定制你要的上传组件. 接下来我以图片上传实例,给大家讲解如何使用WebUploader. HTML 我们首先将css和相关js文件加载. <link rel="s

  • 学习SpringMVC——国际化+上传+下载详解

    一个软件,一个产品,都是一点点开发并完善起来的,功能越来越多,性能越来越强,用户体验越来越好--这每个指标的提高都需要切切实实的做点东西出来,好比,你的这个产品做大了,用的人多了,不仅仅再是上海人用,北京人用,还有印度人用,法国人用等等,可以说这个产品已经走上了国际化的大舞台.当印度的哥们输入url访问产品时,界面上弹出"欢迎您,三哥",估计哥们当场就蒙圈了.而这个时候,国际化就应运而生了. 要做国际化这道菜,真的没有想象中的那么复杂,反而很简单,不信你看-- 1. 注入Resourc

  • VUE学习之Element-ui文件上传实例详解

    目录 引言 单文件上传 和表单一起上传 需求:需同时给后端传文件FormData和其他所需参数 数据的编码方式 补充:代理的使用 什么是代理?为什么要用代理 代理的使用 process.env.NODE_ENV 总结 引言 对于文件上传,在开发主要涉及到以下两个方面: 单个文件上传和表单一起实现上传(这种情况一般都是文件上传之后,后端返回保存在服务器的文件名,最后和我们的表单一起上传) 单文件上传 element-ui中的el-upload组件默认发送post请求,在使用upload组件自动携带

  • jQuery+php实现ajax文件即时上传的详解

    很多项目中需要用到即时上传功能,比如,选择本地图片后,立即上传并显示图像.本文结合实例讲解如何使用jQuery和PHP实现Ajax即时上传文件的功能,用户只需选择本地图片确定后即实现上传,并显示上传进度条,上传完成后,显示图片信息. HTML本示例基于jQuery以及相当出色的jquery.form插件,所以,先要载入jquery库和form插件.<script type="text/javascript" src="jquery.min.js"><

随机推荐