V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
djasdjds
V2EX  ›  Python

大佬求解 sqlalchemy,项目中偶然发现 postgres+flask_sqlalchemy 在 query 时存在内存泄露问题

  •  
  •   djasdjds · 2022-09-20 09:58:54 +08:00 · 2262 次点击
    这是一个创建于 840 天前的主题,其中的信息可能已经有所发展或是发生改变。

    先说环境: 1.python3.6-python3.9 都有使用,均发现有问题,gc 打印没有不可回收的对象,排除 python 本身的问题。 2.SQLAlchemy 版本 1.3-最新 1.4.41 都存在查询完内存不释放的问题,特别的是用原生 sql 查询会比 orm model 查询释放内存少的多,数据库表为了测试存放了 6000 多条数据,实际项目在正常运行中,会一段时间由于 sqlalchemy 内存不释放内存逐渐增大,直到 oom ,用了很多内存分析工具,定位出下面,不知道怎么可以解决,还是自己项目使用上哪里有问题,sqlalchemy 这么大的库,存在这种问题没有被发现

    参考了 sqlalchemy 官方文档的各种查询语句都有问题,写的 demo 是,起一个多线程,然后查询,线程销毁后,内存只释放了一部分,6000 多条数据 orm 查完后仍然有 25M 没释放 a = session.query(TaskModel).all() 发不了图:我发下运行结果: begin 57.20703125 end 106.94140625 finish!! end 82.31640625 cost: 25.109375

    使用: a = session.execute("select * from task").all() 内存释放很多: begin 57.2109375 end 104.3515625 finish!! end 61.8046875 cost: 4.59375

    # -*- coding: utf-8 -*-
    import enum
    import gc
    import os
    import threading
    # from sqlalchemy.orm import Session
    import time
    from contextlib import contextmanager
    from datetime import datetime
    
    import psutil
    from flask import Flask
    from flask_sqlalchemy import SQLAlchemy
    from sqlalchemy import Column, TIMESTAMP, VARCHAR, Integer, Text, Enum, BOOLEAN
    
    # metadata = MetaData()
    # Base = declarative_base()
    
    user = "postgres"
    pwd = ""
    host = ""
    port = 
    database = ""
    db_uri = f"postgresql+psycopg2://{user}:{pwd}@{host}:{port}/{database}"
    
    app = Flask(__name__)
    app.config["SQLALCHEMY_DATABASE_URI"] = db_uri
    app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
    app.config["SQLALCHEMY_RECORD_QUERIES"] = False
    # app.config["SQLALCHEMY_ECHO"] = True
    engine_options = {
        "pool_size": 50,
        "pool_timeout": 30,
        "pool_recycle": 2 * 60 * 60,
        "max_overflow": 10,
        # "echo": True,
        "query_cache_size": 0,
        "execution_options":
            {
                "compiled_cache": None,
            }
    }
    
    app.config["SQLALCHEMY_ENGINE_OPTIONS"] = engine_options
    db = SQLAlchemy(app, session_options={"expire_on_commit": False})
    
    # engine = create_engine(db_uri)
    # engine = db.get_engine()
    # engine = create_engine(db_uri, future=True, **engine_options)
    # Session = sessionmaker(autoflush=True, bind=engine)
    
    @enum.unique
    class TaskStatus(enum.Enum):
        INIT = "INIT"
        RUNNING = "RUNNING"
        SUCCESS = "SUCCESS"
        FAILED = "FAILED"
        FAILED_NEED_ROLLBACK = "FAILED_NEED_ROLLBACK"
        FAILED_NEED_CLEAR = "FAILED_NEED_CLEAR"
        TIMEOUT = "TIMEOUT"
        PARTIAL_SUCCESS = "PARTIAL_SUCCESS"
        PARTIAL_SUCCESS_NEED_CLEAR = "PARTIAL_SUCCESS_NEED_CLEAR"
        CLEAN_SUCCESS = "CLEAN_SUCCESS"
        CLEAN_FAILED = "CLEAN_FAILED"
        ROLLBACK_SUCCESS = "ROLLBACK_SUCCESS"
        ROLLBACK_FAILED = "ROLLBACK_FAILED"
    
    
    class TaskModel(db.Model):
        __tablename__ = "task"
        id = Column(VARCHAR(64), primary_key=True)  # uuid
        task_id = Column(VARCHAR(32), nullable=False)  # Internal definition
        name = Column(VARCHAR(128))
        description = Column(Text)
        
        ***省略
    
    
    @contextmanager
    def get_session():
        try:
            # session = Session()
            session = db.session
            yield session
            session.commit()
        except:
            session.rollback()
            raise
        finally:
            session.close()
            session.remove()
    
    
    def sleep_fun():
        print("sleep_fun")
        global begin_rss
        begin_rss = psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024
    
        for j in range(1):
            print(j)
            # with Session(engine) as session:
            #     a = session.query(TaskModel).all()
    
            # import sqlalchemy.sql.selectable.Select
            # with Session(engine, future=True) as session:
            #     a = session.execute(select(TaskModel)).all()
    
            # with get_session() as session:
            #     stat = session.query(TaskModel)
            #     a = session.execute(str(stat)).all()
                # a = session.execute(stat).all()
    
            with get_session() as session:
                a = session.execute("select * from task").all()
                # a = session.query(TaskModel).all()
                print(psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024)
                print(2222, len(a))
            # with Session(engine) as session:
            #     statement = select(TaskModel)
            #     a = session.exec(str(statement)).all()
            #     print(2222, len(a))
    
        print("begin", begin_rss)
        print("end", psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024)
        print("finish!!")
    
    
    t = threading.Thread(target=sleep_fun)
    t.start()
    t.join()
    gc.collect()
    
    print("end")
    
    end_rss = psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024
    print(end_rss)
    
    print("cost: ", end_rss - begin_rss)
    time.sleep(1000000)
    
    
    

    请忽略注释的部分,试了这些办法都有释放不了的情况, 之前下了 sqlalchemy 1.4.40 的源码,打断点发现内存释放不了在于 context.py 的最后 loading.instances 中,

    @classmethod
        def orm_setup_cursor_result(
            cls,
            session,
            statement,
            params,
            execution_options,
            bind_arguments,
            result,
        ):
            execution_context = result.context
            compile_state = execution_context.compiled.compile_state
    
            # cover edge case where ORM entities used in legacy select
            # were passed to session.execute:
            # session.execute(legacy_select([User.id, User.name]))
            # see test_query->test_legacy_tuple_old_select
    
            load_options = execution_options.get(
                "_sa_orm_load_options", QueryContext.default_load_options
            )
            if compile_state.compile_options._is_star:
                return result
    
            querycontext = QueryContext(
                compile_state,
                statement,
                params,
                session,
                load_options,
                execution_options,
                bind_arguments,
            )
    
            return loading.instances(result, querycontext)
    

    loading.py

    
        try:
            (process, labels, extra) = list(
                zip(
                    *[
                        query_entity.row_processor(context, cursor)
                        for query_entity in context.compile_state._entities
                    ]
                )
            )
            import sys
            print(process, labels, extra)
            if context.yield_per and (
                context.loaders_require_buffering
                or context.loaders_require_uniquing
            ):
                raise sa_exc.InvalidRequestError(
                    "Can't use yield_per with eager loaders that require uniquing "
                    "or row buffering, e.g. joinedload() against collections "
                    "or subqueryload().  Consider the selectinload() strategy "
                    "for better flexibility in loading objects."
                )
    

    一跑到这部分代码后,就释放不了内存了

    10 条回复    2023-06-02 11:23:47 +08:00
    sivacohan
        1
    sivacohan  
       2022-09-20 18:07:54 +08:00
    加个 gc.collect() 看看
    lolizeppelin
        2
    lolizeppelin  
       2022-09-21 09:39:49 +08:00
    纯查询 commit 干嘛....用事务又没见 begin

    非查询先 flush 再 commit
    djasdjds
        3
    djasdjds  
    OP
       2022-09-21 15:17:17 +08:00
    @sivacohan 查询完有手动 gc ,内存还是不变
    djasdjds
        4
    djasdjds  
    OP
       2022-09-21 15:20:19 +08:00
    commit 是项目框架统一封装的,和这个没关系,查询加不加都有内存泄露
    lookStupiToForce
        5
    lookStupiToForce  
       2022-09-22 18:49:48 +08:00
    rgengpbinvest111
        6
    rgengpbinvest111  
       2022-10-25 17:48:06 +08:00
    求问您的问题是否已解决,我也遇到类似的问题但不知道如何解决
    djasdjds
        7
    djasdjds  
    OP
       2022-11-26 16:58:01 +08:00
    无解决
    onnethy
        8
    onnethy  
       2022-12-09 15:04:13 +08:00
    @djasdjds 我这边用下面的方式解决了
    res = db.session.execute("select * from task").fetchall()
    """
    do something for res
    """
    del res
    gc.collect()
    djasdjds
        9
    djasdjds  
    OP
       2023-01-11 16:12:30 +08:00
    @onnethy 题目上已经说了,用 sql 是正常的,orm 内存不释放
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1016 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 30ms · UTC 21:48 · PVG 05:48 · LAX 13:48 · JFK 16:48
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.