Skip to main content

async-multi

在 Python 中实现并发编程,由于全局解释器锁(GIL)的存在,多线程并不能真正并行执行 CPU 密集型任务(但能并发处理 I/O 密集型任务)。因此,Python 提供了多种并发模型:多线程多进程异步 I/O。同时,为了保证共享资源的正确访问,需要使用同步机制(锁、信号量、条件变量等)。

1. 三种并发模型

模型适用场景优势局限
多线程 (threading)I/O 密集型、网络请求、文件读写轻量、共享内存、切换开销小GIL 限制 CPU 并行
多进程 (multiprocessing)CPU 密集型(计算、图像处理)真正的并行(多核)内存开销大、IPC 复杂
asyncio高并发 I/O(万级连接)单线程事件循环,极低开销需要 async/await 语法

2. 同步原语(避免竞态条件)

当多个线程/进程同时访问共享资源(变量、文件、网络套接字)时,需要同步。Python 的 threadingmultiprocessing 模块提供了几乎同名的同步工具。

2.1 锁 (Lock)

最简单的同步机制,同一时刻只有一个执行单元能持有锁。

import threading

counter = 0
lock = threading.Lock()

def increment():
global counter
for _ in range(100000):
with lock: # 等价于 lock.acquire() + lock.release()
counter += 1

threads = [threading.Thread(target=increment) for _ in range(10)]
for t in threads: t.start()
for t in threads: t.join()
print(counter) # 1000000(正确)

如果不加锁,结果会远小于预期(数据竞争)。

2.2 RLock (可重入锁)

允许同一个线程多次获取锁,适用于递归调用或复杂嵌套逻辑。

2.3 信号量 (Semaphore)

允许最多 N 个线程同时访问资源。常用于限制连接池数量、数据库连接等。

sem = threading.Semaphore(3)

def fetch_data():
with sem:
# 最多同时 3 个线程执行这里
pass

2.4 事件 (Event)

一个线程等待另一个线程发出信号。常用于“等待初始化完成”、“停止工作线程”等场景。

event = threading.Event()

def worker():
event.wait() # 阻塞直到 event.set()
print("工作开始")

t = threading.Thread(target=worker)
t.start()
# ... 主线程做一些准备
event.set() # 唤醒等待的线程

2.5 条件变量 (Condition)

比事件更灵活,可以等待某个条件成立(如队列非空)。通常与锁配合使用。

import threading

queue = []
cond = threading.Condition()

def producer():
with cond:
queue.append("item")
cond.notify() # 通知一个等待的消费者

def consumer():
with cond:
while not queue: # 防止虚假唤醒
cond.wait()
item = queue.pop()

2.6 屏障 (Barrier)

等待多个线程到达同一个点,然后同时继续执行。

barrier = threading.Barrier(3)

def task():
print("开始准备")
# ... 阶段1
barrier.wait()
print("全部就绪,同时进行阶段2")

3. 进程间同步

multiprocessing 提供了类似的同步原语:Lock, Semaphore, Event, Condition, Barrier。用法与多线程几乎相同,但底层使用操作系统级的 IPC 原语。

对于进程间数据共享,可以使用 Value, ArrayManager(更慢但更灵活)。

import multiprocessing

counter = multiprocessing.Value('i', 0)
lock = multiprocessing.Lock()

def increment():
with lock:
counter.value += 1

4. 异步编程中的同步

asyncio 中,共享资源通常不需要锁(因为单线程),但当使用 await 切换任务时仍可能产生竞态条件。例如两个协程同时读写一个可变对象。此时应使用 asyncio.Lock

import asyncio

lock = asyncio.Lock()
shared_resource = 0

async def modify():
async with lock:
# 临界区
global shared_resource
temp = shared_resource
await asyncio.sleep(0) # 可能切换任务
shared_resource = temp + 1

其他同步原语:asyncio.Semaphore, asyncio.Event, asyncio.Condition, asyncio.Queue(常用于生产者-消费者)。

5. 常见陷阱与最佳实践

  • 死锁:获取锁的顺序不一致,或忘记释放锁。使用 with 语句自动管理。
  • 锁粒度:过大会降低并发性能,过小可能无法保证正确性。
  • GIL 与多线程:对于 CPU 密集型任务,使用 multiprocessingconcurrent.futures.ProcessPoolExecutor
  • 共享状态越少越好:优先使用队列(queue.Queue / multiprocessing.Queue / asyncio.Queue)在线程/进程/协程间传递数据,可以避免大部分锁。
  • 线程安全的数据结构queue.Queuecollections.deque(某些操作原子)、threading.local(线程局部存储)无需锁。

6. 总结对比

场景推荐方案
大量 HTTP 请求,爬虫asyncio + aiohttp
读写文件,调用数据库threading (或 concurrent.futures)
图像处理、机器学习训练multiprocessing 或 原生多进程设计
既要 CPU 密集又要高 I/O 并发混合:asyncio 负责 I/O,ProcessPoolExecutor 卸载计算

选择合适的并发模型 + 正确的同步机制,是写出高效、正确 Python 程序的关键。如果只是简单的并行循环,可以先考虑 concurrent.futures 的高级接口,它内部封装了线程或进程池,并提供了 Future 对象方便获取结果。