java使用多线程读取超大文件

接上次写的“JAVA读取超大文件”。在读取超过10G的文件时会发现一次读一行的速度实在是不能接受,想到使用多线程+FileChannel来做一个使用多线程版本。

基本思路如下:

1.计算出文件总大小

2.分段处理,计算出每个线程读取文件的开始与结束位置

(文件大小/线程数)*N,N是指第几个线程,这样能得到每个线程在读该文件的大概起始位置

使用"大概起始位置",作为读文件的开始偏移量(fileChannel.position("大概起始位置")),来读取该文件,直到读到第一个换行符,记录下这个换行符的位置,作为该线程的准确起 始位置.同时它也是上一个线程的结束位置.最后一个线程的结束位置也直接设置为-1

3.启动线程,每个线程从开始位置读取到结束位置为止

代码如下:

读文件工具类

import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Observable;

/**
 * Created with IntelliJ IDEA.
 * User: okey
 * Date: 14-4-2
 * Time: 下午3:12
 * 读取文件
 */
public class ReadFile extends Observable {

 private int bufSize = 1024;
 // 换行符
 private byte key = "\n".getBytes()[0];
 // 当前行数
 private long lineNum = 0;
 // 文件编码,默认为gb2312
 private String encode = "gb2312";
 // 具体业务逻辑监听器
 private ReaderFileListener readerListener;

 public void setEncode(String encode) {
  this.encode = encode;
 }

 public void setReaderListener(ReaderFileListener readerListener) {
  this.readerListener = readerListener;
 }

 /**
  * 获取准确开始位置
  * @param file
  * @param position
  * @return
  * @throws Exception
  */
 public long getStartNum(File file, long position) throws Exception {
  long startNum = position;
  FileChannel fcin = new RandomAccessFile(file, "r").getChannel();
  fcin.position(position);
  try {
   int cache = 1024;
   ByteBuffer rBuffer = ByteBuffer.allocate(cache);
   // 每次读取的内容
   byte[] bs = new byte[cache];
   // 缓存
   byte[] tempBs = new byte[0];
   String line = "";
   while (fcin.read(rBuffer) != -1) {
    int rSize = rBuffer.position();
    rBuffer.rewind();
    rBuffer.get(bs);
    rBuffer.clear();
    byte[] newStrByte = bs;
    // 如果发现有上次未读完的缓存,则将它加到当前读取的内容前面
    if (null != tempBs) {
     int tL = tempBs.length;
     newStrByte = new byte[rSize + tL];
     System.arraycopy(tempBs, 0, newStrByte, 0, tL);
     System.arraycopy(bs, 0, newStrByte, tL, rSize);
    }
    // 获取开始位置之后的第一个换行符
    int endIndex = indexOf(newStrByte, 0);
    if (endIndex != -1) {
     return startNum + endIndex;
    }
    tempBs = substring(newStrByte, 0, newStrByte.length);
    startNum += 1024;
   }
  } catch (Exception e) {
   e.printStackTrace();
  } finally {
   fcin.close();
  }
  return position;
 }

 /**
  * 从设置的开始位置读取文件,一直到结束为止。如果 end设置为负数,刚读取到文件末尾
  * @param fullPath
  * @param start
  * @param end
  * @throws Exception
  */
 public void readFileByLine(String fullPath, long start, long end) throws Exception {
  File fin = new File(fullPath);
  if (fin.exists()) {
   FileChannel fcin = new RandomAccessFile(fin, "r").getChannel();
   fcin.position(start);
   try {
    ByteBuffer rBuffer = ByteBuffer.allocate(bufSize);
    // 每次读取的内容
    byte[] bs = new byte[bufSize];
    // 缓存
    byte[] tempBs = new byte[0];
    String line = "";
    // 当前读取文件位置
    long nowCur = start;
    while (fcin.read(rBuffer) != -1) {
     nowCur += bufSize;

     int rSize = rBuffer.position();
     rBuffer.rewind();
     rBuffer.get(bs);
     rBuffer.clear();
     byte[] newStrByte = bs;
     // 如果发现有上次未读完的缓存,则将它加到当前读取的内容前面
     if (null != tempBs) {
      int tL = tempBs.length;
      newStrByte = new byte[rSize + tL];
      System.arraycopy(tempBs, 0, newStrByte, 0, tL);
      System.arraycopy(bs, 0, newStrByte, tL, rSize);
     }
     // 是否已经读到最后一位
     boolean isEnd = false;
     // 如果当前读取的位数已经比设置的结束位置大的时候,将读取的内容截取到设置的结束位置
     if (end > 0 && nowCur > end) {
      // 缓存长度 - 当前已经读取位数 - 最后位数
      int l = newStrByte.length - (int) (nowCur - end);
      newStrByte = substring(newStrByte, 0, l);
      isEnd = true;
     }
     int fromIndex = 0;
     int endIndex = 0;
     // 每次读一行内容,以 key(默认为\n) 作为结束符
     while ((endIndex = indexOf(newStrByte, fromIndex)) != -1) {
      byte[] bLine = substring(newStrByte, fromIndex, endIndex);
      line = new String(bLine, 0, bLine.length, encode);
      lineNum++;
      // 输出一行内容,处理方式由调用方提供
      readerListener.outLine(line.trim(), lineNum, false);
      fromIndex = endIndex + 1;
     }
     // 将未读取完成的内容放到缓存中
     tempBs = substring(newStrByte, fromIndex, newStrByte.length);
     if (isEnd) {
      break;
     }
    }
    // 将剩下的最后内容作为一行,输出,并指明这是最后一行
    String lineStr = new String(tempBs, 0, tempBs.length, encode);
    readerListener.outLine(lineStr.trim(), lineNum, true);
   } catch (Exception e) {
    e.printStackTrace();
   } finally {
    fcin.close();
   }

  } else {
   throw new FileNotFoundException("没有找到文件:" + fullPath);
  }
  // 通知观察者,当前工作已经完成
  setChanged();
  notifyObservers(start+"-"+end);
 }

 /**
  * 查找一个byte[]从指定位置之后的一个换行符位置
  *
  * @param src
  * @param fromIndex
  * @return
  * @throws Exception
  */
 private int indexOf(byte[] src, int fromIndex) throws Exception {

  for (int i = fromIndex; i < src.length; i++) {
   if (src[i] == key) {
    return i;
   }
  }
  return -1;
 }

 /**
  * 从指定开始位置读取一个byte[]直到指定结束位置为止生成一个全新的byte[]
  *
  * @param src
  * @param fromIndex
  * @param endIndex
  * @return
  * @throws Exception
  */
 private byte[] substring(byte[] src, int fromIndex, int endIndex) throws Exception {
  int size = endIndex - fromIndex;
  byte[] ret = new byte[size];
  System.arraycopy(src, fromIndex, ret, 0, size);
  return ret;
 }

}

读文件线程

/**
 * Created with IntelliJ IDEA.
 * User: okey
 * Date: 14-4-2
 * Time: 下午4:50
 * To change this template use File | Settings | File Templates.
 */
public class ReadFileThread extends Thread {

 private ReaderFileListener processPoiDataListeners;
 private String filePath;
 private long start;
 private long end;

 public ReadFileThread(ReaderFileListener processPoiDataListeners,long start,long end,String file) {
  this.setName(this.getName()+"-ReadFileThread");
  this.start = start;
  this.end = end;
  this.filePath = file;
  this.processPoiDataListeners = processPoiDataListeners;
 }

 @Override
 public void run() {
  ReadFile readFile = new ReadFile();
  readFile.setReaderListener(processPoiDataListeners);
  readFile.setEncode(processPoiDataListeners.getEncode());
//  readFile.addObserver();
  try {
   readFile.readFileByLine(filePath, start, end + 1);
  } catch (Exception e) {
   e.printStackTrace();
  }
 }
}

具体业务逻辑监听

/**
 * Created with Okey
 * User: Okey
 * Date: 13-3-14
 * Time: 下午3:19
 * NIO逐行读数据回调方法
 */
public abstract class ReaderFileListener {

 // 一次读取行数,默认为500
 private int readColNum = 500;

 private String encode;

 private List<String> list = new ArrayList<String>();

 /**
  * 设置一次读取行数
  * @param readColNum
  */
 protected void setReadColNum(int readColNum) {
  this.readColNum = readColNum;
 }

 public String getEncode() {
  return encode;
 }

 public void setEncode(String encode) {
  this.encode = encode;
 }

 /**
  * 每读取到一行数据,添加到缓存中
  * @param lineStr 读取到的数据
  * @param lineNum 行号
  * @param over 是否读取完成
  * @throws Exception
  */
 public void outLine(String lineStr, long lineNum, boolean over) throws Exception {
  if(null != lineStr)
   list.add(lineStr);
  if (!over && (lineNum % readColNum == 0)) {
   output(list);
   list.clear();
  } else if (over) {
   output(list);
   list.clear();
  }
 }

 /**
  * 批量输出
  *
  * @param stringList
  * @throws Exception
  */
 public abstract void output(List<String> stringList) throws Exception;

}

线程调度

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

/**
 * Created with IntelliJ IDEA.
 * User: okey
 * Date: 14-4-1
 * Time: 下午6:03
 * To change this template use File | Settings | File Templates.
 */
public class BuildData {
 public static void main(String[] args) throws Exception {
  File file = new File("E:\\1396341974289.csv");
  FileInputStream fis = null;
  try {
   ReadFile readFile = new ReadFile();
   fis = new FileInputStream(file);
   int available = fis.available();
   int maxThreadNum = 50;
   // 线程粗略开始位置
   int i = available / maxThreadNum;
   for (int j = 0; j < maxThreadNum; j++) {
    // 计算精确开始位置
    long startNum = j == 0 ? 0 : readFile.getStartNum(file, i * j);
    long endNum = j + 1 < maxThreadNum ? readFile.getStartNum(file, i * (j + 1)) : -2;
    // 具体监听实现
    ProcessDataByPostgisListeners listeners = new ProcessDataByPostgisListeners("gbk");
    new ReadFileThread(listeners, startNum, endNum, file.getPath()).start();
   }
  } catch (IOException e) {
   e.printStackTrace();
  } catch (Exception e) {
   e.printStackTrace();
  }
 }
}

现在就可以尽情的调整 maxThreadNum来享受风一般的速度吧!

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Java读取、写入文件如何解决乱码问题

    读取文件流时,经常会遇到乱码的现象,造成乱码的原因当然不可能是一个,这里主要介绍因为文件编码格式而导致的乱码的问题.首先,明确一点,文本文件与二进制文件的概念与差异. 文本文件是基于字符编码的文件,常见的编码有ASCII编码,UNICODE编码.ANSI编码等等.二进制文件是基于值编码的文件,你可以根据具体应用,指定某个值是什么意思(这样一个过程,可以看作是自定义编码.) 因此可以看出文本文件基本上是定长编码的(也有非定长的编码如UTF-8).而二进制文件可看成是变长编码的,因为是值编码嘛,多少

  • java读取resource目录下文件的方法示例

    本文主要介绍的是java读取resource目录下文件的方法,比如这是你的src目录的结构 ├── main │ ├── java │ │ └── com │ │ └── test │ │ └── core │ │ ├── bean │ │ ├── Test.java │ └── resources │ └── test │ ├── test.txt └── test └── java 我们希望在Test.java中读取test.txt文件中的内容,那么我们可以借助Guava库的Resource

  • java实现读取txt文件中的内容

    我们先来看个例子 import java.io.*; /** * Created by liguoqing on 2016/3/28. */ public class ReadTxtFile { public static void readTxt(String filePath) { try { File file = new File(filePath); if(file.isFile() && file.exists()) { InputStreamReader isr = new

  • Java读取文件的简单实现方法

    本文实例讲述了Java读取文件的简单实现方法,非常实用.分享给大家供大家参考之用.具体方法如下: 这是一个简单的读取文件的代码,并试着读取一个log文件,再输出. 主要代码如下: import java.io.*; public class FileToString { public static String readFile(String fileName) { String output = ""; File file = new File(fileName); if(file.

  • java多线程有序读取同一个文件

    本人刚参加工作,面试的时候遇四道笔试题,其中就有这道多线程有序读取文件的题目,初看时拿不准,感觉会,又感觉不会.于是放弃了这道题,今天闲下来好好做一遍. //定义一个runnable接口的实现类 import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; public class RunnableImpl implem

  • Java数据导入功能之读取Excel文件实例

    在编程中经常需要使用到表格(报表)的处理主要以Excel表格为主.下面给出用java读取excel表格方法: 1.添加jar文件 java导入导出Excel文件要引入jxl.jar包,最关键的是这套API是纯Java的,并不依赖Windows系统,即使运行在Linux下,它同样能够正确的处理Excel文件.下载地址:http://www.andykhan.com/jexcelapi/ 2.jxl对Excel表格的认识 (1)每个单元格的位置认为是由一个二维坐标(i,j)给定,其中i表示列,j表示

  • JAVA中读取文件(二进制,字符)内容的几种方法总结

    JAVA中读取文件内容的方法有很多,比如按字节读取文件内容,按字符读取文件内容,按行读取文件内容,随机读取文件内容等方法,本文就以上方法的具体实现给出代码,需要的可以直接复制使用 public class ReadFromFile { /** * 以字节为单位读取文件,常用于读二进制文件,如图片.声音.影像等文件. */ public static void readFileByBytes(String fileName) { File file = new File(fileName); In

  • java读取csv文件示例分享(java解析csv文件)

    复制代码 代码如下: import java.io.*;import java.util.*;public class HandleCsv {public static void main(String[] args) throws IOException {BufferedReader br = new BufferedReader(   new InputStreamReader(    new FileInputStream("test.csv")   )); String li

  • Java如何读取XML文件 具体实现

    今天的CSDN常见问题来讲解下在Java中如何读取XML文件的内容. 直接上代码吧,注释写的很清楚了! 复制代码 代码如下: import java.io.*;import javax.xml.parsers.DocumentBuilder;import javax.xml.parsers.DocumentBuilderFactory;import org.w3c.dom.Document;import org.w3c.dom.Element;import org.w3c.dom.Node;im

  • java读取文件显示进度条的实现方法

    实现这个功能比较简单,用到的类有两个:ProgressMonitorInputStream(主要是整个类) 和 ProgressMonitor ,它们在javax.swing中大体思路,你要首先知道整个文件的大小,和当前已经读取文件的大小,获得整个文件大小的方法 复制代码 代码如下: ProgressMonitorInputStream monitor;/*** @param 表示此进度条要依附在哪个组件上* @param 显示在此进度条上的消息* @param 需要监控的输入流*/monito

  • Java读取Excel文件内容的简单实例

    借助于apathe的poi.jar,由于上传文件不支持.jar所以请下载后将文件改为.jar,在应用程序中添加poi.jar包,并将需要读取的excel文件放入根目录即可 本例使用java来读取excel的内容并展出出结果,代码如下: 复制代码 代码如下: import java.io.BufferedInputStream;import java.io.File;import java.io.FileInputStream;import java.io.FileNotFoundExceptio

随机推荐