Crawler Part 4: Asynchronous Crawlers

[TOC]

Multithreading and Multiprocessing

Multithreading

  1. A process is the unit of resource allocation: launching a program starts a process, and each process is given its own memory space.
  2. A thread is the unit of execution: every process contains at least one thread, and threads are what actually get scheduled onto the CPU.
from threading import Thread


def func(name):
    for i in range(1000):
        print(name, i)


class MyThread(Thread):
    def __init__(self, name):  # take a parameter for initialization
        super().__init__()
        self.name = name

    def run(self):  # run() is called automatically when the thread is started
        for i in range(1000):
            print(self.name, i)


if __name__ == '__main__':
    # 1. start threads from a function
    t = Thread(target=func, args=("t",))  # args must be a tuple
    t.start()

    t0 = Thread(target=func, args=("t0",))
    t0.start()

    # 2. start threads from a class
    t1 = MyThread("t1")
    # t1.run()  # calling run() directly just executes it in the current, single thread
    t1.start()  # start a new thread

    t2 = MyThread("t2")
    t2.start()

    # main thread
    for i in range(1000):
        print("主线程", i)

Multiprocessing

  • Spawning a process is expensive; spawning many is generally not recommended.
from multiprocessing import Process


def func():
    for i in range(10000):
        print("子", i)


if __name__ == '__main__':
    p = Process(target=func)
    p.start()
    for i in range(10000):
        print("主进程", i)

Thread Pools and Process Pools

  • Thread pool: create a batch of threads up front; you only write tasks and submit them to the pool, and the pool takes care of scheduling them onto its threads.
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


def func(name):
    for i in range(1000):
        print(name, i)


if __name__ == '__main__':
    with ThreadPoolExecutor(50) as t:
        for i in range(100):
            t.submit(func, name=f"线程{i}")
    # the with-block exits only after every task in the pool has finished
    print("yes!")

Batch Scraping Xinfadi Vegetable Prices

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import requests
from lxml import etree
import csv

f = open("source/菜价.csv", encoding="utf-8", newline="", mode="w")
csv_writer = csv.writer(f)


def download(url):
    resp = requests.get(url=url)
    # print(resp.text)
    tree = etree.HTML(resp.text)
    ts = tree.xpath("/html/body/div[2]/div[4]/div[1]/table")[0]  # tbody is injected by the browser, so drop it from the XPath
    # print(len(ts))  # xpath() always returns a list, even when there is only one match
    trs = ts.xpath("./tr[position()>1]")  # the table holds many tr, each tr holds many td
    # rows = t.xpath("./tr")[1:]
    for tr in trs:
        tds = tr.xpath("./td/text()")
        # light cleanup of the data
        tds = list(td.replace('\\', "").replace("/", "") for td in tds)  # materialize the generator into a list
        # print(tds)
        csv_writer.writerow(tds)
    print(url, "has been downloaded!")


if __name__ == '__main__':
    with ThreadPoolExecutor(50) as t:
        for i in range(1, 200):
            u = f"http://www.xinfadi.com.cn/marketanalysis/0/list%20/{i}.shtml"
            t.submit(download, url=u)

    f.close()
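
One caveat: all worker threads above share a single csv_writer, so concurrent writerow calls can interleave. A defensive sketch, assuming the csv_writer created in the script above, that serializes writes with a lock:

import threading

csv_lock = threading.Lock()


def safe_writerow(row):
    # only one worker at a time may touch the shared writer
    with csv_lock:
        csv_writer.writerow(row)  # the csv_writer from the script above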

Coroutines

asyncio

  • Coroutines let a single thread use CPU time far more efficiently.

  • While time.sleep(), input() or requests.get() is waiting, the thread sits blocked and CPU time is wasted.

    • In general, any IO operation leaves the thread in a blocked state.
  • Coroutine: when the program hits a blocking point, it can switch to another task instead of waiting. (Single-threaded, multi-task, asynchronous.)

    • Microscopically, tasks are switched one at a time.

    • Macroscopically, it feels as if multiple tasks are running together.

import asyncio


async def func():
    print("你好啊, 我叫赛利亚")


if __name__ == '__main__':
    g = func()  # func is now an async coroutine function; calling it returns a coroutine object
    # print(g)
    asyncio.run(g)  # running a coroutine needs asyncio's support (asyncio.run requires Python 3.7+)
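
Calling an async function only builds a coroutine object; the body does not run until the event loop drives it. A quick self-contained illustration:

import asyncio


async def func():
    print("hi")


g = func()      # nothing printed yet; g is a coroutine object
print(type(g))  # <class 'coroutine'>
asyncio.run(g)  # only now does the body run and print "hi"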

import asyncio
import time


async def func1():
    print("你好啊, 我叫潘金莲")
    # time.sleep(3)  # sleep and get are synchronous calls; once a synchronous call appears, the async flow is broken
    await asyncio.sleep(3)  # asyncio.sleep is the asynchronous sleep; it must be suspended with await
    print("你好啊, 我叫潘金莲")


async def func2():
    print("你好啊, 我叫王建国")
    # time.sleep(2)
    await asyncio.sleep(2)
    print("你好啊, 我叫王建国")


async def func3():
    print("你好啊, 我叫李雪琴")
    await asyncio.sleep(4)
    print("你好啊, 我叫李雪琴")


if __name__ == '__main__':
    f1 = func1()
    f2 = func2()
    f3 = func3()
    tasks = [
        f1, f2, f3
    ]

    t1 = time.time()

    # start several tasks (coroutines) at once
    # asyncio.run(asyncio.wait(tasks))

    # on Python versions below 3.7 the line above raises:
    # AttributeError: module 'asyncio' has no attribute 'run'
    # workaround:
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))

    t2 = time.time()
    print(t2 - t1)
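
On Python 3.7+ the same effect reads more cleanly with asyncio.run plus asyncio.gather. A self-contained sketch (say is an illustrative stand-in for the three functions above):

import asyncio
import time


async def say(name, delay):
    print("hello,", name)
    await asyncio.sleep(delay)  # non-blocking sleep: control returns to the event loop
    print("bye,", name)


async def demo():
    # gather schedules all coroutines concurrently and keeps their results in order
    await asyncio.gather(say("a", 3), say("b", 2), say("c", 4))


t1 = time.time()
asyncio.run(demo())
print(time.time() - t1)  # ~4s (the longest sleep), not 3 + 2 + 4 = 9s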

import asyncio
import time


async def func1():
    print("你好啊, 我叫潘金莲")
    await asyncio.sleep(3)
    print("你好啊, 我叫潘金莲")


async def func2():
    print("你好啊, 我叫王建国")
    await asyncio.sleep(2)
    print("你好啊, 我叫王建国")


async def func3():
    print("你好啊, 我叫李雪琴")
    await asyncio.sleep(4)
    print("你好啊, 我叫李雪琴")


async def main():
    # first style
    # f1 = func1()
    # await f1  # await is usually placed directly in front of a coroutine object

    # second style (recommended)
    tasks = [
        func1(),  # on Python 3.8+ wrap as asyncio.create_task(func1())
        func2(),
        func3()
    ]
    await asyncio.wait(tasks)


if __name__ == '__main__':
    t1 = time.time()
    # start several tasks (coroutines) at once

    # build the coroutine object
    m = main()
    # create the event loop
    loop = asyncio.get_event_loop()
    # task = asyncio.ensure_future(coroutine)
    # wrap the coroutine into a task
    task = loop.create_task(m)
    # put the task into the event loop and run it
    loop.run_until_complete(task)

    t2 = time.time()
    print(t2 - t1)
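
As the comment inside tasks hints, Python 3.8 deprecated handing bare coroutines to asyncio.wait (and 3.11 removed it). A sketch of main() in the newer style, assuming func1/func2/func3 as defined above:

async def main():
    tasks = [
        asyncio.create_task(func1()),  # create_task needs a running loop,
        asyncio.create_task(func2()),  # so it must be called inside a coroutine
        asyncio.create_task(func3()),
    ]
    await asyncio.wait(tasks)


asyncio.run(main())  # replaces the manual get_event_loop / run_until_complete dance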

Asynchronous Requests with aiohttp

import asyncio
import aiohttp

urls = [
    "http://kr.shanghai-jiuxin.com/file/2020/1031/191468637cab2f0206f7d1d9b175ac81.jpg",
    "http://kr.shanghai-jiuxin.com/file/2020/1031/563337d07af599a9ea64e620729f367e.jpg",
    "http://kr.shanghai-jiuxin.com/file/2020/1031/774218be86d832f359637ab120eba52d.jpg"
]


async def aio_download(url):
    name = url.rsplit('/', 1)[1]  # split once from the right on '/', giving ['http...', '...jpg']
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            # response in hand
            with open("source/" + name, mode="wb") as f:
                f.write(await resp.content.read())  # reading the body is asynchronous and must be awaited
    print(name, "downloaded!")


async def main():
    tasks = []
    for url in urls:
        tasks.append(aio_download(url))

    await asyncio.wait(tasks)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    task = loop.create_task(main())
    loop.run_until_complete(task)
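
The disk write above still uses the blocking built-in open; with the aiofiles package (used in the next example) it can be awaited as well. A sketch of the same coroutine under that assumption:

import aiohttp
import aiofiles


async def aio_download(url):
    name = url.rsplit('/', 1)[1]
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            # both the network read and the disk write now yield to the event loop
            async with aiofiles.open("source/" + name, mode="wb") as f:
                await f.write(await resp.content.read())
    print(name, "downloaded!")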

Practical Cases

Zongheng Novels

import asyncio
from lxml import etree
import aiohttp
import aiofiles  # asynchronous file IO
import requests

"http://book.zongheng.com/showchapter/1084237.html"
"http://book.zongheng.com/chapter/1084237/63603883.html"


# resp = requests.get("http://book.zongheng.com/chapter/1084237/63603883.html")
# print(resp.text)

async def aio_download(path, href):
    url = href
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            dic = await resp.text()
            tree = etree.HTML(dic)
            contents = tree.xpath("/html/body/div[2]/div[3]/div[3]/div/div[5]/p/text()")
            async with aiofiles.open(path, mode="w", encoding="utf-8") as f:
                for content in contents:
                    await f.write(content + "\n")  # write the novel text out
    print(path, "ok!")


async def get_catalog(url):
    tasks = []
    # fetch the book's catalogue page
    resp = requests.get(url)
    tree = etree.HTML(resp.text)
    chs = tree.xpath("/html/body/div[3]/div[2]/div[2]/div/ul")[1:3]  # todo: first-pass crawl
    # print(len(chs))  # there are four volumes
    for item in chs:  # for each volume, collect every chapter's title and link
        hrefs = item.xpath("./li/a/@href")
        titles = item.xpath("./li/a/text()")
        for title, href in zip(titles, hrefs):
            # queue up the async tasks
            # print(title, href)
            path = "source/" + title + ".txt"
            tasks.append(aio_download(path, href))
            # break  # todo: first-pass crawl
    await asyncio.wait(tasks)


if __name__ == '__main__':
    b_id = "1084237"
    url_book = f"http://book.zongheng.com/showchapter/{b_id}.html"

    loop = asyncio.get_event_loop()
    task = loop.create_task(get_catalog(url_book))
    loop.run_until_complete(task)
  • The site used in the original tutorial is gone, so another site is used here for practice.
  • The main work is still locating the resources yourself.
  • Encrypted sources are a real pain, and some pages even redirect to other domains, which is absurd.
  • Luckily Zongheng barely uses any anti-crawling measures, so it is fairly easy.

Scraping Video

  • Simply writing <video src="不能播的视频.mp4"></video> into the HTML will play a video, but the whole file must be loaded first, which is extremely inefficient.

  • Sites therefore usually: ① transcode the video into several resolutions; ② slice it into segments so that dragging the progress bar does not waste bandwidth.

  • An M3U8 file (essentially plain UTF-8 text) is therefore needed to record: 1. the playback order of the segments; 2. the paths where the segments live; and so on (see the sample playlist after this list).

  • To scrape a video:

    1. find the m3u8
    2. download the ts segments listed in the m3u8
    3. merge the ts segments into one mp4 file
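
For reference, a small hypothetical playlist; lines starting with # are directives, everything else is a segment path:

#EXTM3U
#EXT-X-VERSION:3
#EXT-X-TARGETDURATION:10
#EXT-X-KEY:METHOD=AES-128,URI="key.key"
#EXTINF:10.0,
segment0000.ts
#EXTINF:10.0,
segment0001.ts
#EXT-X-ENDLIST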

Simple Version

"""
流程:
1. 拿到548121-1-1.html的页面源代码
2. 从源代码中提取到m3u8的url。
3. 下载m3u8
4. 读取m3u8文件, 下载视频
5. 合并视频
tips:
1. 第二部的url中?note=是从第一步中拿到的,即必须先第一步才能第二步
2. 这个note具备时效性,间隔太长则拒绝访问
"""
import requests
import re

# url在js中,所以用re而不是xpath来提取m3u8的url地址
# 复制并查找 url: ' 和 ', 是否在文件中唯一
obj = re.compile(r"url: '(?P<url>.*?)',", re.S)

url_html = "https://www.91kanju.com/vod-play/54812-1-1.html"

headers = {
"User-Agent": "User-Agent"
}

resp = requests.get(url_html, headers=headers)
url_m3u8 = obj.search(resp.text).group("url") # 拿到m3u8的地址
print(url_m3u8)
resp.close()

# 下载m3u8文件
resp_m3u8 = requests.get(url_m3u8, headers=headers)

with open("哲仁王后.m3u8", mode="wb") as f:
f.write(resp_m3u8.content) # 直接以wb二进制形式写入,避免编码问题

resp_m3u8.close()
print("下载完毕")

# 解析m3u8文件
n = 1
with open("哲仁王后.m3u8", mode="r", encoding="utf-8") as f:
for line in f:
line = line.strip() # 去除空格, 空白, 换行符
if line.startswith("#"): # 如果以#开头. 则跳过
continue

# 下载视频片段
with requests.get(line) as resp3:
with open(f"video/{n}.ts", mode="wb") as f:
f.write(resp3.content)
n += 1
print("完成了1个")

Complex Version


  • When a page blocks right-clicking, you can still reach the page source in DevTools via Sources -> top/buding520.com/acg/19888/1.html.
"""
思路:
1. 拿到主页面的页面源代码, 找到iframe
2. 从iframe的页面源代码中拿到m3u8文件的地址
3. 下载第一层m3u8文件 -> 下载第二层m3u8文件(视频存放路径)
4. 下载ts切片视频(协程)
5. 下载秘钥, 进行解密操作(协程)
6. 合并所有ts文件为一个mp4文件
"""
import requests
from bs4 import BeautifulSoup
import re
import asyncio
import aiohttp
import aiofiles
from Crypto.Cipher import AES # pip install pycryptodome + 文件夹改名crypto->Crypto
import os


def get_iframe_src(url):
resp = requests.get(url)
main_page = BeautifulSoup(resp.text, "html.parser") # 可以用bs/xpath
src = main_page.find("iframe").get("src")
return src
# return "https://boba.52kuyun.com/share/xfPs9NPHvYGhNzFp" # 为了测试


def get_first_m3u8_url(url):
resp = requests.get(url)
# print(resp.text)
# 从js中找,所以用re
obj = re.compile(r'var main = "(?P<m3u8_url>.*?)"', re.S)
m3u8_url = obj.search(resp.text).group("m3u8_url")
# print(m3u8_url)
return m3u8_url


def download_m3u8_file(url, name):
resp = requests.get(url)
with open(name, mode="wb") as f:
f.write(resp.content)


async def download_ts(url, name, session):
async with session.get(url) as resp:
async with aiofiles.open(f"video2/{name}", mode="wb") as f:
await f.write(await resp.content.read()) # 把下载到的内容写入到文件中
print(f"{name}下载完毕")


async def aio_download(up_url): # https://boba.52kuyun.com/20170906/Moh2l9zV/hls/
tasks = []
async with aiohttp.ClientSession() as session: # 提前准备好session
async with aiofiles.open("越狱第一季第一集_second_m3u8.txt", mode="r", encoding='utf-8') as f:
async for line in f:
if line.startswith("#"):
continue
# line就是xxxxx.ts
line = line.strip() # 去掉没用的空格和换行
# 拼接真正的ts路径
ts_url = up_url + line
task = asyncio.create_task(download_ts(ts_url, line, session)) # 创建任务
tasks.append(task)

await asyncio.wait(tasks) # 等待任务结束


def get_key(url):
resp = requests.get(url)
return resp.text


async def dec_ts(name, key):
aes = AES.new(key=key, IV=b"0000000000000000", mode=AES.MODE_CBC)
async with aiofiles.open(f"video2/{name}", mode="rb") as f1, \
aiofiles.open(f"video2/temp_{name}", mode="wb") as f2:
bs = await f1.read() # 从源文件读取内容
await f2.write(aes.decrypt(bs)) # 把解密好的内容写入文件
print(f"{name}处理完毕")


async def aio_dec(key):
# 解密
tasks = []
async with aiofiles.open("越狱第一季第一集_second_m3u8.txt", mode="r", encoding="utf-8") as f:
async for line in f:
if line.startswith("#"):
continue
line = line.strip()
# 开始创建异步任务
task = asyncio.create_task(dec_ts(line, key))
tasks.append(task)
await asyncio.wait(tasks)


def merge_ts(): # 其实也可以用ab追加写入
# mac: cat 1.ts 2.ts 3.ts > xxx.mp4
# windows: copy /b 1.ts+2.ts+3.ts xxx.mp4
lst = []
with open("越狱第一季第一集_second_m3u8.txt", mode="r", encoding="utf-8") as f:
for line in f:
if line.startswith("#"):
continue
line = line.strip()
lst.append(f"video2/temp_{line}")

s = "+".join(lst) # 1.ts+2.ts+3.ts...
os.system(f"copy /b {s} movie.mp4")
print("done!")


def main(url):
# 1. 拿到主页面的页面源代码, 找到iframe对应的url
iframe_src = get_iframe_src(url)

# 2. 拿到第一层的m3u8文件的下载地址
first_m3u8_url = get_first_m3u8_url(iframe_src)
# 拿到iframe的域名 "https://boba.52kuyun.com/share/xfPs9NPHvYGhNzFp"
iframe_domain = iframe_src.split("/share")[0]
# 拼接出真正的m3u8的下载路径
first_m3u8_url = iframe_domain + first_m3u8_url
# https://boba.52kuyun.com/20170906/Moh2l9zV/index.m3u8?sign=548ae366a075f0f9e7c76af215aa18e1
# print(first_m3u8_url)

# 3.1 下载第一层m3u8文件
download_m3u8_file(first_m3u8_url, "越狱第一季第一集_first_m3u8.txt")
# 3.2 下载第二层m3u8文件
with open("越狱第一季第一集_first_m3u8.txt", mode="r", encoding="utf-8") as f:
for line in f:
if line.startswith("#"):
continue
else:
line = line.strip() # 去掉空白或者换行符 hls/index.m3u8
# 准备拼接第二层m3u8的下载路径
# https://boba.52kuyun.com/20170906/Moh2l9zV/ + hls/index.m3u8
# https://boba.52kuyun.com/20170906/Moh2l9zV/hls/index.m3u8
# https://boba.52kuyun.com/20170906/Moh2l9zV/hls/cFN8o3436000.ts
second_m3u8_url = first_m3u8_url.split("index.m3u8")[0] + line
download_m3u8_file(second_m3u8_url, "越狱第一季第一集_second_m3u8.txt")
print("m3u8文件下载完毕")

# 4. 下载视频
second_m3u8_url_up = second_m3u8_url.replace("index.m3u8", "")
# 异步协程
asyncio.run(aio_download(second_m3u8_url_up)) # 测试的使用可以注释掉

# 5.1 拿到秘钥
key_url = second_m3u8_url_up + "key.key" # 偷懒写法, 正常应该去m3u8文件里去找
key = get_key(key_url)
# 5.2 解密
asyncio.run(aio_dec(key))

# 6. 合并ts文件为mp4文件
merge_ts()


if __name__ == '__main__':
# ↓↓↓网站已挂↓↓↓
url = "https://www.91kanju.com/vod-play/541-2-1.html"
main(url)
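
The key url above is hard-coded; as its comment admits, it should really be read from the second m3u8's #EXT-X-KEY line. A sketch (get_key_url is a hypothetical helper, assuming a playlist like the sample shown earlier):

import re


def get_key_url(m3u8_path, base_url):
    # look for a line like: #EXT-X-KEY:METHOD=AES-128,URI="key.key"
    with open(m3u8_path, mode="r", encoding="utf-8") as f:
        m = re.search(r'#EXT-X-KEY:.*?URI="(?P<uri>.*?)"', f.read())
    uri = m.group("uri")
    # the URI may already be absolute; otherwise join it onto the base url
    return uri if uri.startswith("http") else base_url + uri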

When looking for an approach: make the complex problem simple (reason through the logic).

When implementing the approach: make the simple problem complex (think through more possibilities).