

造个Python轮子,实现根据Excel生成Model和数据导入脚本 - 程序设计实验室
source link: https://www.cnblogs.com/deali/p/16895215.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

造个Python轮子,实现根据Excel生成Model和数据导入脚本 - 程序设计实验室 - 博客园
最近遇到一个需求,有几十个Excel,每个的字段都不一样,然后都差不多是第一行是表头,后面几千上万的数据,需要把这些Excel中的数据全都加入某个已经上线的Django项目
这就需要每个Excel建个表,然后一个个导入了
这样的效率太低,不能忍
所以我造了个自动生成 Model 和导入脚本的轮子
首先拿出 pandas,它的 DataFrame 用来处理数据很方便
pandas 加载 Excel 之后,提取表头,我们要通过表头来生成数据表的字段。有些 Excel 的表头是中文的,需要先做个转换。
一开始我是想用翻译API,全都翻译成英文,不过发现免费的很慢有限额,微软、DeepL都要申请,很麻烦。索性用个拼音转换库,全都转换成拼音得了~
然后字段的长度也要确定,或者全部用不限制长度的 TextField
权衡一下,我还是做一下字段长度判定的逻辑,遍历整个表,找出各个字段最长的数据,然后再加一个偏移量,作为最大长度。
接着生成 Model 类,这里我用 jinja2 模板语言,先把大概的模板写好,然后根据提取出来的字段名啥的生成。
最后生成 admin 配置和导入脚本,同理,也是用 jinja2 模板。
简单介绍下思路,现在开始上代码。
就几行而已,Python很省代码~
首先定义俩模型
class Field(object):
def __init__(self, name: str, verbose_name: str, max_length: int = 128):
self.name = name
self.verbose_name = verbose_name
self.max_length = max_length
def __str__(self):
return f'<Field>{self.name}:{self.verbose_name}'
def __repr__(self):
return self.__str__()
Model模型
为了符合Python关于变量的命名规范,snake_name
属性是用正则表达式实现驼峰命名转蛇形命名
class Model(object):
def __init__(self, name: str, verbose_name: str, id_field: Field, fields: List[Field]):
self.name = name
self.verbose_name = verbose_name
self.id_field = id_field
self.fields: List[Field] = fields
@property
def snake_name(self):
import re
pattern = re.compile(r'(?<!^)(?=[A-Z])')
name = pattern.sub('_', self.name).lower()
return name
def __str__(self):
return f'<Model>{self.name}:{self.verbose_name}'
def __repr__(self):
return self.__str__()
使用 jinja2 实现。
本身 jinja2 是 Flask、Django 之类的框架用来渲染网页的。
不过单独使用的效果也不错,我的 DjangoStarter 框架也是用这个 jinja2 来自动生成 CRUD 代码~
Model模板
# -*- coding:utf-8 -*-
from django.db import models
class {{ model.name }}(models.Model):
"""{{ model.verbose_name }}"""
{% for field in model.fields -%}
{{ field.name }} = models.CharField('{{ field.verbose_name }}', default='', null=True, blank=True, max_length={{ field.max_length }})
{% endfor %}
class Meta:
db_table = '{{ model.snake_name }}'
verbose_name = '{{ model.verbose_name }}'
verbose_name_plural = verbose_name
Admin配置模板
@admin.register({{ model.name }})
class {{ model.name }}Admin(admin.ModelAdmin):
list_display = [{% for field in model.fields %}'{{ field.name }}', {% endfor %}]
list_display_links = None
def has_add_permission(self, request):
return False
def has_delete_permission(self, request, obj=None):
return False
def has_view_permission(self, request, obj=None):
return False
数据导入脚本
这里做了几件事:
- 使用 pandas 处理空值,填充空字符串
- 已有数据进行批量更新
- 新数据批量插入
更新逻辑麻烦一点,因为数据库一般都有每次最大更新数量的限制,所以我做了分批处理,通过 update_data_once_max_lines
控制每次最多同时更新多少条数据。
def import_{{ model.snake_name }}():
file_path = path_proc(r'{{ excel_filepath }}')
logger.info(f'读取文件: {file_path}')
xlsx = pd.ExcelFile(file_path)
df = pd.read_excel(xlsx, 0, header={{ excel_header }})
df.fillna('', inplace=True)
logger.info('开始处理数据')
id_field_list = {{ model.name }}.objects.values_list('{{ model.id_field.name }}', flat=True)
item_list = list({{ model.name }}.objects.all())
def get_item(id_value):
for i in item_list:
if i.shen_qing_ren_zheng_jian_hao_ma == id_value:
return i
return None
insert_data = []
update_data_once_max_lines = 100
update_data_sub_set_index = 0
update_data = [[]]
update_fields = set()
for index, row in df.iterrows():
if '{{ model.id_field.verbose_name }}' not in row:
logger.error('id_field {} is not existed'.format('{{ model.id_field.verbose_name }}'))
continue
if row['{{ model.id_field.verbose_name }}'] in id_field_list:
item = get_item(row['{{ model.id_field.verbose_name }}'])
{% for field in model.fields -%}
if '{{ field.verbose_name }}' in row:
if item.{{ field.name }} != row['{{ field.verbose_name }}']:
item.{{ field.name }} = row['{{ field.verbose_name }}']
update_fields.add('{{ field.name }}')
{% endfor %}
if len(update_data[update_data_sub_set_index]) >= update_data_once_max_lines:
update_data_sub_set_index += 1
update_data.append([])
update_data[update_data_sub_set_index].append(item)
else:
# {% for field in model.fields -%}{{ field.verbose_name }},{%- endfor %}
model_obj = {{ model.name }}()
{% for field in model.fields -%}
if '{{ field.verbose_name }}' in row:
model_obj.{{ field.name }} = row['{{ field.verbose_name }}']
{% endfor %}
insert_data.append(model_obj)
logger.info('开始批量导入')
{{ model.name }}.objects.bulk_create(insert_data)
logger.info('导入完成')
if len(update_data[update_data_sub_set_index]) > 0:
logger.info('开始批量更新')
for index, update_sub in enumerate(update_data):
logger.info(f'正在更新 {index * update_data_once_max_lines}-{(index + 1) * update_data_once_max_lines} 条数据')
{{ model.name }}.objects.bulk_update(update_sub, list(update_fields))
logger.info('更新完成')
剩下的全是核心代码了
先把用到的库导入
import os
import re
from typing import List, Optional
from pypinyin import pinyin, lazy_pinyin, Style
from jinja2 import Environment, PackageLoader, FileSystemLoader
或者后面直接去我的完整代码里面拿也行~
老规矩,我封装了一个类。
构造方法需要指定 Excel 文件地址,还有表头的行索引。
class ExcelToModel(object):
def __init__(self, filepath, header_index=0):
self.filepath = filepath
self.header_index = header_index
self.columns = []
self.fields: List[Field] = []
self.base_dir = os.path.dirname(os.path.abspath(__file__))
self.template_path = os.path.join(self.base_dir, 'templates')
self.jinja2_env = Environment(loader=FileSystemLoader(self.template_path))
self.load_file()
这里面有个 self.load_file()
后面再贴。
字段名中文转拼音
用了 pypinyin
这个库,感觉还不错。
转换后用正则表达式,去除符号,只保留英文和数字。
代码如下,也是放在 ExcelToModel
类里边。
@staticmethod
def to_pinyin(text: str) -> str:
pattern = r'~`!#$%^&*()_+-=|\';"":/.,?><~·!@#¥%……&*()——+-=“:’;、。,?》{《}】【\n\]\[ '
text = re.sub(r"[%s]+" % pattern, "", text)
return '_'.join(lazy_pinyin(text, style=Style.NORMAL))
拿出万能的 pandas,按照前面说的思路,提取表头转换成字段,并且遍历数据确定每个字段的最大长度,我这里偏移值是32,即在当前数据最大长度基础上加上32个字符。
def load_file(self):
import pandas as pd
xlsx = pd.ExcelFile(self.filepath)
df = pd.read_excel(xlsx, 0, header=self.header_index)
df.fillna('', inplace=True)
self.columns = list(df.columns)
for col in self.columns:
field = Field(self.to_pinyin(col), col)
self.fields.append(field)
for index, row in df.iterrows():
item_len = len(str(row[col]))
if item_len > field.max_length:
field.max_length = item_len + 32
print(field.verbose_name, field.name, field.max_length)
如果觉得这样生成表太慢,可以把确定最大长度的这块代码去掉,就下面这块代码
for index, row in df.iterrows():
item_len = len(str(row[col]))
if item_len > field.max_length:
field.max_length = item_len + 32
手动指定最大长度或者换成不限制长度的 TextField
就行。
先构造个 context 然后直接用 jinja2 的 render
功能生成代码。
为了在导入时判断数据存不存在,生成代码时要指定 id_field_verbose_name
,即Excel文件中类似“证件号码”、“编号”之类的列名,注意是Excel中的表头列名。
def find_field_by_verbose_name(self, verbose_name) -> Optional[Field]:
for field in self.fields:
if field.verbose_name == verbose_name:
return field
return None
def generate_file(self, model_name: str, verbose_name: str, id_field_verbose_name: str, output_filepath: str):
template = self.jinja2_env.get_template('output.jinja2')
context = {
'model': Model(
model_name, verbose_name,
self.find_field_by_verbose_name(id_field_verbose_name),
self.fields
),
'excel_filepath': self.filepath,
'excel_header': self.header_index,
}
with open(output_filepath, 'w+', encoding='utf-8') as f:
render_result = template.render(context)
f.write(render_result)
tool = ExcelToModel('file.xlsx')
tool.generate_file('CitizenFertility', '房价与居民生育率', '证件号码', 'output/citizen_fertility.py')
生成出来的代码都在一个文件里,请根据实际情况放到项目的各个位置。
发布到Github了
地址: https://github.com/Deali-Axy/excel_to_model
目前看来完美契合需求,极大节省工作量~
实际跑起来,不得不吐槽 Python 羸弱的性能,占内存还大… 凑合着用吧。也许后面有时间会优化一下~
__EOF__
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK