多线程文件下载的服务器端及客户端
1 服务器端
简要说明:
1.1 根据接收到的文件id,读取服务器端的文件。
1.2 判断request中的head是否有“Range”,如果有,代表需要支持断点续传功能,否则不需要。
1.3 根据Range的值判断,如 (bytes=1000-)或(bytes=1000-2000)分别代表第1000字节以后的值,及1000到2000字节之间的值。
1.4 根据Range的值读取文件中相应的字节返回。
/** * 下载文件(支持单点续传下载) * * @param request * @param fileId * @throws IOException */ @RequestMapping(value = "/file/downloadMultiThread/{fileId}", method = RequestMethod.GET) public void download3(HttpServletRequest request, HttpServletResponse response, @PathVariable("fileId") int fileId) throws IOException { if (isTest) { Enumerationenums = request.getHeaderNames(); while (enums.hasMoreElements()) { String names = enums.nextElement(); if (isTest) { System.out.println(names + ":[" + request.getHeader(names) + "]"); } } } CmnTmpFile cmnTmpFile = cmnTmpFileSrvc.getCmnTmpFileByKeyId(fileId); if (cmnTmpFile == null) { return; } try { if (isTest) { log.info("请求下载的连接地址为:[" + request.getRequestURL() + "]?[" + request.getQueryString() + "]"); } } catch (IllegalArgumentException e) { log.error("请求下载的文件名参数为空!"); return; } File downloadFile = new File(cmnTmpFile.getFilePath()); if (downloadFile.exists()) { if (downloadFile.isFile()) { if (downloadFile.length() > 0) { } else { log.info("请求下载的文件是一个空文件"); return; } if (!downloadFile.canRead()) { log.info("请求下载的文件不是一个可读的文件"); return; } else { } } else { log.info("请求下载的文件是一个文件夹"); return; } } else { log.info("请求下载的文件不存在!"); return; } // 记录文件大小 long fileLength = downloadFile.length(); // 记录已下载文件大小 long pastLength = 0; // 0:从头开始的全文下载; // 1:从某字节开始的下载(bytes=1000-); // 2:从某字节开始到某字节结束的下载(bytes=1000-2000) int rangeSwitch = 0; // 记录客户端需要下载的字节段的最后一个字节偏移量(比如bytes=1000-2000,则这个值是为2000) long toLength = 0; // 客户端请求的字节总量 long contentLength = 0; // 记录客户端传来的形如“bytes=1000-”或者“bytes=1000-2000”的内容 String rangeBytes = ""; // 负责读取数据 RandomAccessFile raf = null; // 写出数据 OutputStream os = null; // 缓冲 OutputStream out = null; // 暂存容器 byte b[] = new byte[1024]; if (request.getHeader("Range") != null) { // 客户端请求的下载的文件块的开始字节 response.setStatus(javax.servlet.http.HttpServletResponse.SC_PARTIAL_CONTENT); if (isTest) { log.info("request.getHeader(\"Range\")=" + request.getHeader("Range")); } rangeBytes = request.getHeader("Range").replaceAll("bytes=", ""); if (rangeBytes.indexOf('-') == rangeBytes.length() - 1) { // 如:bytes=1000- rangeSwitch = 1; rangeBytes = rangeBytes.substring(0, rangeBytes.indexOf('-')); pastLength = Long.parseLong(rangeBytes.trim()); // 客户端请求的是 1000之后的字节 contentLength = fileLength - pastLength; } else { // 如:bytes=1000-2000 rangeSwitch = 2; String temp0 = rangeBytes.substring(0, rangeBytes.indexOf('-')); String temp2 = rangeBytes.substring(rangeBytes.indexOf('-') + 1, rangeBytes.length()); // bytes=1000-2000,从第1000个字节开始下载 pastLength = Long.parseLong(temp0.trim()); // bytes=1000-2000,到第2000个字节结束 toLength = Long.parseLong(temp2); // 客户端请求的是1000-2000之间的字节 contentLength = toLength - pastLength; } } else { // 从开始进行下载,客户端要求全文下载 contentLength = fileLength; } /** * 如果设设置了Content -Length,则客户端会自动进行多线程下载。如果不希望支持多线程,则不要设置这个参数。 响应的格式是: * Content - Length: [文件的总大小] - [客户端请求的下载的文件块的开始字节] * ServletActionContext.getResponse().setHeader("Content- Length", new * Long(file.length() - p).toString()); */ response.reset(); // 告诉客户端允许断点续传多线程连接下载,响应的格式是:Accept-Ranges: bytes response.setHeader("Accept-Ranges", "bytes"); // 如果是第一次下,还没有断点续传,状态是默认的 200,无需显式设置;响应的格式是:HTTP/1.1 200 OK if (pastLength != 0) { // 不是从最开始下载,响应的格式是: // Content-Range: bytes [文件块的开始字节]-[文件的总大小 - 1]/[文件的总大小] if (isTest) { log.info("---------不是从开始进行下载!服务器即将开始断点续传..."); } String contentRange = ""; switch (rangeSwitch) { case 1: // 针对 bytes=1000- 的请求 contentRange = new StringBuffer("bytes ").append(new Long(pastLength).toString()).append("-") .append(new Long(fileLength - 1).toString()).append("/").append(new Long(fileLength).toString()) .toString(); response.setHeader("Content-Range", contentRange); break; case 2: // 针对 bytes=1000-2000 的请求 contentRange = rangeBytes + "/" + new Long(fileLength).toString(); response.setHeader("Content-Range", contentRange); break; default: break; } } else { // 是从开始下载 if (isTest) { log.info("---------是从开始进行下载!"); } } try { response.addHeader("Content-Disposition", "attachment; filename=\"" + downloadFile.getName() + "\""); // 设置 MIME 类型. response.setContentType(CommonUtil.setContentType(downloadFile.getName())); response.addHeader("Content-Length", String.valueOf(contentLength)); os = response.getOutputStream(); out = new BufferedOutputStream(os); raf = new RandomAccessFile(downloadFile, "r"); int readNum = 0; long readLength = 0; try { switch (rangeSwitch) { case 0: // 普通下载,或者从头开始的下载,同1 case 1: // 针对 bytes=1000- 的请求 // 形如 bytes=1000- 的客户端请求,跳过 1000 个字节 raf.seek(pastLength); readNum = 0; while ((readNum = raf.read(b, 0, 1024)) != -1) { out.write(b, 0, readNum); } break; case 2: // 针对 bytes=2000-3000 的请求 // 形如 bytes=2000-3000 的客户端请求,找到第 2000 个字节 raf.seek(pastLength); readNum = 0; readLength = 0; // 记录已读字节数 while (readLength <= contentLength - 1024) { // 大部分字节在这里读取 readNum = raf.read(b, 0, 1024); readLength += 1024; out.write(b, 0, readNum); } if (readLength <= contentLength) { // 余下的不足 1024 个字节在这里读取 readNum = raf.read(b, 0, (int) (contentLength - readLength)); out.write(b, 0, readNum); } break; default: break; } out.flush(); if (isTest) { log.info("-----------下载结束"); } } catch (IOException ie) { /** * 在写数据的时候, 对于 ClientAbortException 之类的异常, * 是因为客户端取消了下载,而服务器端继续向浏览器写入数据时,抛出这个异常,这个是正常的。 */ if (isTest) { log.info("向客户端传输时出现IO异常,但此异常是允许的,有可能是客户端取消了下载,导致此异常"); } } } catch (Exception e) { log.error(e.getMessage(), e); } finally { if (out != null) { try { out.close(); } catch (IOException e) { // 远程主机或者本机强制关闭 // log.error(e.getMessage(), e); } finally { out = null; } } if (raf != null) { try { raf.close(); } catch (IOException e) { log.error(e.getMessage(), e); } finally { raf = null; } } } } /** * 打包压缩下载文件 * * @param response * @param fileId * @throws IOException */ @RequestMapping(value = "/file/downLoadZipFile/{fileId}", method = RequestMethod.GET) public void downLoadZipFile(HttpServletResponse response, @PathVariable("fileId") int fileId) throws IOException { CmnTmpFile cmnTmpFile = cmnTmpFileSrvc.getCmnTmpFileByKeyId(fileId); if (cmnTmpFile == null) { return; } File file = new File(cmnTmpFile.getFilePath()); response.setContentType("APPLICATION/OCTET-STREAM"); response.setHeader("Content-Disposition", "attachment; filename=" + cmnTmpFile.getFileName() + ".zip"); ZipOutputStream out = new ZipOutputStream(response.getOutputStream()); try { long start = System.currentTimeMillis(); if (isTest) { log.info("----------开始下载文件,文件长度[" + file.length() + "]"); } // 压缩输出文件 // ZipUtils.doCompress(cmnTmpFile.getFilePath(), out); ZipUtils.doCompress(file, out); response.flushBuffer(); if (isTest) { System.out.println("耗时:[" + (System.currentTimeMillis() - start) + "]ms"); log.info("----------压缩下载文件完成"); } } catch (Exception e) { e.printStackTrace(); } finally { out.close(); } } /** * 下载文件(常规单线程下载) * * @param request * @param fileId * @throws IOException */ @RequestMapping(value = "/file/download/{fileId}", method = RequestMethod.GET) public void download(HttpServletRequest request, HttpServletResponse response, @PathVariable("fileId") int fileId) throws IOException { // 测试Header if (isTest) { Enumeration enums = request.getHeaderNames(); while (enums.hasMoreElements()) { String names = enums.nextElement(); if (isTest) { System.out.println(names + ":[" + request.getHeader(names) + "]"); } } } CmnTmpFile cmnTmpFile = cmnTmpFileSrvc.getCmnTmpFileByKeyId(fileId); if (cmnTmpFile == null) { return; } File file = new File(cmnTmpFile.getFilePath()); // HttpHeaders headers = new HttpHeaders(); // headers.setContentType(MediaType.APPLICATION_OCTET_STREAM); // headers.setContentDispositionFormData("attachment", // cmnTmpFile.getFileName()); try { long start = System.currentTimeMillis(); if (isTest) { log.info("----------开始下载文件,文件长度[" + file.length() + "]"); } if (file.exists()) { String filename = file.getName(); InputStream fis = new BufferedInputStream(new FileInputStream(file)); response.reset(); response.setContentType("application/x-download"); response.addHeader("Content-Disposition", "attachment;filename=" + new String(filename.getBytes(), "iso-8859-1")); response.addHeader("Content-Length", "" + file.length()); OutputStream toClient = new BufferedOutputStream(response.getOutputStream()); response.setContentType("application/octet-stream"); byte[] buffer = new byte[1024 * 1024 * 4]; int i = -1; while ((i = fis.read(buffer)) != -1) { toClient.write(buffer, 0, i); } fis.close(); toClient.flush(); toClient.close(); try { response.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } if (isTest) { System.out.println("耗时:[" + (System.currentTimeMillis() - start) + "]ms"); log.info("----------下载文件完成"); } } catch (IOException ex) { ex.printStackTrace(); } }
压缩应用类代码如下:
package cn.xaele.utils.zip; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; /** * 文件下载工具类 */ public class ZipUtils { public static void doCompress(String srcFile, String zipFile) throws Exception { doCompress(new File(srcFile), new File(zipFile)); } /** * 文件压缩 * * @param srcFile 目录或者单个文件 * @param destFile 压缩后的ZIP文件 */ public static void doCompress(File srcFile, File destFile) throws Exception { ZipOutputStream out = new ZipOutputStream(new FileOutputStream(destFile)); if (srcFile.isDirectory()) { File[] files = srcFile.listFiles(); for (File file : files) { doCompress(file, out); } } else { doCompress(srcFile, out); } } public static void doCompress(String pathname, ZipOutputStream out) throws IOException { doCompress(new File(pathname), out); } public static void doCompress(File file, ZipOutputStream out) throws IOException { if (file.exists()) { byte[] buffer = new byte[1024]; FileInputStream fis = new FileInputStream(file); out.putNextEntry(new ZipEntry(file.getName())); int len = 0; // 读取文件的内容,打包到zip文件 while ((len = fis.read(buffer)) > 0) { out.write(buffer, 0, len); } out.flush(); out.closeEntry(); fis.close(); } } }
2 客户端
简要说明:
2.1 客户端要下载一个文件, 先请求服务器,然后服务器将文件传送给客户端,最后客户端保存到本地, 完成下载过程。
2.2 多线程下载是在客户端开启多个线程同时下载,每个线程只负责下载文件其中的一部分,在客户端进行文件组装,当所有线程下载完的时候,文件下载完毕。 (注意:并非线程越多下载越快, 与网络环境有很大的关系;在同等的网络环境下,多线程下载速度要高于单线程;多线程下载占用资源比单线程多,相当于用资源换速度)。
2.3 下载前,首先要获取要下载文件的大小,然后用RandomAccessFile 类在客户端磁盘创建一个同样大小的文件,下载后的数据写入这个文件。
2.4 在每个线程分配的下载任务中包含待下载文件取值的开始和结束位置,然后启动下载线程下载数据。
2.5 判断有没有保存上次下载的临时文件,如果有,判断是否需要删除或重新启动下载线程;
2.6 在启动线程下载的时候保存下载文件的位置信息,下载完毕后删除当前线程产生的临时文件。
package cn.xaele.utils.file; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; import java.net.HttpURLConnection; import java.net.URL; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import cn.xaele.utils.StringUtils; public class DownLoadLargeFile { private static final Logger log = LogManager.getLogger(DownLoadLargeFile.class); /** * 每个线程下载的字节数 */ private long unitSize = 1000 * 1024; private CloseableHttpClient httpClient; public static void main(String[] args) { long starttimes = System.currentTimeMillis(); DownLoadLargeFile app = new DownLoadLargeFile(); PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager(); // 设置整个连接池最大连接数 20 cm.setMaxTotal(20); app.httpClient = HttpClients.custom().setConnectionManager(cm).build(); try { app.doDownload(); } catch (IOException e) { e.printStackTrace(); } System.out.println((System.currentTimeMillis() - starttimes) + "ms"); } /** * * 启动多个线程下载文件 */ public void doDownload() throws IOException { String remoteFileUrl = "http://127.0.0.1:80/file/downloadMultiThread/2"; String localPath = "d:/doctohtml/"; String fileName = new URL(remoteFileUrl).getFile(); System.out.println("下载文件名称:" + fileName); fileName = fileName.substring(fileName.lastIndexOf("/") + 1, fileName.length()).replace("%20", " "); System.out.println("本地文件名称:" + fileName); long fileSize = this.getRemoteFileSize(remoteFileUrl); this.createFile(localPath + System.currentTimeMillis() + fileName, fileSize); Long threadCount = (fileSize / unitSize) + (fileSize % unitSize != 0 ? 1 : 0); long offset = 0; CountDownLatch end = new CountDownLatch(threadCount.intValue()); ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); if (fileSize <= unitSize) { // 如果下载文件尺寸小于等于unitSize DownloadThread downloadThread = new DownloadThread(remoteFileUrl, localPath + fileName, offset, fileSize, end, httpClient); cachedThreadPool.execute(downloadThread); } else { // 如果下载文件尺寸大于unitSize for (int i = 1; i < threadCount; i++) { DownloadThread downloadThread = new DownloadThread(remoteFileUrl, localPath + fileName, offset, unitSize, end, httpClient); cachedThreadPool.execute(downloadThread); offset = offset + unitSize; } if (fileSize % unitSize != 0) { // 如果不能整除,则需要再创建一个线程下载剩余字节 DownloadThread downloadThread = new DownloadThread(remoteFileUrl, localPath + fileName, offset, fileSize - unitSize * (threadCount - 1), end, httpClient); cachedThreadPool.execute(downloadThread); } } try { end.await(); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("下载完成!{} ", localPath + fileName); } /** * 获取下载文件字节数 * * @param remoteFileUrl * @return * @throws IOException */ private long getRemoteFileSize(String remoteFileUrl) throws IOException { long fileSize = 0; HttpURLConnection httpConnection = (HttpURLConnection) new URL(remoteFileUrl).openConnection(); httpConnection.setRequestMethod("HEAD"); int responseCode = httpConnection.getResponseCode(); if (responseCode >= 400) { log.debug("Web服务器响应错误!"); return 0; } String field = httpConnection.getHeaderField("Content-Length"); if (field != null && StringUtils.isLong(field)) { System.out.println("文件大小:[" + field + "]"); fileSize = Long.parseLong(field); } return fileSize; } /** * 创建指定大小的临时文件 */ private void createFile(String fileName, long fileSize) throws IOException { File newFile = new File(fileName); RandomAccessFile raf = new RandomAccessFile(newFile, "rw"); raf.setLength(fileSize); raf.close(); } }
下载进程代码如下:
package cn.xaele.utils.file; import java.io.BufferedInputStream; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; import java.util.concurrent.CountDownLatch; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.http.client.ClientProtocolException; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.protocol.BasicHttpContext; import org.apache.http.protocol.HttpContext; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; public class DownloadThread extends Thread { private static final Logger log = LogManager.getLogger(DownloadThread.class); /** * 待下载的文件 */ private String url = null; /** * 本地文件名 */ private String fileName = null; /** * 偏移量 */ private long offset = 0; /** * 分配给本线程的下载字节数 */ private long length = 0; private CountDownLatch end; private CloseableHttpClient httpClient; private HttpContext context; /** * * @param url 下载文件地址 * @param fileName 另存文件名 * @param offset 本线程下载偏移量 * @param length 本线程下载长度 */ public DownloadThread(String url, String file, long offset, long length, CountDownLatch end, CloseableHttpClient httpClient) { this.url = url; this.fileName = file; this.offset = offset; this.length = length; this.end = end; this.httpClient = httpClient; this.context = new BasicHttpContext(); log.debug("偏移量=" + offset + ";字节数=" + length); } public void run() { try { HttpGet httpGet = new HttpGet(this.url); httpGet.addHeader("Range", "bytes=" + this.offset + "-" + (this.offset + this.length - 1)); CloseableHttpResponse response = httpClient.execute(httpGet, context); BufferedInputStream bis = new BufferedInputStream(response.getEntity().getContent()); byte[] buff = new byte[1024]; int bytesRead; File newFile = new File(fileName); RandomAccessFile raf = new RandomAccessFile(newFile, "rw"); while ((bytesRead = bis.read(buff, 0, buff.length)) != -1) { raf.seek(this.offset); raf.write(buff, 0, bytesRead); this.offset = this.offset + bytesRead; } raf.close(); bis.close(); } catch (ClientProtocolException e) { log.error("DownloadThread exception msg:{}", ExceptionUtils.getStackTrace(e)); } catch (IOException e) { log.error("DownloadThread exception msg:{}", ExceptionUtils.getStackTrace(e)); } finally { end.countDown(); log.info(end.getCount() + " is go on!"); } } }