Python实战:构建支持断点续传的多线程大文件下载器

张开发
2026/4/17 17:45:38 15 分钟阅读

分享文章

Python实战:构建支持断点续传的多线程大文件下载器
1. 为什么需要多线程断点续传下载器在日常开发中我们经常遇到需要下载大文件的需求。比如从云存储平台拉取数据集、备份服务器日志文件或者下载高清视频素材。当文件体积达到GB级别时传统的单线程下载方式会面临三个致命问题首先是下载速度慢。我实测过一个3GB的文件单线程下载耗时接近1小时而改用10个线程后只需6分钟。这是因为多线程能够充分利用带宽每个线程负责下载文件的不同部分最后合并成完整文件。其次是网络不稳定。去年我在下载一个开源镜像时连续失败了3次——每次都是在90%进度时连接中断。没有断点续传功能的下载器会从头开始重试既浪费时间又消耗服务器资源。最后是内存占用高。单线程下载大文件时需要将整个文件内容加载到内存而分块下载可以按需写入磁盘。上周我用requests直接下载2GB文件导致程序崩溃就是因为内存不足。2. 核心设计思路与关键技术2.1 HTTP范围请求机制现代HTTP服务器都支持Range头字段这是实现分块下载的基础。通过发送HEAD请求我们可以检查服务器是否支持断点续传import requests url https://example.com/large_file.zip response requests.head(url) print(response.headers.get(Accept-Ranges)) # 输出应为bytes关键响应头说明Content-Length: 文件总字节数Accept-Ranges: 服务器支持的范围请求单位通常是bytes2.2 文件分片策略合理的分片策略直接影响下载效率。我的经验是每个分片大小建议在5-10MB之间线程数不超过CPU核心数的2倍动态调整分片大小网络好时增大分片def calculate_ranges(total_size, chunk_num10): chunk_size total_size // chunk_num ranges [] for i in range(chunk_num): start i * chunk_size end start chunk_size - 1 if i chunk_num - 1 else total_size - 1 ranges.append((start, end)) return ranges2.3 多线程协同下载使用ThreadPoolExecutor管理下载线程最方便。这里有个坑要注意必须用rb模式打开文件否则多线程写入会冲突from concurrent.futures import ThreadPoolExecutor def download_chunk(url, file_path, start, end): headers {Range: fbytes{start}-{end}} response requests.get(url, headersheaders, streamTrue) with open(file_path, rb) as f: f.seek(start) for chunk in response.iter_content(chunk_size8192): f.write(chunk) with ThreadPoolExecutor(max_workers5) as executor: futures [] for start, end in ranges: futures.append(executor.submit(download_chunk, url, save_path, start, end))3. 断点续传实现方案3.1 进度持久化存储推荐使用JSON记录下载状态。我在实际项目中是这样设计的{ url: https://example.com/file.zip, file_size: 1024000, chunks: [ {start: 0, end: 99999, completed: true}, {start: 100000, end: 199999, completed: false} ], save_path: /downloads/file.zip }3.2 异常恢复机制关键是要处理三种异常情况网络中断捕获requests.exceptions.ConnectionError磁盘空间不足检查os.statvfs().f_bavail服务器拒绝处理HTTP 416错误码try: download_chunk(url, save_path, start, end) except requests.exceptions.RequestException as e: logging.error(f下载分片{start}-{end}失败: {str(e)}) update_progress(save_path, start, end, False)3.3 完整性校验下载完成后必须验证文件比较文件大小与Content-Length计算MD5校验和如果服务器提供ETag分片哈希比对import hashlib def verify_file(file_path, expected_size, expected_md5None): actual_size os.path.getsize(file_path) if actual_size ! expected_size: return False if expected_md5: md5 hashlib.md5() with open(file_path, rb) as f: while chunk : f.read(8192): md5.update(chunk) return md5.hexdigest() expected_md5 return True4. 完整实现与优化技巧4.1 工程化代码结构建议采用面向对象设计这是我常用的类结构class ResumableDownloader: def __init__(self, url, workers5): self.url url self.workers workers self.progress_file None self.lock threading.Lock() def start(self): self._load_progress() self._create_temp_file() self._download_chunks() self._verify_file() def _download_chunks(self): with ThreadPoolExecutor(self.workers) as executor: futures [] for chunk in self.progress[chunks]: if not chunk[completed]: future executor.submit( self._download_single_chunk, chunk[start], chunk[end] ) futures.append(future) for future in as_completed(futures): future.result()4.2 性能优化建议经过多次测试我总结出这些优化点适当增大chunk_size默认8KB可以提升到64KB使用连接池requests.Session禁用SSL验证仅限内网环境设置合理的超时时间session requests.Session() adapter requests.adapters.HTTPAdapter( pool_connections20, pool_maxsize20, max_retries3 ) session.mount(https://, adapter) response session.get( url, streamTrue, verifyFalse, timeout(10, 30) )4.3 错误处理最佳实践这些是我踩过的坑临时文件命名加上.part后缀下载目录要有写权限检查处理Windows下的路径分隔符问题清理中断的临时文件def _create_temp_file(self): temp_path f{self.save_path}.part if not os.path.exists(os.path.dirname(temp_path)): os.makedirs(os.path.dirname(temp_path)) if not os.path.exists(temp_path): with open(temp_path, wb) as f: f.truncate(self.file_size)5. 异步IO实现方案对于IO密集型任务异步方案更高效。我推荐使用aiohttpasyncioimport aiohttp import asyncio async def async_download(session, url, save_path, start, end): headers {Range: fbytes{start}-{end}} async with session.get(url, headersheaders) as response: with open(save_path, rb) as f: f.seek(start) while True: chunk await response.content.read(8192) if not chunk: break f.write(chunk) async def main(): connector aiohttp.TCPConnector(limit10) async with aiohttp.ClientSession(connectorconnector) as session: tasks [] for start, end in ranges: tasks.append(async_download(session, url, save_path, start, end)) await asyncio.gather(*tasks)这种方案在下载大量小文件时特别有效我在测试中发现比多线程版本快30%左右。不过要注意异步编程需要处理更多边界情况比如信号量控制和错误重试机制。

更多文章