import time
import requests
# Queue is used to pass work between the threads
from multiprocessing import Queue
# re is used to extract the page data with a regular expression
import re
# threading provides the worker threads
import threading
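# handle_mongo is a small local helper module that wraps MongoDB access;
# a hedged sketch of what Mongo_Client might look like is given after the script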
from handle_mongo import Mongo_Client
import json
# Stop flags: main() flips these to True once the corresponding queue has been drained
page_flag = False
data_flag = False
# Lock shared by the data threads so only one of them writes to MongoDB at a time
Lock = threading.Lock()
class Crawl_page(threading.Thread):
    '''This class is only responsible for requesting the listing pages by page number'''
    def __init__(self, thread_name, page_queue, data_queue):
        # the thread needs the page queue to read from and the data queue to write to
        super(Crawl_page, self).__init__()
        self.thread_name = thread_name
        self.page_queue = page_queue
        self.data_queue = data_queue
        # request headers copied from a real Chrome session so the request looks like a normal browser visit
        self.header = {
            "Host": "jobs.51job.com",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.190 Safari/537.36",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
            "Sec-Fetch-Site": "none",
            "Sec-Fetch-Mode": "navigate",
            "Sec-Fetch-User": "?1",
            "Sec-Fetch-Dest": "document",
            "Accept-Encoding": "gzip, deflate, br",
            "Accept-Language": "zh-CN,zh;q=0.9"
        }
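    # run() keeps pulling page numbers until main() sets page_flag, requests each
    # listing page and pushes the raw HTML onto the data queue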
    def run(self):
        while not page_flag:
            '''
            try...except...else is used here: the else block only runs when the
            try block raised no exception, i.e. when a page number was actually fetched
            '''
            try:
                page = self.page_queue.get_nowait()
            except Exception:
                # the page queue is empty right now, try again on the next loop
                pass
            else:
                page_url = "https://search.51job.com/list/000000,000000,0000,00,9,99,python,2," + str(page) + ".html"
                time.sleep(1)
                print("Requesting URL: %s" % page_url)
                # optional proxies, given as a dict; leave it empty to send the request directly
                proxy = {
                }
                # send the GET request (verify=False skips certificate checks, useful when going through a proxy)
                response = requests.get(url=page_url, headers=self.header, proxies=proxy, verify=False)
                # 51job serves this page as gbk-encoded HTML
                response.encoding = "gbk"
                # push the raw HTML onto the data queue instead of overwriting the queue reference
                self.data_queue.put(response.text)
class Crawl_html(threading.Thread):
    '''This class processes the HTML that the page-request threads above put on the data queue'''
    def __init__(self, thread_name, data_queue, lock):
        super(Crawl_html, self).__init__()
        self.thread_name = thread_name
        self.data_queue = data_queue
        self.lock = lock
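    # run() keeps pulling raw HTML off the data queue, extracts the embedded JSON
    # search results and hands them to MongoDB, until main() sets data_flag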
    def run(self):
        print("Thread %s is now processing page text" % self.thread_name)
        while not data_flag:
            try:
                text = self.data_queue.get_nowait()
            except Exception:
                # the data queue is empty right now, try again on the next loop
                pass
            else:
                result = self.parse(text)
                print(result)
                data = json.loads(result).get("engine_search_result")
                # only one data thread may write to MongoDB at a time
                with self.lock:
                    Mongo_Client().insert_data(data=data)
    def parse(self, text):
        # 51job embeds the search results as a JSON object assigned to window.__SEARCH_RESULT__
        # inside a <script> tag; this regular expression captures that JSON text
        search_data = re.compile(r'window\.__SEARCH_RESULT__\s=\s(.*?)</script>')
        # extract the data
        data = search_data.search(text)
        if data:
            job_items = data.group(1)
            print(job_items)
            return job_items
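# main() builds the two queues, starts the page threads and the data threads,
# then waits for both queues to drain before shutting the threads down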
def main():
    # queue of page numbers to request
    page_queue = Queue()
    # queue of raw HTML returned for each page
    data_queue = Queue()
    for page in range(1, 4):
        page_queue.put(page)
    # qsize() returns the number of items currently in the queue
    print("Number of pages currently waiting in the page queue: {}".format(page_queue.qsize()))
    # start the page-request threads
    crawl_page_list = ["Page thread 1", "Page thread 2", "Page thread 3"]
    page_thread_list = []
    for thread_name1 in crawl_page_list:
        thread_page = Crawl_page(thread_name=thread_name1, page_queue=page_queue, data_queue=data_queue)
        thread_page.start()
        page_thread_list.append(thread_page)
    # start the html-processing threads
    html_page_list = ["Data thread 1", "Data thread 2", "Data thread 3"]
    html_thread_list = []
    for thread_name_html in html_page_list:
        thread_html = Crawl_html(thread_name_html, data_queue, Lock)
        thread_html.start()
        html_thread_list.append(thread_html)
    global page_flag, data_flag
    # busy-wait until every page number has been taken off the page queue
    while not page_queue.empty():
        pass
    # stop the Crawl_page threads and wait for them to finish
    page_flag = True
    for page_thread_list_join in page_thread_list:
        page_thread_list_join.join()
        print("{0} has finished".format(page_thread_list_join.thread_name))
    # busy-wait until every piece of HTML has been taken off the data queue
    while not data_queue.empty():
        pass
    # stop the Crawl_html threads and wait for them to finish
    data_flag = True
    for html_thread_list_join in html_thread_list:
        html_thread_list_join.join()
        print("{0} has finished".format(html_thread_list_join.thread_name))
if __name__ == '__main__':
    main()
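The script imports Mongo_Client from a local handle_mongo module that is not shown here. As a rough idea only, handle_mongo could be a thin wrapper around pymongo along these lines (the database name, collection name and connection settings are assumptions, not the original module):

# handle_mongo.py -- hedged sketch, assuming pymongo is installed and MongoDB runs locally
import pymongo

class Mongo_Client(object):
    def __init__(self):
        # connect to a local MongoDB instance (host/port are assumptions)
        self.client = pymongo.MongoClient(host="localhost", port=27017)
        # database and collection names are placeholders
        self.collection = self.client["job51"]["python_jobs"]

    def insert_data(self, data):
        # data is the list parsed from engine_search_result; insert_many stores one document per job
        if data:
            self.collection.insert_many(data)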