async-multi
在 Python 中实现并发编程,由于全局解释器锁(GIL)的存在,多线程并不能真正并行执行 CPU 密集型任务(但能并发处理 I/O 密集型任务)。因此,Python 提供了多种并发模型:多线程、多进程、异步 I/O。同时,为了保证共享资源的正确访问,需要使用同步机制(锁、信号量、条件变量等)。
1. 三种并发模型
| 模型 | 适用场景 | 优势 | 局限 |
|---|---|---|---|
| 多线程 (threading) | I/O 密集型、网络请求、文件读写 | 轻量、共享内存、切换开销小 | GIL 限制 CPU 并行 |
| 多进程 (multiprocessing) | CPU 密集型(计算、图像处理) | 真正的并行(多核) | 内存开销大、IPC 复杂 |
| asyncio | 高并发 I/O(万级连接) | 单线程事件循环,极低开销 | 需要 async/await 语法 |
2. 同步原语(避免竞态条件)
当多个线程/进程同时访问共享资源(变量、文件、网络套接字)时,需要同步。Python 的 threading 和 multiprocessing 模块提供了几乎同名的同步工具。
2.1 锁 (Lock)
最简单的同步机制,同一时刻只有一个执行单元能持有锁。
import threading
counter = 0
lock = threading.Lock()
def increment():
global counter
for _ in range(100000):
with lock: # 等价于 lock.acquire() + lock.release()
counter += 1
threads = [threading.Thread(target=increment) for _ in range(10)]
for t in threads: t.start()
for t in threads: t.join()
print(counter) # 1000000(正确)
如果不加锁,结果会远小于预期(数据竞争)。
2.2 RLock (可重入锁)
允许同一个线程多次获取锁,适用于递归调用或复杂嵌套逻辑。
2.3 信号量 (Semaphore)
允许最多 N 个线程同时访问资源。常用于限制连接池数量、数据库连接等。
sem = threading.Semaphore(3)
def fetch_data():
with sem:
# 最多同时 3 个线程执行这里
pass
2.4 事件 (Event)
一个线程等待另一个线程发出信号。常用于“等待初始化完成”、“停止工作线程”等场景。
event = threading.Event()
def worker():
event.wait() # 阻塞直到 event.set()
print("工作开始")
t = threading.Thread(target=worker)
t.start()
# ... 主线程做一些准备
event.set() # 唤醒等待的线程
2.5 条件变量 (Condition)
比事件更灵活,可以等待某个条件成立(如队列非空)。通常与锁配合使用。
import threading
queue = []
cond = threading.Condition()
def producer():
with cond:
queue.append("item")
cond.notify() # 通知一个等待的消费者
def consumer():
with cond:
while not queue: # 防止虚假唤醒
cond.wait()
item = queue.pop()
2.6 屏障 (Barrier)
等待多个线程到达同一个点,然后同时继续执行。
barrier = threading.Barrier(3)
def task():
print("开始准备")
# ... 阶段1
barrier.wait()
print("全部就绪,同时进行阶段2")
3. 进程间同步
multiprocessing 提供了类似的同步原语:Lock, Semaphore, Event, Condition, Barrier。用法与多线程几乎相同,但底层使用操作系统级的 IPC 原语。
对于进程间数据共享,可以使用 Value, Array 或 Manager(更慢但更灵活)。
import multiprocessing
counter = multiprocessing.Value('i', 0)
lock = multiprocessing.Lock()
def increment():
with lock:
counter.value += 1
4. 异步编程中的同步
在 asyncio 中,共享资源通常不需要锁(因为单线程),但当使用 await 切换任务时仍可能产生竞态条件。例如两个协程同时读写一个可变对象。此时应使用 asyncio.Lock。
import asyncio
lock = asyncio.Lock()
shared_resource = 0
async def modify():
async with lock:
# 临界区
global shared_resource
temp = shared_resource
await asyncio.sleep(0) # 可能切换任务
shared_resource = temp + 1
其他同步原语:asyncio.Semaphore, asyncio.Event, asyncio.Condition, asyncio.Queue(常用于生产者-消费者)。
5. 常见陷阱与最佳实践
- 死锁:获取锁的顺序不一致,或忘记释放锁。使用
with语句自动管理。 - 锁粒度:过大会降低并发性能,过小可能无法保证正确性。
- GIL 与多线程:对于 CPU 密集型任务,使用
multiprocessing或concurrent.futures.ProcessPoolExecutor。 - 共享状态越少越好:优先使用队列(
queue.Queue/multiprocessing.Queue/asyncio.Queue)在线程/进程/协程间传递数据,可以避免大部分锁。 - 线程安全的数据结构:
queue.Queue、collections.deque(某些操作原子)、threading.local(线程局部存储)无需锁。
6. 总结对比
| 场景 | 推荐方案 |
|---|---|
| 大量 HTTP 请求,爬虫 | asyncio + aiohttp |
| 读写文件,调用数据库 | threading (或 concurrent.futures) |
| 图像处理、机器学习训练 | multiprocessing 或 原生多进程设计 |
| 既要 CPU 密集又要高 I/O 并发 | 混合:asyncio 负责 I/O,ProcessPoolExecutor 卸载计算 |
选择合适的并发模型 + 正确的同步机制,是写出高效、正确 Python 程序的关键。如果只是简单的并行循环,可以先考虑 concurrent.futures 的高级接口,它内部封装了线程或进程池,并提供了 Future 对象方便获取结果。