37
没有契合的数据库迁移工具,用pymysql实现一个
source link: http://yongqing.is-programmer.com/posts/213948.html?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
版本迭代少不了数据迁移,python有自己的数据库迁移工具migrate。如果有的是其它开发语言,或者没有契合的迁移工具。
怎么自己做一个?
环境说明
项目开发语言:java 数据库: mysql 迁移脚本: python python工具包: pymysql
实现思路
目标是将老版本的数据转为新版本的数据。
1. show tables 查询所有的表 ---得到所有表名 2. 注意trancate table_name 保证新表干净 -- 清环境 3. python在执行sql后可以在游标的属性cursor.description中看到表的字段信息 -- 得到所有字段名、类型 4. 查询数据生成insert语句。版本之间有变化的做特殊处理 --数据映射 5. 每生成500条commit一次到新库 -- 批量commit 6. mysqldump一份数据,在开发环境做好测试,没问题就可以用了。
前方有坑
留意数据精度问题
示例
import pymysql FROM_DB = dict( ip="xxxxxxxxxx", username="xxxx", password="xxxx", db_name= "db1" ) TO_DB = dict( ip="xxxxxxxxxx", username="xxxx", password="xxxx", db_name="db2" ) class WebDB: def __init__(self, ip, username, password, db_name='xxx'): self.ip = ip self.username = username self.password = password self.db_name = db_name self.conn = None self.cursor = None def __enter__(self): self.conn = pymysql.connect(self.ip, self.username, self.password, self.db_name) self.cursor = self.conn.cursor() return self def __exit__(self, exctype, excvalue, traceback): if self.cursor: self.cursor.close() if self.conn: self.conn.close() class MigrateDB(object): def start(self): self.clean() self.import_data() def clean(self): with WebDB(**TO_DB)as db: db.cursor.execute("show tables;") for tables in db.cursor.fetchall(): table_name = tables[0] db.cursor.execute("truncate %s;" % table_name) def import_data(self): with WebDB(**FROM_DB) as from_db: with WebDB(**TO_DB) as to_db: #所有表 from_db.cursor.execute("show tables;") for from_tables in from_db.cursor.fetchall(): from_table_name = from_tables[0] from_db.cursor.execute("select * from %s;" % from_table_name) items = self.to_dict(from_db.cursor.fetchall(), from_db.cursor.description) print("import table: %s" % from_table_name) to_table_name = from_table_name for i, item in enumerate(items): # 需要映射转换的表,生成专门的sql if from_table_name == "xxxx": to_table_name = "xxxxx" item = self.gen_insert_sql_for_xxx(item) sql = self.gen_insert_sql(to_table_name, item) to_db.cursor.execute(sql, args=item) # 每500条commit一次 if i != 0 and i % 500 == 0: to_db.conn.commit() to_db.conn.commit() def to_dict(self, rows, description): """ 记录转为字典 :param rows: :param description: :return: """ for row in rows: item = dict() for i, field in enumerate(description): field_name = field[0] item[field_name] = row[i] yield item def gen_insert_sql(self, table_name, item): sql = "insert into %s(%s) values(%s)" keys = item.keys() key_str = ",".join(keys) value_str = ",".join(["%%(%s)s" % k for k, v in item.items()]) return sql % (table_name, key_str, value_str) if __name__ == "__main__": m = MigrateDB() m.start()
来源
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK