V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
vegetableChick
V2EX  ›  Python

多线程消费, 队列没有清空就退出了, 请教原因, 谢谢!

  •  1
     
  •   vegetableChick · 2022-01-18 17:42:56 +08:00 · 2753 次点击
    这是一个创建于 1065 天前的主题,其中的信息可能已经有所发展或是发生改变。

    我想通过 ThreadPoolExecutor 使用多个线程来消耗 redis 队列。但进程在队列没有消耗完的情况下退出了

    下面是实现代码

    from concurrent.futures import ThreadPoolExecutor
    
    import redis
    from redis import Redis
    
    
    
    pool = redis.ConnectionPool(
        max_connections=settings.REDIS_POOL_MAX_CLIENT,
        host=settings.REDIS_HOST,
        port=settings.REDIS_PORT,
        db=8,
        decode_responses=True
    )
    
    redis_con = Redis(connection_pool=pool)
    
    
    class BasicTask(object):
         def __init__(self,
                     consume_queue_name=None,
                     thread_num=50):
            self.consume_queue_name = consume_queue_name
            self.thread_num = thread_num
    
        def _consume(self):
            try:
                with ThreadPoolExecutor(max_workers=self.thread_num) as e:
                    e.map(self._do_request, range(0, self.thread_num))
            except Exception as e:
                self.logger.error(f"[consume error]: {e}")
    
        def _do_request(self, _):
            try:
                with redis_con as redis_conn:
                    while 1:
                        account_id_info = redis_conn.rpop(self.consume_queue_name)
                        if account_id_info:
                            try:
                                # django orm save db
                                ...
    
                            except Exception as e:
                                import traceback as tb
                                tb.print_exc()
                                self.logger.error(f"[consume error]: {e}. ")
    
                        else:
                            break
    
            except Exception as e:
                self.logger.error(f"[Unexpected Error: {e}]")
                import traceback as tb
                tb.print_exc()
    
        def run(self):
            self._consume()
    
    
    
    # run 
    BasicTask(consume_queue_name="base_list_queue").run()
    
    

    请问 bug 写在哪里了? 感谢大佬

    python3.7.3

    2 条回复    2022-01-18 18:52:44 +08:00
    MoYi123
        1
    MoYi123  
       2022-01-18 17:51:13 +08:00
    唯一的可能性就是 if account_id_info: 后面的 break 了吧, 不然都是有日志的.
    wuwukai007
        2
    wuwukai007  
       2022-01-18 18:52:44 +08:00
    不是应该 brpop 之后在开启多线程 执行任务吗,怎么多线程 brpop 了
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   999 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 23ms · UTC 21:47 · PVG 05:47 · LAX 13:47 · JFK 16:47
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.