python项目实战10-网络机器人03

张开发
2026/4/13 22:17:21 15 分钟阅读

分享文章

python项目实战10-网络机器人03
项目获取电影榜单上的电影详情信息1.发送请求获取高分电影榜单数据response requests.get(BASE_URL, timeout60)2.解析数据获取电影列表document html.fromstring(response.text) movie_list document.xpath(//*[idpage_1]/div[classcard style_1])(1) 如果遇到response.text是空的这极有可能是因为目标网站有反爬虫机制。 User-Agent缺失 默认的 requests.get() 会发送一个类似python-requests/2.x.x的请求头。大多数现代网站会直接拒绝这种非浏览器的请求返回 403 Forbidden 或者返回一个空的响应。此时需要给请求添加请求头Headers伪装成浏览器访问。(具体header信息需访问网站查看)headers { User-Agent:Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/146.0.0.0 Safari/537.36 Edg/146.0.0.0, Cookie: ..., Referer: ..., X-Requested-With:... }(2) 也可以设置编码 response.encoding response.apparent_encoding 防止因中文乱码导致的解析问题。(3)如果修改后仍然报错 打印 response.text可以让你看到服务器到底返回了什么通常是“Access Denied” 或者一段JavaScript而不是你期望的HTML。request只能获取服务器返回的原始 HTML 源码它不会执行 JavaScript由于很多网站(特别是电影、电商类)的HTML源码里div idapp.../div往往是一个空壳子。真实的数据是通过JS脚本在浏览器里运行后才填充进去的。验证方法1) 在PyCharm的调试窗口中输入 print(response.text) 并查看输出。如果输出里包含电影名字说明数据在源码里是原因1(xpath写错了)2) 如果输出里只有div idapp.../div说明数据是JS加载的request抓不到解决办法1) 寻找网页背后的API接口(在浏览器按F12-Network-XHR/Fetch查看)直接请求那个接口2) 改用 Selenium 或 Playwright等自动化工具他们能模拟浏览器运行JSresponse requests.get(base_url, headersheaders, timeout10) response.raise_for_status() # 检查 HTTP 状态码 data response.json()3.检查HTTP状态码response.raise_for_status()response.raise_for_status()是requests库中Response对象的一个便捷方法它的作用是自动检查 HTTP响应的状态码。 核心功能检查状态码如果请求成功状态码在 200-399范围内它不执行任何操作程序继续往下走。如果请求失败状态码在 400-599 范围内如 404 Not Found, 500 Internal Server Error它会自动抛出一个 requests.exceptions.HTTPError 异常。如果没有raise_for_status() 如果请求返回了404页面不存在或500 服务器内部错误response.status_code会是404或500。 但程序会继续执行 response.json()。response.text可能是一个HTML错误页面比如“Page Not Found” 的HTML而不是你期望的JSON数据。 当response.json()尝试解析这个 HTML时就会抛出ValueErrorJSON解析错误。 你只能在except ValueError 中捕获错误看到的是“JSON解析失败”的信息而不知道最初的问题是 HTTP状态码错误比如404。4.从返回的结果中提取所需数据get_movie_info()for movie in data[items]: movie_info get_movie_info(movie) all_movies.append(movie_info) save_all_movies(all_movies)# 获取电影详情 def get_movie_info(movie): title movie.get(title,未知电影) rating movie.get(rating,{}) rating_value rating.get(value,) card_subtitle movie.get(card_subtitle) card_subtitle_result card_subtitle.split(/) year card_subtitle_result[0].strip() # 使用 strip() 清理可能的空格 area card_subtitle_result[1].strip() movie_type card_subtitle_result[2].strip() director card_subtitle_result[3].strip() actors card_subtitle_result[4].strip() id movie.get(id,) movie_info { 电影名: title if title else , 评分: rating_value if rating_value else , 年份: year if year else , 地区: area if area else , 类型: movie_type if movie_type else , 导演: director if director else , 演员: actors if actors else , id: id if id else } return movie_info5.将提取的数据转换为csv文件# 保存电影数据, 保存为 csv 文件 def save_all_movies(all_movies): with open(MOVIE_LIST_FILE, w, encodingutf-8, newline) as csvfile: writer csv.DictWriter(csvfile, fieldnames[电影名, 评分, 年份, 地区, 类型, 导演, 演员, id]) writer.writeheader() # 写入表头 writer.writerows(all_movies) # 写入数据6.分页查询更多网页信息要实现加载更多并爬取后续的电影信息核心在于理解豆瓣分页数据的获取逻辑也就是通过修改API接口中的 start 参数来控制页码(URL中包含start0limit20)以下是具体的实现原理和修改方案(1)核心原理修改start 参数豆瓣的列表接口通常遵循 offset/limit 的分页逻辑limit20 表示每页只返回20条数据start0 表示从第0条数据开始取(即第一页)start20 表示从第20条数据开始取(即第2页) start40 表示从第40条数据开始取(即第3页)。只需要在一个循环中不断增加start的值就能一次获取后面的数据BASE_URL https://m.douban.com/rexxar/api/v2/subject/recent_hot/movie?start{}limit20 start_index START_INDEX # 起始索引 limit LIMIT # 每页数量 total_to_fetch TOTAL_TO_FETCH # 你想爬取的总数量可根据需要调整 current_url BASE_URL.format(start_index) print(f正在爬取第 {start_index // limit 1} 页数据 (start {start_index})...)完整代码import time import requests import csv from lxml import html BASE_URL https://m.douban.com/rexxar/api/v2/subject/recent_hot/movie?start{}limit20 MOVIE_LIST_FILE csv_data/movie_list.csv START_INDEX 0 LIMIT 20 TOTAL_TO_FETCH 100 headers { User-Agent:Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/146.0.0.0 Safari/537.36 Edg/146.0.0.0, Referer:https://movie.douban.com/explore, X-Requested-With:com.douban.frodo } # 保存电影数据, 保存为 csv 文件 def save_all_movies(all_movies): with open(MOVIE_LIST_FILE, w, encodingutf-8, newline) as csvfile: writer csv.DictWriter(csvfile, fieldnames[电影名, 评分, 年份, 地区, 类型, 导演, 演员, id]) writer.writeheader() # 写入表头 writer.writerows(all_movies) # 写入数据 # 获取电影详情 def get_movie_info(movie): title movie.get(title,未知电影) rating movie.get(rating,{}) rating_value rating.get(value,) card_subtitle movie.get(card_subtitle) card_subtitle_result card_subtitle.split(/) # 确保列表长度足够避免 IndexError while len(card_subtitle_result) 5: card_subtitle_result.append() year card_subtitle_result[0].strip() # 使用 strip() 清理可能的空格 area card_subtitle_result[1].strip() movie_type card_subtitle_result[2].strip() director card_subtitle_result[3].strip() actors card_subtitle_result[4].strip() id movie.get(id,) movie_info { 电影名: title if title else , 评分: rating_value if rating_value else , 年份: year if year else , 地区: area if area else , 类型: movie_type if movie_type else , 导演: director if director else , 演员: actors if actors else , id: id if id else } return movie_info def main(): all_movies [] start_index START_INDEX # 起始索引 limit LIMIT # 每页数量 total_to_fetch TOTAL_TO_FETCH # 你想爬取的总数量可根据需要调整 while len(all_movies) total_to_fetch: current_url BASE_URL.format(start_index) print(f正在爬取第 {start_index // limit 1} 页数据 (start {start_index})...) try: response requests.get(current_url, headersheaders, timeout10) response.raise_for_status() # 检查 HTTP 状态码 data response.json() if msg in data and invalid_request in data.get(msg,): print(f!!! 接口报错: {data[msg]}(豆瓣反爬生效)) print(请检查 Headers 或 Cookie) break # 遇到反爬错误停止爬取 # --- 关键部分分行读取数据 --- if items in data: items data[items] if not items: # 如果返回items是空列表说明到底了 print(没有更多数据了停止爬取) break print(找到电影数据开始分行读取) # 3. 使用 for 循环遍历列表 for movie in items: movie_info get_movie_info(movie) all_movies.append(movie_info) # 如果已经到达了我们想要爬取的数量就退出 if len(all_movies) total_to_fetch: break # 4.保存数据, 保存为 csv 文件 print(获取到所有的电影详情, 保存电影数据到CSV文件 ...) else: print(f第{start_index // limit1}页未找到items字段) print(原始数据,data) # 打印原始数据看看到底是什么结构 break except requests.exceptions.RequestException as e: print(网络请求出错, e) break except ValueError as e: print(JSON 解析出错可能是返回了HTML而不是JSON, e) print(服务器返回的内容预览, response.text[:500]) break # 重要添加延时避免请求过于频繁 time.sleep(1.5) # 更新索引准备爬取下一页 start_index limit if all_movies: print(正在保存数据到CSV文件...) save_all_movies(all_movies) print(f数据已保存至{MOVIE_LIST_FILE}) else: print(未能获取到任何数据) if __name__ __main__: main()

更多文章