V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
wuwukai007
V2EX  ›  Python

fastsql 1.2.19 发布 (个人项目,有点丑陋)

  •  
  •   wuwukai007 · 2020-03-24 22:04:33 +08:00 · 2663 次点击
    这是一个创建于 1765 天前的主题,其中的信息可能已经有所发展或是发生改变。

    带有进度条显示的 快速迁移表的 python 包,快读 sql 生成 DataFrame,生成 CSV.

    https://github.com/tosmart01/fastsql

    • 在您 无法使用 load 或 dump 命令,或者所迁移表结构不同,需要对数据做处理时使用。
    • 标准化 多线程读写,无需关注读写过程,只需关注入库前数据的处理
    • 采用本地缓存,不占用内存,
    一、安装
    • pip install fast_sql
    二、依赖环境
    • Python3.6+
    • Mysql , Oracle
    二、示例
    import fast_sql
    
    多线程 读表生成 DataFrame
    # con 数据库连接字符串,或者 sqlalchemy 对象
    # chunksize 单个线程读取数量 默认 20000
    # show_progress 是否显示进度条
    # thread_num 线程数量
    # return Dataframe
    # 其他参数兼容 pandas read_sql
    con = "oracle+cx_oracle://wuwukai:wuwukai@localhost:1521/helowin"
    df = fast_sql.read_sql('select * from student where SNO<2000000',con,show_progress=True,
                           chunksize = 40000,
                           thread_num = 15,)
    
    Read the scheduler: 100%|█████████████████| 500001/500001 [01:20<00:00, 28192.45it/s]
    
    表迁移
    def astype_df(df):
        # 目标库 ctime 列为 str 类型,这里做 转换返回
        df.CTIME = df.CTIME.astype('str')
        return df
    
    # from_db 数据源
    # to_db 目标库
    # to_table 目标表名
    # if_exists 是否删除目标库数据 ( append,delete,other )
    # 如果 if_exists='delete' 可以指定删除语句 delete_sql = 'delete from xxx' ,默认使用源 sql delete
    # mode 迁移方式,rw 在线迁移,r 序列化到本地(需指定 save_path ),w 本地文件到数据库 (需指定 file_path)
    # delete_cache 是否删除迁移过程中 缓存文件,默认删除
    # data_processing 入库前数据是否做处理,如目标库 列 类型 不一致,列名不同等,接受一个函数,参数为入库前
    # DataFrame,需返回处理后的 DataFrame
    # chunksize 每个线程迁移数量
    # thread_num 每个线程读取数量
    # thread_w 写入线程数量
    con = create_engine("oracle+cx_oracle://wuwukai:wuwukai@localhost:1521/helowin")
    to_db = create_engine("mysql+pymysql://root:123456@localhost:3306/aps_2")
    sql = '''select * from student where SNO<2000000'''
    fast_sql.to_sql(sql,
                    from_db = con,
                    to_db = to_db,
                    to_table = 'stu',
                    if_exists='append',
                    mode='rw',show_progress=True,
                    delete_cache=True,
                    data_processing=astype_df)
        
    
    Read the scheduler:   0%|                                 | 0/500001 [00:00<?, ?it/s]
    
    Read the scheduler:  76%|█████████████▋    | 380000/500001 [01:02<00:13, 8921.37it/s]
    
    Write db Scheduler:  96%|██████████████████████████▉ | 26/27 [01:13<00:01,  1.10s/it]    
    
    Write db Scheduler: 100%|████████████████████████████| 27/27 [01:13<00:00,  1.16it/s]
    
    'finish'
    
    读表生成 csv
    sql = '''select * from student where SNO<2000000'''
    path = '/home/test.csv'
    to_db = create_engine("mysql+pymysql://root:123456@localhost:3306/aps_2")
    fast_sql.to_csv(sql,con,path_or_buf=path,show_progress=True,index=None)
    
    3 条回复    2020-03-30 15:32:15 +08:00
    liwenbest
        1
    liwenbest  
       2020-03-30 14:30:55 +08:00
    挺好的,我也经常用 python 来做些 ETL 方面的工作,有个问题是 是表类型 oracle-mysql 字段类型不一致 会报错
    wuwukai007
        2
    wuwukai007  
    OP
       2020-03-30 15:06:19 +08:00
    @liwenbest data_processing 这个参数可以在入库前改变 数据类型或者增加减少列等操作,自定义一个函数就好,参数为 DataFrame
    wuwukai007
        3
    wuwukai007  
    OP
       2020-03-30 15:32:15 +08:00
    @liwenbest 现在做不到自动处理类型,就把接口暴露出来自己处理了,如果类型一致就算了。
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   3314 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 21ms · UTC 04:29 · PVG 12:29 · LAX 20:29 · JFK 23:29
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.