老师，我这是什么问题？怎么解决？
import requests
import pymongo
from queue import Queue, Empty
from lxml import etree
import threading


def handle_request(url):
    """
    Request ``url`` and return the response body.

    :param url: page URL to fetch
    :return: ``response.text`` when the status code is 200, otherwise None
    :raises requests.RequestException: on network errors / timeouts
        (callers handle this per URL)
    """
    # Custom request headers (browser-like, with a session cookie).
    headers = {
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
        'Accept-Language': 'zh-CN,zh;q=0.9',
        'Connection': 'keep-alive',
        'Host': 'movie.douban.com',
        'Upgrade-Insecure-Requests': '1',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36',
        'Referer': 'https://movie.douban.com/top250?start=75&filter=',
        'Cookie': 'bid=nVbYM3AAU2Q; _pk_id.100001.4cf6=1f6096e2532b9e23.1686192879.; __yadk_uid=fTj7CLY7a3yr6A0Xo85ilS1qYNMc8Y60; ll="118237"; _vwo_uuid_v2=DD681021F84F3560A783E1F7358827029|c0fe785101244962877999214b03ed61; _pk_ref.100001.4cf6=%5B%22%22%2C%22%22%2C1686215198%2C%22https%3A%2F%2Fclass.imooc.com%2Flesson%2F2194%22%5D; _pk_ses.100001.4cf6=1; ap_v=0,6.0'
    }
    response = requests.get(url=url, headers=headers, timeout=10)
    # A 200 status already implies a truthy Response, so no extra check is needed.
    if response.status_code == 200:
        return response.text
    return None


class PageSpider(threading.Thread):
    """
    Worker thread that downloads list-page URLs and enqueues every movie <li>.
    """

    def __init__(self, thread_name, page_queue, deta_queue):
        super(PageSpider, self).__init__()
        self.thread_name = thread_name
        self.detail_queue = deta_queue
        self.page_queue = page_queue

    def parse_detail_url(self, content):
        """Parse one list page and put each movie <li> element on the detail queue."""
        html = etree.HTML(content)
        data_html = html.xpath("//div[@class='article']/ol/li")
        for data in data_html:
            self.detail_queue.put(data)

    def run(self):
        print("{}启动".format(self.thread_name))
        while True:
            # Checking empty() and then calling get() is racy across threads:
            # another worker may take the last item in between, so get() raises
            # queue.Empty (whose message is blank). Use get_nowait() and treat
            # Empty as "no more work".
            try:
                page_url = self.page_queue.get_nowait()
            except Empty:
                break
            # Handle failures per URL so one timeout/parse error does not stop
            # this worker from processing the remaining pages.
            try:
                page_response = handle_request(url=page_url)
                if page_response:
                    self.parse_detail_url(content=page_response)
            except Exception as e:
                # repr() keeps the exception type visible even if its message is empty.
                print("{} run error:{}".format(self.thread_name, repr(e)))
        print("{}结束".format(self.thread_name))


class DataParse(threading.Thread):
    """
    Worker thread that extracts fields from movie <li> elements and stores them in MongoDB.
    """

    def __init__(self, thread_name, data_queue, mongo, lock):
        super(DataParse, self).__init__()
        self.thread_name = thread_name
        self.data_queue = data_queue
        self.mongo = mongo
        self.lock = lock

    def parse(self, data):
        """Build a document from one movie <li> element and insert it."""
        info = {
            # xpath() returns a list; only the title parts are joined into a string.
            "movie_name": "".join(data.xpath(".//div[@class='info']/div[@class='hd']/a/span/text()")),
            "actors_information": data.xpath(".//div[@class='info']/div[@class='bd']/p[1]/text()"),
            "score": data.xpath(".//div[@class='info']/div[@class='bd']/div/span[@class='rating_num']/text()"),
            # NOTE(review): span[@class=''] only matches a span whose class attribute
            # exists and is empty; if the rating-count span has no class attribute this
            # always yields [] — verify against the page HTML.
            "evaluate": data.xpath(".//div[@class='info']/div[@class='bd']/div/span[@class='']/text()"),
            "describe": data.xpath(".//div[@class='info']/div[@class='bd']/p[2]/span/text()")
        }
        print(info)
        # Serialize inserts across worker threads. (PyMongo documents its client as
        # thread-safe, so this lock is conservative rather than strictly required.)
        with self.lock:
            self.mongo.insert_one(info)

    def run(self):
        print("{}启动".format(self.thread_name))
        while True:
            # Same race-free pattern as PageSpider.run: Empty means no more work.
            try:
                detail_info = self.data_queue.get_nowait()
            except Empty:
                break
            try:
                self.parse(detail_info)
            except Exception as e:
                print("{} run error:{}".format(self.thread_name, repr(e)))
        print("{}结束".format(self.thread_name))


def main():
    """Crawl all Top250 list pages, then parse and store every movie entry."""
    # Queue of list-page URLs.
    page_queue = Queue()
    # All 10 list pages (start = 0, 25, ..., 225).
    for i in range(0, 226, 25):
        page_url = "https://movie.douban.com/top250?start={}".format(i)
        page_queue.put(page_url)

    # Queue of movie <li> elements extracted from the list pages.
    data_queue = Queue()

    page_spider_threadname_list = ["列表页采集线程1号", "列表页采集线程2号", "列表页采集线程3号"]
    page_spider_list = []
    for thread_name in page_spider_threadname_list:
        thread = PageSpider(thread_name, page_queue, data_queue)
        thread.start()
        page_spider_list.append(thread)

    # join() blocks until each thread finishes; no busy-wait on queue.empty()
    # is needed (that loop only burned CPU). Joining a finished thread returns
    # immediately, so no is_alive() check is needed either.
    for thread in page_spider_list:
        thread.join()

    # All list pages are processed, so data_queue is fully populated before
    # the parser threads start.
    lock = threading.Lock()
    myclient = pymongo.MongoClient(host='127.0.0.1', port=27017, username='admin', password='abc123456')
    try:
        mydb = myclient["db_movies"]
        mycollection = mydb["movie_info"]

        data_parse_threadname_list = ["数据处理线程1号", "数据处理线程2号", "数据处理线程3号", "数据处理线程4号", "数据处理线程5号"]
        data_parse_list = []
        # Five threads parse and store the movie entries.
        for thread_name in data_parse_threadname_list:
            thread = DataParse(thread_name, data_queue, mycollection, lock)
            thread.start()
            data_parse_list.append(thread)

        for thread in data_parse_list:
            thread.join()
    finally:
        # Release the client's connection pool once all inserts are done.
        myclient.close()


if __name__ == '__main__':
    main()
11
收起
正在回答 回答被采纳积分+1
1回答
相似问题
登录后可查看更多问答,登录/注册
Python全能工程师
- 参与学习 人
- 提交作业 16233 份
- 解答问题 4470 个
全新版本覆盖5大热门就业方向:Web全栈、爬虫、数据分析、软件测试、人工智能,零基础进击Python全能型工程师,从大厂挑人到我挑大厂,诱人薪资在前方!
了解课程
恭喜解决一个难题,获得1积分~
来为老师/同学的回答评分吧
0 星