Python全栈入门到实战【进阶篇 11】Python线程池编程:从入门到实战(附批量爬虫/文件处理实战)

张开发
2026/4/19 4:25:27 15 分钟阅读

分享文章

Python全栈入门到实战【进阶篇 11】Python线程池编程:从入门到实战(附批量爬虫/文件处理实战)
前言在上一节中我们掌握了多线程基础用法但手动创建、管理线程时会遇到“频繁创建销毁线程开销大、线程数失控、管理逻辑复杂”等问题——而线程池是解决这些问题的最优方案它是企业级并发编程中最常用的工具也是Python进阶的核心技能之一。本节课聚焦线程池从“新手能懂的核心价值”到“企业级实战”全程用通用极简示例讲透为什么用→怎么用→高级配置→实战场景→避坑点新手也能直接上手复用。本节核心学习内容线程池核心价值解决手动多线程的3大痛点开销/失控/复杂ThreadPoolExecutor核心用法submit/map/shutdown最简实现线程池获取结果result()阻塞式/ add_done_callback回调式两种方式线程池高级配置最大线程数/超时/异常处理实战必备实战1线程池批量爬取网页IO密集型经典场景实战2线程池批量处理文件通用场景可直接复用线程池vs手动多线程一眼分清该用谁新手必避的5个坑超时/异常/资源释放文章目录前言一、为什么需要线程池手动多线程的痛点二、线程池核心概念三、ThreadPoolExecutor核心用法最简实现1. 基础用法submit shutdown最灵活运行结果核心说明2. 简化用法map方法批量任务运行结果核心说明3. 异步获取结果add_done_callback回调函数运行结果4. 线程池异常处理关键避坑运行结果四、线程池高级配置核心参数与最佳实践1. 最大线程数max_workers设置原则2. 超时配置timeout3. 线程池关闭shutdown五、实战1线程池批量爬取网页IO密集型经典场景核心价值六、实战2线程池批量处理文件通用场景核心价值七、线程池vs手动多线程对比选型八、新手避坑大全九、核心总结十、专栏订阅一、为什么需要线程池手动多线程的痛点手动创建多线程时会遇到以下核心问题而线程池能完美解决手动多线程的痛点线程池的解决方案频繁创建/销毁线程系统开销大提前创建固定数量的线程任务完成后线程复用避免重复开销线程数失控如创建1000个线程导致CPU/内存耗尽限制最大线程数始终保持可控的并发量手动管理线程生命周期join/锁/通信代码复杂线程池自动管理线程只需关注“任务本身”无需关心线程任务执行结果需手动收集异常处理繁琐线程池提供统一的结果获取、异常捕获机制简单来说线程池是“线程的池子”复用线程、控制并发、简化管理是IO密集型场景爬虫/文件处理/接口调用的首选。二、线程池核心概念线程池提前创建一组固定数量的线程存放在“池子”中任务提交将需要执行的任务提交给线程池线程池会分配空闲线程执行任务线程复用任务执行完成后线程不会销毁返回池子等待下一个任务核心优势降低线程创建/销毁开销、控制并发数、简化任务管理。Python中实现线程池的首选工具是concurrent.futures.ThreadPoolExecutorPython3.2内置无需安装第三方库开箱即用。三、ThreadPoolExecutor核心用法最简实现1. 基础用法submit shutdown最灵活submit()用于提交单个任务返回Future对象可获取任务结果/状态shutdown()用于关闭线程池等待所有任务完成。fromconcurrent.futuresimportThreadPoolExecutorimporttime# 定义任务函数deftask(name,delay):模拟IO任务睡眠指定时间print(f任务{name}开始执行延迟{delay}秒)time.sleep(delay)print(f任务{name}执行完成)returnf任务{name}结果成功# 1. 创建线程池指定最大线程数为2withThreadPoolExecutor(max_workers2)asexecutor:# 2. 提交任务返回Future对象future1executor.submit(task,t1,2)future2executor.submit(task,t2,1)future3executor.submit(task,t3,3)# 线程池只有2个线程t3等待t2完成后执行# 3. 获取任务结果result()会阻塞直到任务完成result1future1.result()result2future2.result()result3future3.result()print(f\n任务1结果{result1})print(f任务2结果{result2})print(f任务3结果{result3})# with语句会自动调用shutdown()无需手动关闭print(\n所有任务执行完毕线程池已关闭)运行结果任务t1开始执行延迟2秒 任务t2开始执行延迟1秒 任务t2执行完成 任务t3开始执行延迟3秒 任务t1执行完成 任务t3执行完成 任务1结果任务t1结果成功 任务2结果任务t2结果成功 任务3结果任务t3结果成功 所有任务执行完毕线程池已关闭核心说明max_workers线程池最大线程数核心参数推荐根据场景设置下文会讲原则submit(func, *args, **kwargs)提交任务参数为“任务函数函数参数”Future对象代表异步任务的结果核心方法result(timeoutNone)获取任务结果超时会抛TimeoutErrordone()判断任务是否完成返回True/Falsecancel()取消未执行的任务已执行则返回Falsewith语句自动管理线程池生命周期结束时调用shutdown(waitTrue)等待所有任务完成后关闭。2. 简化用法map方法批量任务如果任务函数相同、参数不同用map()更简洁类似Python内置map自动分配任务并返回结果列表。fromconcurrent.futuresimportThreadPoolExecutorimporttime# 定义批量任务的函数defbatch_task(num):模拟批量IO任务计算数字平方time.sleep(0.5)returnnum*num# 创建线程池执行批量任务withThreadPoolExecutor(max_workers3)asexecutor:# 传入任务函数参数列表返回结果生成器resultsexecutor.map(batch_task,[1,2,3,4,5])# 遍历获取结果按参数顺序返回即使任务完成顺序不同print(批量任务结果)fornum,resinzip([1,2,3,4,5],results):print(f{num}的平方{res})运行结果批量任务结果 1的平方1 2的平方4 3的平方9 4的平方16 5的平方25核心说明map(func, *iterables, timeoutNone)func任务函数iterables参数列表多个可迭代对象则按位置传参返回值按参数顺序的结果生成器即使任务并发执行结果顺序与参数一致适合“任务逻辑统一、参数批量”的场景如批量爬取URL、批量处理文件。3. 异步获取结果add_done_callback回调函数result()是阻塞式获取结果而add_done_callback()是回调式——任务完成后自动调用回调函数无需主动等待更适合异步场景。fromconcurrent.futuresimportThreadPoolExecutorimporttime# 任务函数defasync_task(name):time.sleep(1)returnf任务{name}完成# 回调函数任务完成后自动执行defcallback(future):处理任务完成后的结果resultfuture.result()print(f回调函数{result})# 创建线程池异步获取结果withThreadPoolExecutor(max_workers2)asexecutor:future1executor.submit(async_task,t1)future2executor.submit(async_task,t2)# 绑定回调函数add_done_callback传入回调函数future1.add_done_callback(callback)future2.add_done_callback(callback)print(主线程继续执行无需等待任务完成)运行结果主线程继续执行无需等待任务完成 回调函数任务t1完成 回调函数任务t2完成4. 线程池异常处理关键避坑线程池中的任务异常不会直接抛出需通过result()或exception()捕获否则会隐藏错误。fromconcurrent.futuresimportThreadPoolExecutor,TimeoutError# 有异常的任务函数deferror_task(num):模拟任务异常除以0return10/numwithThreadPoolExecutor(max_workers2)asexecutor:# 提交可能出错的任务future1executor.submit(error_task,2)future2executor.submit(error_task,0)# 会抛ZeroDivisionError# 捕获异常方式1result()中捕获try:res1future1.result()print(f任务1结果{res1})exceptExceptionase:print(f任务1异常{e})try:res2future2.result()print(f任务2结果{res2})exceptZeroDivisionErrorase:print(f任务2异常{e})# 捕获异常方式2exception()方法excfuture2.exception()ifexc:print(f任务2异常exception方法{exc})运行结果任务1结果5.0 任务2异常division by zero 任务2异常exception方法division by zero四、线程池高级配置核心参数与最佳实践1. 最大线程数max_workers设置原则max_workers是线程池最核心的参数设置不当会严重影响性能遵循以下原则任务类型最大线程数设置原则原因IO密集型CPU核心数 × 5 ~ 10如8核设40~80IO操作网络/文件时线程阻塞CPU空闲更多线程可利用空闲时间提升并发CPU密集型CPU核心数 1如8核设9避免线程切换开销最大化利用CPU获取CPU核心数importos cpu_countos.cpu_count()print(fCPU核心数{cpu_count})# 输出当前机器的CPU核心数2. 超时配置timeout避免任务无限阻塞给result()/map()设置超时时间fromconcurrent.futuresimportTimeoutErrorwithThreadPoolExecutor(max_workers2)asexecutor:futureexecutor.submit(time.sleep,3)try:# 超时时间2秒任务需要3秒会抛异常resultfuture.result(timeout2)exceptTimeoutError:print(任务执行超时终止等待)future.cancel()# 取消未完成的任务3. 线程池关闭shutdownshutdown(waitTrue)默认值等待所有任务完成后关闭线程池shutdown(waitFalse)立即关闭线程池未完成的任务不再执行线程池关闭后不能再提交新任务会抛RuntimeErrorwith语句自动调用shutdown(waitTrue)推荐优先使用。五、实战1线程池批量爬取网页IO密集型经典场景以“批量爬取多个网页获取标题和响应时间”为例演示线程池在IO密集型场景的实战用法fromconcurrent.futuresimportThreadPoolExecutorimportrequestsimporttimefrombs4importBeautifulSoup# 要爬取的URL列表URL_LIST[https://www.baidu.com,https://www.zhihu.com,https://www.github.com,https://www.csdn.net,https://www.python.org]# 爬取单个网页的函数defcrawl_url(url):爬取网页返回标题和响应时间try:start_timetime.time()# 设置超时避免卡壳responserequests.get(url,timeout10)response.raise_for_status()# 非200状态码抛异常response.encodingutf-8soupBeautifulSoup(response.text,html.parser)titlesoup.title.string.strip()ifsoup.titleelse无标题cost_timeround(time.time()-start_time,2)return{url:url,title:title,cost_time:cost_time,status:成功}exceptExceptionase:return{url:url,title:,cost_time:0,status:f失败{str(e)[:50]}# 截取异常信息避免过长}# 主线程线程池批量爬取defbatch_crawl():# 设置最大线程数IO密集型CPU核心数×5max_workersos.cpu_count()*5print(f启动线程池最大线程数{max_workers})start_totaltime.time()withThreadPoolExecutor(max_workersmax_workers)asexecutor:# 批量提交任务resultsexecutor.map(crawl_url,URL_LIST)# 输出结果print(\n 批量爬取结果 )forresinresults:print(fURL{res[url]})print(f标题{res[title]})print(f耗时{res[cost_time]}秒 | 状态{res[status]})print(-*50)total_costround(time.time()-start_total,2)print(f\n总耗时{total_cost}秒单线程需约{total_cost*len(URL_LIST)}秒)if__name____main__:importos# 避免上面代码块的import重复batch_crawl()核心价值相比单线程逐个爬取线程池批量爬取耗时仅为单线程的1/5左右统一的异常处理单个URL爬取失败不影响其他任务控制最大线程数避免请求过多被目标网站封禁。六、实战2线程池批量处理文件通用场景以“批量读取多个文本文件提取关键词并统计出现次数”为例演示线程池在文件处理场景的用法fromconcurrent.futuresimportThreadPoolExecutorimportosimportre# 要处理的文件列表替换为你的文件路径FILE_LIST[test1.txt,test2.txt,test3.txt,test4.txt]# 关键词列表KEYWORDS[Python,线程池,并发,编程]# 处理单个文件的函数defprocess_file(file_path):读取文件统计关键词出现次数try:ifnotos.path.exists(file_path):return{file:file_path,result:文件不存在,status:失败}# 读取文件内容withopen(file_path,r,encodingutf-8)asf:contentf.read().lower()# 转小写不区分大小写# 统计关键词次数keyword_count{}forkeywordinKEYWORDS:# 正则匹配不区分大小写countlen(re.findall(keyword.lower(),content))keyword_count[keyword]countreturn{file:file_path,result:keyword_count,status:成功}exceptExceptionase:return{file:file_path,result:str(e),status:失败}# 主线程线程池批量处理defbatch_process_files():max_workersmin(4,os.cpu_count()1)# 限制最大线程数不超过4print(f启动线程池最大线程数{max_workers})withThreadPoolExecutor(max_workersmax_workers)asexecutor:resultsexecutor.map(process_file,FILE_LIST)# 输出结果print(\n 批量文件处理结果 )forresinresults:print(f文件{res[file]}| 状态{res[status]})ifres[status]成功:forkeyword,countinres[result].items():print(f -{keyword}出现{count}次)else:print(f 错误{res[result]})print(-*50)if__name____main__:batch_process_files()核心价值批量处理文件时线程池利用IO等待时间并发读取效率提升显著单个文件处理失败不影响其他文件容错性强代码结构清晰只需关注“文件处理逻辑”无需管理线程。七、线程池vs手动多线程对比选型特性线程池ThreadPoolExecutor手动多线程threading.Thread线程复用支持降低开销任务完成后销毁开销大并发数控制max_workers限制需手动控制易失控任务结果获取Future/ map 便捷获取需手动用队列收集异常处理统一捕获不影响其他任务单个线程异常可能导致崩溃代码复杂度低只需关注任务逻辑高需管理线程/锁/通信适用场景批量任务、IO密集型、企业级开发简单并发、自定义线程管理选型建议90%的场景优先用线程池简洁、高效、易维护仅需高度自定义线程行为如线程通信/优先级时用手动多线程。八、新手避坑大全max_workers设置过大IO密集型也不是越大越好过多线程会导致系统调度开销增加建议按“CPU核心数×5~10”设置忽略任务异常线程池任务异常不会主动抛出必须通过result()/exception()捕获否则会隐藏bug超时未设置未给result()设置超时可能导致主线程无限阻塞重复提交任务线程池shutdown()后提交任务会抛RuntimeError需确保提交逻辑在shutdown()前资源未释放用requests/文件操作时需在任务函数内确保资源关闭如response.close()/f.close()回调函数阻塞add_done_callback()的回调函数不要写耗时逻辑否则会阻塞线程池。九、核心总结本节课我们掌握了Python线程池的核心知识核心要点回顾线程池价值解决手动多线程的“开销大、失控、复杂”问题复用线程、控制并发、简化管理核心用法灵活场景用submit()Future获取结果批量任务用map()更简洁异步场景用add_done_callback()回调参数配置max_workers按任务类型设置IO密集型CPU×5~10CPU密集型CPU1异常处理必须捕获任务异常避免隐藏错误适用场景IO密集型爬虫/文件/接口优先用线程池90%场景无需手动管理线程。线程池是Python并发编程的“主力军”掌握后可高效应对批量爬取、文件处理、接口并发调用等企业级场景。下一节我们将学习多进程解决CPU密集型任务的并发问题。十、专栏订阅专栏优点《Python从入门到实战》专栏内容涵盖Python基础到高级编程、Web开发Django/Flask框架、数据库MySQL/ORM、网络爬虫、Linux部署运维等全栈核心知识以项目驱动教学构建清晰学习路径适合零基础入门和进阶提升的同学跟着一步步从入门到精通专栏地址https://blog.csdn.net/zsh_1314520/category_13108073.html文章是永久吗一次订阅后可永久免费查看专栏内所有文章后续会持续更新全栈相关内容第一时间获取最新教程有答疑交流群吗订阅专栏后有专属的全栈学习答疑群群内提供专业问题答疑、和众多学习者抱团取暖一起沉淀技术、赋能成长进群方式订阅专栏后可直接在专栏内申请加入答疑群或私信博主沟通进群事宜https://bbs.csdn.net/topics/620104702更多干货点赞收藏关注博主不迷路博主博客链接https://blog.csdn.net/zsh_1314520?spm1000.2115.3001.5343专注Python全栈技术分享评论区留言问题会一一回复助力大家轻松搞定Python全栈【原创声明】除本文原文地址以外如发现同款内容皆为盗版本文已收录于《Python全栈从入门到实战》请勿购买盗版文章和专栏如购买盗版内容不提供任何服务。原文地址https://blog.csdn.net/zsh_1314520/article/details/160249894

更多文章