Python 队列示例

引言

Python内置的 queue 模块是在线程或进程之间安全传递数据的关键工具。然而,许多开发者忽视了选择合适的队列类型——FIFO、LIFO或优先级——对并发逻辑的重要性。当你应该使用 Queue 而不是 PriorityQueue 时,如何防止在繁忙的工作线程中出现死锁或数据丢失呢?

关键在于理解每种队列在负载下的行为。通过将你的数据流和锁定策略与队列类相匹配,你可以避免微妙的错误,提高吞吐量,并编写更清晰的代码。让我们深入具体的例子,以便你能够为你的场景选择最佳的队列。

基本队列使用

最简单的队列是FIFO Queue。它保证先进先出(FIFO)的顺序,并在内部处理锁定。你可以创建一个并在多个线程之间共享,而无需额外的锁。

from queue import Queue
q = Queue(maxsize=10)  # 限制容量
q.put('task1')
task = q.get()
要点:
  • maxsize=0 表示无限容量。
  • put() 在队列满时会阻塞;get() 在队列空时会阻塞。
  • 使用 task_done()join() 进行生产者-消费者同步。

提示:在处理 get() 后,始终调用 task_done() 以表示完成。

该模式防止生产者淹没消费者,并帮助您等待所有任务完成,使用 q.join()

使用队列进行线程处理

当您生成工作线程时,Queue 是您最安全的共享结构。线程通过调用 get() 来获取工作,通过 put() 来添加结果。

  1. 创建一个 Queue
  2. 启动 N 个工作线程,目标是一个工作函数。
  3. 将任务加入队列。
  4. 调用 join() 以阻塞,直到所有任务被处理完毕。
import threading
from queue import Queue

def worker(q):
    while True:
        item = q.get()
       if item is None:
            break
        process(item)
        q.task_done()

q = Queue()
threads = []
for _ in range(4):
    t = threading.Thread(target=worker, args=(q,))
    t.start()
    threads.append(t)

for item in range(10):
    q.put(item)

q.join()
for _ in threads:
    q.put(None)
for t in threads:
    t.join()

有关线程模式的更多信息,请参见此 Python 线程示例

多进程队列

multiprocessing 模块提供了自己的 Queue 以实现进程安全的通信。与 queue.Queue 不同,这个队列在底层使用管道和锁。

from multiprocessing import Process, Queue

def worker(q):
    q.put('来自进程的结果')

if __name__ == '__main__':
    q = Queue()
    p = Process(target=worker, args=(q,))
    p.start()
    print(q.get())  # 从子进程接收数据
    p.join()

实用提示:

  • 避免使用大型对象;它们可能会减慢序列化速度。
  • 完成后在进程上使用 join(),在队列上使用 close()
  • 对于许多生产者,考虑使用 Manager().Queue()

这可以让你在多个核心之间安全地并行处理 CPU 密集型任务。

优先级队列

当顺序按优先级重要时,切换到 PriorityQueue。每个条目是一个元组 (priority, data)

from queue import PriorityQueue

pq = PriorityQueue()
pq.put((2, '低优先级任务'))

pq.put((1, '高优先级任务'))
print(pq.get()[1])  # 打印 '高优先级任务'

使用案例:

  • 按截止日期调度任务。
  • 在网络服务器中管理任务优先级。
  • 实时事件处理。

注意:较小的数字优先输出。对于相同优先级的情况,采用先进先出(FIFO)规则进行打破平局。

将优先级与工作线程混合,只需将 Queue 替换为 PriorityQueue 即可。

使用 asyncio 的队列

在异步代码中,使用 asyncio.Queue。它与事件循环集成,并使用 await 而不是阻塞调用。

import asyncio

async def producer(q):
    for i in range(5):
        await q.put(i)
        print(f'put {i}')

async def consumer(q):
    while True:
        item = await q.get()
        print(f'获得 {item}')
        q.task_done()

async def main():
    q = asyncio.Queue()
    prod = asyncio.create_task(producer(q))

cons = asyncio.create_task(consumer(q))
    await prod
    await q.join()
    cons.cancel()

asyncio.run(main())

这种模式使您的异步生产者和消费者在没有线程的情况下保持同步。

常见陷阱和提示

即使有内置的安全性,队列也可能被误用:

  • 忘记调用 task_done() 会导致 join() 挂起。
  • 没有哨兵值意味着线程永远不会退出。
  • 队列过满会意外阻塞生产者。

最佳实践:

  • 始终发出线程退出的信号(例如,使用 None 作为哨兵)。
  • 选择较小的 maxsize 以防止内存膨胀。
  • 如果使用超时,请将 get() 包裹在 try/except 中。

通过及早发现这些问题,您可以在流量激增时保持系统的可靠性。

结论

Python 的队列模块是管理并发的强大助手。从简单的 FIFO 队列到优先级调度和异步工作流,每个类都解决了明确的需求。了解何时使用 QueuePriorityQueueasyncio.Queue 有助于避免死锁、数据丢失以及线程或进程挂起。通过实际示例练习并遵循上述提示,将使您的代码更加健壮和可维护。现在轮到您了:选择一种队列类型,构建一个小型的生产者-消费者任务,看看并发如何运作。

更多