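Understanding Asynchronous Programming
Async is not multithreading. At the core of async is the event loop: a single thread switches between tasks whenever one of them waits on I/O, so the CPU does not sit idle during blocking waits.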
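
To make the "async is not multithreading" point concrete, here is a minimal sketch that records which OS thread each coroutine runs on. The names `worker` and `demo` and the delays are illustrative assumptions, not part of the original text. Both coroutines should report the same thread id, and the one with the shorter wait should finish first even though it started second.

```python
import asyncio
import threading


async def worker(name: str, delay: float, log: list) -> None:
    # Record the current thread before and after yielding to the event loop.
    log.append((name, "start", threading.get_ident()))
    await asyncio.sleep(delay)  # hands control back to the event loop
    log.append((name, "end", threading.get_ident()))


async def demo() -> list:
    log: list = []
    # Both workers are scheduled on the same event loop.
    await asyncio.gather(worker("a", 0.05, log), worker("b", 0.01, log))
    return log


log = asyncio.run(demo())
thread_ids = {tid for _, _, tid in log}
events = [(name, event) for name, event, _ in log]
print(f"distinct threads: {len(thread_ids)}")
print(events)
```

The interleaving comes purely from `await` points: a coroutine that never awaits would keep the loop to itself.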
1. async/await Basics
import asyncio

async def fetch_data(url: str) -> str:
    """Simulate an asynchronous I/O operation."""
    print(f"Fetching {url}...")
    await asyncio.sleep(1)  # simulate a network request
    return f"Data from {url}"

async def main():
    # Sequential execution
    r1 = await fetch_data("/api/users")
    r2 = await fetch_data("/api/posts")  # starts only after r1 has finished

    # Concurrent execution
    r1, r2 = await asyncio.gather(
        fetch_data("/api/users"),
        fetch_data("/api/posts"),
    )  # both requests are in flight at the same time

asyncio.run(main())
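
One way to see the difference between the sequential and concurrent styles in `main()` is to time them. The sketch below is an assumption-laden variant: `fetch_data` gets a hypothetical `delay` parameter (shortened to 0.1 s), and `timed`, `sequential`, and `concurrent` are helper names introduced only for this illustration.

```python
import asyncio
import time


async def fetch_data(url: str, delay: float = 0.1) -> str:
    await asyncio.sleep(delay)  # stand-in for a network round trip
    return f"Data from {url}"


async def timed(coro) -> tuple:
    # Await a coroutine and return (result, elapsed seconds).
    start = time.perf_counter()
    result = await coro
    return result, time.perf_counter() - start


async def sequential(urls: list) -> list:
    # Each request starts only after the previous one has finished.
    return [await fetch_data(u) for u in urls]


async def concurrent(urls: list) -> list:
    # All requests are started together; gather keeps the input order.
    return list(await asyncio.gather(*(fetch_data(u) for u in urls)))


async def main() -> tuple:
    urls = ["/api/users", "/api/posts"]
    seq_result, seq_time = await timed(sequential(urls))
    con_result, con_time = await timed(concurrent(urls))
    return seq_result, seq_time, con_result, con_time


seq_result, seq_time, con_result, con_time = asyncio.run(main())
print(f"sequential: {seq_time:.2f}s, concurrent: {con_time:.2f}s")
```

With two simulated requests, the sequential run should take roughly the sum of the delays and the concurrent run roughly the longest single delay.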
2. A Practical Async Crawler
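import asyncio
import aiohttp
from bs4 import BeautifulSoup

async def fetch(url: str, session: aiohttp.ClientSession) -> tuple:
    """Return (url, status, body). On failure, status is None and body holds the error text."""
    try:
        # ClientTimeout is the documented type; a bare number is kept only for backward compatibility
        timeout = aiohttp.ClientTimeout(total=10)
        async with session.get(url, timeout=timeout) as resp:
            return url, resp.status, await resp.text()
    except Exception as e:
        # Broad catch so that one bad URL does not abort the whole batch
        return url, None, str(e)

async def crawl_parallel(urls: list[str], concurrency: int = 5):
    semaphore = asyncio.Semaphore(concurrency)  # caps the number of in-flight requests

    async def bounded_fetch(url):
        # session is looked up when this runs, after the session below exists
        async with semaphore:
            return await fetch(url, session)

    connector = aiohttp.TCPConnector(limit=concurrency)  # caps open connections
    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [bounded_fetch(url) for url in urls]
        results = await asyncio.gather(*tasks)

    for url, status, body in results:
        if status == 200:
            soup = BeautifulSoup(body, 'html.parser')
            title = soup.title.string if soup.title else "No title"
            print(f"{url} → {title}")
        else:
            # Show the exception text when the request itself failed
            print(f"{url} → Error: {status if status is not None else body}")

# Run
urls = [f"https://example.com/page/{i}" for i in range(100)]
asyncio.run(crawl_parallel(urls, concurrency=10))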
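
The effect of `asyncio.Semaphore` in `crawl_parallel` can be checked without touching the network. In this sketch, `fake_fetch`, `run`, and the `state` counter are hypothetical stand-ins that replace the HTTP call with a short sleep and record how many tasks are inside the semaphore at once.

```python
import asyncio


async def fake_fetch(i: int, sem: asyncio.Semaphore, state: dict) -> int:
    async with sem:
        # Count how many tasks currently hold the semaphore.
        state["in_flight"] += 1
        state["peak"] = max(state["peak"], state["in_flight"])
        await asyncio.sleep(0.01)  # stand-in for the HTTP request
        state["in_flight"] -= 1
    return i


async def run(n: int, concurrency: int) -> tuple:
    sem = asyncio.Semaphore(concurrency)
    state = {"in_flight": 0, "peak": 0}
    # All n coroutines are started, but only `concurrency` pass the semaphore at a time.
    results = await asyncio.gather(*(fake_fetch(i, sem, state) for i in range(n)))
    return list(results), state["peak"]


results, peak = asyncio.run(run(20, 5))
print(f"peak concurrency: {peak}")
```

Note that `gather` still returns results in input order, regardless of the order in which the tasks finish.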
3. Async Context Managers
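class AsyncDB:
    async def __aenter__(self):
        # create_async_connection() is a placeholder for your driver's connect call
        self.conn = await create_async_connection()
        return self.conn

    async def __aexit__(self, *args):
        await self.conn.close()

async def main():
    # async with is only valid inside a coroutine
    async with AsyncDB() as db:
        users = await db.fetch("SELECT * FROM users")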
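
Because `create_async_connection()` is not defined in the snippet above, it cannot be run as is. The following sketch swaps in a hypothetical in-memory `FakeConnection` (not a real driver API) to show that `__aexit__` closes the connection both on normal exit and when the body raises.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for a real async database connection."""

    def __init__(self) -> None:
        self.closed = False

    async def fetch(self, query: str) -> list:
        await asyncio.sleep(0)  # pretend to wait on the database
        return [{"id": 1, "name": "alice"}]

    async def close(self) -> None:
        self.closed = True


class AsyncDB:
    async def __aenter__(self) -> FakeConnection:
        self.conn = FakeConnection()
        return self.conn

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Called on normal exit and when the body raises.
        await self.conn.close()


async def normal_exit() -> tuple:
    async with AsyncDB() as db:
        users = await db.fetch("SELECT * FROM users")
    return users, db.closed


async def exit_on_error() -> bool:
    manager = AsyncDB()
    try:
        async with manager:
            raise RuntimeError("query failed")
    except RuntimeError:
        return manager.conn.closed


users, closed = asyncio.run(normal_exit())
closed_after_error = asyncio.run(exit_on_error())
print(users, closed, closed_after_error)
```

Since `__aexit__` returns `None` here, the exception still propagates to the caller after cleanup.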
4. Common asyncio Pitfalls
| Pitfall | Correct approach |
|---|---|
| Calling time.sleep() inside an async function | Use await asyncio.sleep() |
| Using a synchronous blocking library (requests) | Use an async alternative (aiohttp) |
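| Running CPU-bound work on the event loop | Offload it with run_in_executor(): a process pool for pure-Python CPU work, a thread pool for blocking calls |
| Creating a new session for every request | Create one aiohttp.ClientSession() and reuse it |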
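
As an illustration of the `run_in_executor()` row, the sketch below offloads a blocking call to the loop's default thread pool while a heartbeat task keeps running. The names `blocking_io` and `heartbeat` are assumptions made for this example. For pure-Python CPU-bound work, a `concurrent.futures.ProcessPoolExecutor` passed as the first argument is the usual choice, since threads share the GIL.

```python
import asyncio
import time


def blocking_io(seconds: float) -> str:
    time.sleep(seconds)  # a synchronous call that would stall the event loop
    return "done"


async def heartbeat(ticks: list, stop: asyncio.Event) -> None:
    # Keeps ticking only while the event loop is free to schedule it.
    while not stop.is_set():
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.01)


async def main() -> tuple:
    loop = asyncio.get_running_loop()
    ticks: list = []
    stop = asyncio.Event()
    beat = asyncio.create_task(heartbeat(ticks, stop))
    # None selects the loop's default ThreadPoolExecutor.
    result = await loop.run_in_executor(None, blocking_io, 0.1)
    stop.set()
    await beat
    return result, len(ticks)


result, tick_count = asyncio.run(main())
print(result, tick_count)
```

Calling `blocking_io(0.1)` directly inside `main()` would instead freeze the heartbeat for the whole duration of the call.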
Summary
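The key to asynchronous programming in Python is understanding the event loop and coroutine scheduling. For I/O-bound workloads, combining asyncio with aiohttp can raise throughput by an order of magnitude or more over issuing requests one at a time; the actual gain depends on request latency and the concurrency limit.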