37

没有契合的数据库迁移工具,用pymysql实现一个

 5 years ago
source link: http://yongqing.is-programmer.com/posts/213948.html?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

版本迭代少不了数据迁移,python有自己的数据库迁移工具migrate。如果有的是其它开发语言,或者没有契合的迁移工具。

怎么自己做一个?

环境说明

项目开发语言:java
数据库: mysql
迁移脚本: python
python工具包: pymysql

实现思路

目标是将老版本的数据转为新版本的数据。

1. show tables 查询所有的表  ---得到所有表名
2. 注意trancate table_name 保证新表干净  -- 清环境
3. python在执行sql后可以在游标的属性cursor.description中看到表的字段信息  -- 得到所有字段名、类型
4. 查询数据生成insert语句。版本之间有变化的做特殊处理  --数据映射
5. 每生成500条commit一次到新库   -- 批量commit
6. mysqldump一份数据,在开发环境做好测试,没问题就可以用了。

前方有坑

留意数据精度问题

示例

import pymysql

FROM_DB = dict(
    ip="xxxxxxxxxx",
    username="xxxx",
    password="xxxx",
    db_name= "db1"
)

TO_DB = dict(
    ip="xxxxxxxxxx",
    username="xxxx",
    password="xxxx",
    db_name="db2"
)


class WebDB:
    def __init__(self, ip, username, password, db_name='xxx'):
        self.ip = ip
        self.username = username
        self.password = password
        self.db_name = db_name
        self.conn = None
        self.cursor = None

    def __enter__(self):
        self.conn = pymysql.connect(self.ip, self.username, self.password, self.db_name)
        self.cursor = self.conn.cursor()
        return self

    def __exit__(self, exctype, excvalue, traceback):
        if self.cursor:
            self.cursor.close()

        if self.conn:
            self.conn.close()



class MigrateDB(object):
    def start(self):
        self.clean()
        self.import_data()

    def clean(self):
        with WebDB(**TO_DB)as db:
            db.cursor.execute("show tables;")
            for tables in db.cursor.fetchall():
                table_name = tables[0]
                db.cursor.execute("truncate %s;" % table_name)

    def import_data(self):
        with WebDB(**FROM_DB) as from_db:
            with WebDB(**TO_DB) as to_db:
                #所有表
                from_db.cursor.execute("show tables;")
                for from_tables in from_db.cursor.fetchall():
                    from_table_name = from_tables[0]
                    from_db.cursor.execute("select * from %s;" % from_table_name)
                    items = self.to_dict(from_db.cursor.fetchall(), from_db.cursor.description)
                    print("import table: %s" % from_table_name)
                    to_table_name = from_table_name
                    for i, item in enumerate(items):
                        # 需要映射转换的表,生成专门的sql
                        if from_table_name == "xxxx":
                            to_table_name = "xxxxx"
                            item = self.gen_insert_sql_for_xxx(item)

                        sql = self.gen_insert_sql(to_table_name, item)
                        to_db.cursor.execute(sql, args=item)
                        # 每500条commit一次
                        if i != 0 and i % 500 == 0:
                            to_db.conn.commit()
                    to_db.conn.commit()


    def to_dict(self, rows, description):
        """
        记录转为字典
        :param rows:
        :param description:
        :return:
        """
        for row in rows:
            item = dict()
            for i, field in enumerate(description):
                field_name = field[0]
                item[field_name] = row[i]
            yield item

    def gen_insert_sql(self, table_name, item):
        sql = "insert into %s(%s) values(%s)"
        keys = item.keys()
        key_str = ",".join(keys)
        value_str = ",".join(["%%(%s)s" % k for k, v in item.items()])

        return sql % (table_name, key_str, value_str)


if __name__ == "__main__":
    m = MigrateDB()
    m.start()

来源

没有契合的数据库迁移工具,用pymysql实现一个

此生必看的科学实验-水知道答案

《了凡四训》详解之改过之法

印光大师十念法(胡小林主讲第1集)

精神病为什么治不好

百病之源


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK