首页 › 论坛 › 置顶 › Python 队列示例
-
作者帖子
-
2025-08-07 15:20 #25347Q QPY课程团队管理员
引言
Python内置的
queue
模块是在线程或进程之间安全传递数据的关键工具。然而,许多开发者忽视了选择合适的队列类型——FIFO、LIFO或优先级——对并发逻辑的重要性。当你应该使用Queue
而不是PriorityQueue
时,如何防止在繁忙的工作线程中出现死锁或数据丢失呢?关键在于理解每种队列在负载下的行为。通过将你的数据流和锁定策略与队列类相匹配,你可以避免微妙的错误,提高吞吐量,并编写更清晰的代码。让我们深入具体的例子,以便你能够为你的场景选择最佳的队列。
基本队列使用
最简单的队列是FIFO
Queue
。它保证先进先出(FIFO)的顺序,并在内部处理锁定。你可以创建一个并在多个线程之间共享,而无需额外的锁。from queue import Queue
q = Queue(maxsize=10) # 限制容量 q.put('task1') task = q.get()
要点:maxsize=0
表示无限容量。put()
在队列满时会阻塞;get()
在队列空时会阻塞。- 使用
task_done()
和join()
进行生产者-消费者同步。
提示:在处理
get()
后,始终调用task_done()
以表示完成。该模式防止生产者淹没消费者,并帮助您等待所有任务完成,使用
q.join()
。使用队列进行线程处理
当您生成工作线程时,
Queue
是您最安全的共享结构。线程通过调用get()
来获取工作,通过put()
来添加结果。- 创建一个
Queue
。 - 启动 N 个工作线程,目标是一个工作函数。
- 将任务加入队列。
- 调用
join()
以阻塞,直到所有任务被处理完毕。
import threading from queue import Queue def worker(q): while True: item = q.get()
if item is None: break process(item) q.task_done() q = Queue() threads = [] for _ in range(4): t = threading.Thread(target=worker, args=(q,)) t.start() threads.append(t)
for item in range(10): q.put(item) q.join() for _ in threads: q.put(None) for t in threads: t.join()
有关线程模式的更多信息,请参见此 Python 线程示例。
多进程队列
multiprocessing
模块提供了自己的Queue
以实现进程安全的通信。与queue.Queue
不同,这个队列在底层使用管道和锁。from multiprocessing import Process, Queue def worker(q): q.put('来自进程的结果')
if __name__ == '__main__': q = Queue() p = Process(target=worker, args=(q,)) p.start() print(q.get()) # 从子进程接收数据 p.join()
实用提示:
- 避免使用大型对象;它们可能会减慢序列化速度。
- 完成后在进程上使用
join()
,在队列上使用close()
。 - 对于许多生产者,考虑使用
Manager().Queue()
。
这可以让你在多个核心之间安全地并行处理 CPU 密集型任务。
优先级队列
当顺序按优先级重要时,切换到
PriorityQueue
。每个条目是一个元组(priority, data)
。from queue import PriorityQueue pq = PriorityQueue() pq.put((2, '低优先级任务'))
pq.put((1, '高优先级任务')) print(pq.get()[1]) # 打印 '高优先级任务'
使用案例:
- 按截止日期调度任务。
- 在网络服务器中管理任务优先级。
- 实时事件处理。
注意:较小的数字优先输出。对于相同优先级的情况,采用先进先出(FIFO)规则进行打破平局。
将优先级与工作线程混合,只需将
Queue
替换为PriorityQueue
即可。使用 asyncio 的队列
在异步代码中,使用
asyncio.Queue
。它与事件循环集成,并使用await
而不是阻塞调用。import asyncio async def producer(q): for i in range(5): await q.put(i) print(f'put {i}')
async def consumer(q): while True: item = await q.get() print(f'获得 {item}') q.task_done() async def main(): q = asyncio.Queue() prod = asyncio.create_task(producer(q))
cons = asyncio.create_task(consumer(q)) await prod await q.join() cons.cancel() asyncio.run(main())
这种模式使您的异步生产者和消费者在没有线程的情况下保持同步。
常见陷阱和提示
即使有内置的安全性,队列也可能被误用:
- 忘记调用
task_done()
会导致join()
挂起。 - 没有哨兵值意味着线程永远不会退出。
- 队列过满会意外阻塞生产者。
最佳实践:
- 始终发出线程退出的信号(例如,使用
None
作为哨兵)。 - 选择较小的
maxsize
以防止内存膨胀。 - 如果使用超时,请将
get()
包裹在 try/except 中。
通过及早发现这些问题,您可以在流量激增时保持系统的可靠性。
结论
Python 的队列模块是管理并发的强大助手。从简单的 FIFO 队列到优先级调度和异步工作流,每个类都解决了明确的需求。了解何时使用
Queue
、PriorityQueue
或asyncio.Queue
有助于避免死锁、数据丢失以及线程或进程挂起。通过实际示例练习并遵循上述提示,将使您的代码更加健壮和可维护。现在轮到您了:选择一种队列类型,构建一个小型的生产者-消费者任务,看看并发如何运作。 -
作者帖子
- 哎呀,回复话题必需登录。