Crawling Wallpaper Images with aiohttp

    xiaoxiao · 2024-11-13

    I recently read a crawler expert's articles on coroutines and tried using them to scrape some images, and the speedup is seriously impressive. Without further ado, here are the links to those articles: 五分钟入门python协程 (a five-minute introduction to Python coroutines) and Python 协程模块 asyncio 使用指南 (a usage guide for Python's asyncio coroutine module).
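    Before the full scripts, here is a minimal sketch of the pattern those articles cover and that the code below relies on: run many aiohttp requests concurrently while capping how many are in flight with an asyncio.Semaphore. The URLs and function names here are placeholders of my own, not anything taken from the linked posts.

    import asyncio
    import aiohttp

    # Placeholder URLs -- stand-ins for whatever pages you want to fetch.
    URLS = [f"https://example.com/page/{i}" for i in range(10)]

    async def fetch(session, sem, url):
        # The semaphore caps how many requests are in flight at the same time.
        async with sem:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
                return await resp.text()

    async def main():
        sem = asyncio.Semaphore(5)  # at most 5 concurrent requests
        async with aiohttp.ClientSession() as session:
            pages = await asyncio.gather(*(fetch(session, sem, u) for u in URLS))
        print(f"fetched {len(pages)} pages")

    if __name__ == "__main__":
        asyncio.run(main())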

    The full code is below.

    Fetching the image URLs

    import aiohttp
    import asyncio
    from pyquery import PyQuery as pq
    # fake_useragent generates random User-Agent headers
    import fake_useragent
    from contextvars import ContextVar

    # ContextVar that holds the shared semaphore
    concurrent = ContextVar("concurrent")
    # Base search URL
    base_url = "https://alpha.wallhaven.cc/search?"
    # Detail-page URLs scraped from the search results
    image_lists = []
    # Direct image URLs extracted from the detail pages
    real_image_lists = set()
    # Local proxy
    proxies = 'http://localhost:1080'


    # Fetch one page of search results and collect the detail-page links
    async def catch_url(base_url, thing, page, ua):
        params = {
            'q': thing,
            'search_image': '',
            'page': str(page)
        }
        headers = {
            'cache-control': 'max-age=0',
            'referer': 'https://alpha.wallhaven.cc/',
            'upgrade-insecure-requests': '1',
            'user-agent': ua
        }
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(base_url, params=params, headers=headers,
                                       timeout=aiohttp.ClientTimeout(total=10),
                                       proxy=proxies) as r:
                    if r.status == 200:
                        first_param_image_url(await r.text())
        except Exception as e:
            print(e.args)


    # Fetch one detail page and extract the direct image URL
    async def catch_image_url(raw_url, ua):
        headers = {
            'cache-control': 'max-age=0',
            'referer': 'https://alpha.wallhaven.cc/',
            'upgrade-insecure-requests': '1',
            'user-agent': ua
        }
        # Get the shared semaphore from the context variable
        sem = concurrent.get()
        try:
            async with sem:
                async with aiohttp.ClientSession() as session:
                    async with session.get(raw_url, headers=headers,
                                           timeout=aiohttp.ClientTimeout(total=10),
                                           proxy=proxies) as r:
                        if r.status == 200:
                            second_param_image_url(await r.text())
        except Exception as e:
            print(e.args)


    # Parse a search-results page and collect every thumbnail's detail-page link
    def first_param_image_url(text):
        doc = pq(text)
        all_images = doc('section[class="thumb-listing-page"] ul li figure a')
        for image in all_images.items():
            try:
                image_lists.append(image.attr.href)
            except Exception as e:
                print(e.args)


    # Parse a detail page and collect the full-size wallpaper's src
    def second_param_image_url(text):
        doc = pq(text)
        image = doc('#wallpaper')
        try:
            url = image.attr.src
            if url:
                real_image_lists.add(url)
        except Exception as e:
            print(e.args)


    async def get_image_url():
        # Windows caps the number of sockets the default event loop can watch at 509,
        # so keep at most 50 requests in flight
        concurrent.set(asyncio.Semaphore(50))
        ua = fake_useragent.UserAgent()
        tasks = [asyncio.create_task(catch_url(base_url, 'asuna', i, ua.random))
                 for i in range(20)]
        await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
        print(f"Collected {len(image_lists)} detail-page URLs")
        new_tasks = [asyncio.create_task(catch_image_url(raw_url, ua.random))
                     for raw_url in image_lists]
        await asyncio.wait(new_tasks, return_when=asyncio.ALL_COMPLETED)
        print(f"Extracted {len(real_image_lists)} image URLs")
        print(real_image_lists)


    if __name__ == "__main__":
        asyncio.run(get_image_url())
        with open('images.txt', 'w') as f:
            for image in real_image_lists:
                # src attributes are protocol-relative (start with //), so add a scheme
                f.write("http:" + image + '\n')
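    The Semaphore(50) exists because of the limit mentioned in the comment inside get_image_url: on Windows the default selector-based event loop can only watch a few hundred sockets at once, so the scripts keep at most 50 requests in flight. As a sketch of an alternative (not part of the original script), you could instead share one ClientSession and let aiohttp's TCPConnector enforce the cap; the function name and arguments here are my own.

    import asyncio
    import aiohttp

    # Sketch: one shared session whose connector allows at most 50 concurrent connections.
    async def fetch_all(urls, ua):
        headers = {'user-agent': ua}
        connector = aiohttp.TCPConnector(limit=50)
        async with aiohttp.ClientSession(connector=connector) as session:

            async def fetch(url):
                async with session.get(url, headers=headers,
                                       timeout=aiohttp.ClientTimeout(total=10)) as r:
                    return await r.text()

            # return_exceptions=True keeps one failed request from cancelling the rest
            return await asyncio.gather(*(fetch(u) for u in urls), return_exceptions=True)

    Reusing a single session also lets aiohttp keep connections alive between requests, whereas the original scripts open a fresh ClientSession for every URL.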

    Downloading the images

    import aiohttp
    # tqdm is only needed if the commented-out progress bar below is enabled
    from tqdm import tqdm
    import asyncio
    from contextvars import ContextVar
    import fake_useragent

    # ContextVar that holds the shared semaphore
    concurrent = ContextVar("concurrent")
    # Bytes to read per chunk
    chunk_size = 1024
    # Local proxy
    proxies = 'http://localhost:1080'
    URL_PATH = 'images.txt'


    # Stream one image to disk, named after the last segment of its URL
    async def download_image(url, ua):
        headers = {
            'cache-control': 'max-age=0',
            'referer': 'https://alpha.wallhaven.cc/',
            'upgrade-insecure-requests': '1',
            'user-agent': ua
        }
        sem = concurrent.get()
        try:
            async with sem:
                async with aiohttp.ClientSession() as session:
                    async with session.get(url, headers=headers, proxy=proxies) as resp:
                        # file_size = resp.headers.get('content-length')
                        # if file_size:
                        #     file_size = int(file_size)
                        # else:
                        #     file_size = "unknown"
                        name = url.split('/')[-1]
                        # pbar = tqdm(unit="B", unit_scale=True, desc=name)
                        with open(name, 'wb') as f:
                            while True:
                                chunk = await resp.content.read(chunk_size)
                                if not chunk:
                                    break
                                f.write(chunk)
                                # pbar.update(chunk_size)
                        # pbar.close()
        except Exception as e:
            print(e.args)


    # Yield one image URL per line of images.txt
    def get_url(url_path=URL_PATH):
        with open(url_path, 'r') as f:
            while True:
                url = f.readline()
                if url:
                    yield url.strip('\n')
                else:
                    break


    async def main():
        # Windows caps the number of sockets the default event loop can watch at 509,
        # so keep at most 50 downloads in flight
        concurrent.set(asyncio.Semaphore(50))
        ua = fake_useragent.UserAgent()
        urls = get_url()
        tasks = [asyncio.create_task(download_image(url, ua.random)) for url in urls]
        await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)


    if __name__ == "__main__":
        asyncio.run(main())
        print("Done")
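    The commented-out lines in download_image show where a per-file tqdm progress bar was meant to go. Here is a sketch of how those pieces could be wired together in a standalone coroutine; the function name and sample URL are placeholders of my own, and total=None simply makes tqdm count bytes without a percentage when the server sends no Content-Length.

    import asyncio
    import aiohttp
    from tqdm import tqdm

    chunk_size = 1024

    async def download_with_progress(url):
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                file_size = resp.headers.get('content-length')
                total = int(file_size) if file_size else None
                name = url.split('/')[-1]
                with open(name, 'wb') as f, tqdm(total=total, unit="B",
                                                 unit_scale=True, desc=name) as pbar:
                    while True:
                        chunk = await resp.content.read(chunk_size)
                        if not chunk:
                            break
                        f.write(chunk)
                        pbar.update(len(chunk))  # advance by the bytes actually read

    if __name__ == "__main__":
        # Placeholder URL -- replace with a line from images.txt.
        asyncio.run(download_with_progress("https://example.com/sample.jpg"))

    With 50 downloads running concurrently the bars interleave on one terminal, which may be why they were left commented out in the original script.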

    Finally, here is a high-resolution wallpaper of my goddess Asuna.
