首页 论坛 置顶 Python 队列示例

正在查看 1 个帖子:1-1 (共 1 个帖子)
  • 作者
    帖子
  • #25347

    引言

    Python内置的 queue 模块是在线程或进程之间安全传递数据的关键工具。然而,许多开发者忽视了选择合适的队列类型——FIFO、LIFO或优先级——对并发逻辑的重要性。当你应该使用 Queue 而不是 PriorityQueue 时,如何防止在繁忙的工作线程中出现死锁或数据丢失呢?

    关键在于理解每种队列在负载下的行为。通过将你的数据流和锁定策略与队列类相匹配,你可以避免微妙的错误,提高吞吐量,并编写更清晰的代码。让我们深入具体的例子,以便你能够为你的场景选择最佳的队列。

    基本队列使用

    最简单的队列是FIFO Queue。它保证先进先出(FIFO)的顺序,并在内部处理锁定。你可以创建一个并在多个线程之间共享,而无需额外的锁。

    from queue import Queue
    q = Queue(maxsize=10)  # 限制容量
    q.put('task1')
    task = q.get()
    
    要点:
    • maxsize=0 表示无限容量。
    • put() 在队列满时会阻塞;get() 在队列空时会阻塞。
    • 使用 task_done()join() 进行生产者-消费者同步。

    提示:在处理 get() 后,始终调用 task_done() 以表示完成。

    该模式防止生产者淹没消费者,并帮助您等待所有任务完成,使用 q.join()

    使用队列进行线程处理

    当您生成工作线程时,Queue 是您最安全的共享结构。线程通过调用 get() 来获取工作,通过 put() 来添加结果。

    1. 创建一个 Queue
    2. 启动 N 个工作线程,目标是一个工作函数。
    3. 将任务加入队列。
    4. 调用 join() 以阻塞,直到所有任务被处理完毕。
    import threading
    from queue import Queue
    
    def worker(q):
        while True:
            item = q.get()
           if item is None:
                break
            process(item)
            q.task_done()
    
    q = Queue()
    threads = []
    for _ in range(4):
        t = threading.Thread(target=worker, args=(q,))
        t.start()
        threads.append(t)
    
    for item in range(10):
        q.put(item)
    
    q.join()
    for _ in threads:
        q.put(None)
    for t in threads:
        t.join()

    有关线程模式的更多信息,请参见此 Python 线程示例

    多进程队列

    multiprocessing 模块提供了自己的 Queue 以实现进程安全的通信。与 queue.Queue 不同,这个队列在底层使用管道和锁。

    from multiprocessing import Process, Queue
    
    def worker(q):
        q.put('来自进程的结果')
    
    if __name__ == '__main__':
        q = Queue()
        p = Process(target=worker, args=(q,))
        p.start()
        print(q.get())  # 从子进程接收数据
        p.join()
    

    实用提示:

    • 避免使用大型对象;它们可能会减慢序列化速度。
    • 完成后在进程上使用 join(),在队列上使用 close()
    • 对于许多生产者,考虑使用 Manager().Queue()

    这可以让你在多个核心之间安全地并行处理 CPU 密集型任务。

    优先级队列

    当顺序按优先级重要时,切换到 PriorityQueue。每个条目是一个元组 (priority, data)

    from queue import PriorityQueue
    
    pq = PriorityQueue()
    pq.put((2, '低优先级任务'))
    
    pq.put((1, '高优先级任务'))
    print(pq.get()[1])  # 打印 '高优先级任务'

    使用案例:

    • 按截止日期调度任务。
    • 在网络服务器中管理任务优先级。
    • 实时事件处理。

    注意:较小的数字优先输出。对于相同优先级的情况,采用先进先出(FIFO)规则进行打破平局。

    将优先级与工作线程混合,只需将 Queue 替换为 PriorityQueue 即可。

    使用 asyncio 的队列

    在异步代码中,使用 asyncio.Queue。它与事件循环集成,并使用 await 而不是阻塞调用。

    import asyncio
    
    async def producer(q):
        for i in range(5):
            await q.put(i)
            print(f'put {i}')
    
    async def consumer(q):
        while True:
            item = await q.get()
            print(f'获得 {item}')
            q.task_done()
    
    async def main():
        q = asyncio.Queue()
        prod = asyncio.create_task(producer(q))
    
    cons = asyncio.create_task(consumer(q))
        await prod
        await q.join()
        cons.cancel()
    
    asyncio.run(main())

    这种模式使您的异步生产者和消费者在没有线程的情况下保持同步。

    常见陷阱和提示

    即使有内置的安全性,队列也可能被误用:

    • 忘记调用 task_done() 会导致 join() 挂起。
    • 没有哨兵值意味着线程永远不会退出。
    • 队列过满会意外阻塞生产者。

    最佳实践:

    • 始终发出线程退出的信号(例如,使用 None 作为哨兵)。
    • 选择较小的 maxsize 以防止内存膨胀。
    • 如果使用超时,请将 get() 包裹在 try/except 中。

    通过及早发现这些问题,您可以在流量激增时保持系统的可靠性。

    结论

    Python 的队列模块是管理并发的强大助手。从简单的 FIFO 队列到优先级调度和异步工作流,每个类都解决了明确的需求。了解何时使用 QueuePriorityQueueasyncio.Queue 有助于避免死锁、数据丢失以及线程或进程挂起。通过实际示例练习并遵循上述提示,将使您的代码更加健壮和可维护。现在轮到您了:选择一种队列类型,构建一个小型的生产者-消费者任务,看看并发如何运作。

正在查看 1 个帖子:1-1 (共 1 个帖子)
  • 哎呀,回复话题必需登录。