16

Flask 偏函数、g对象、flask-session、数据库连接池、信号、自制命令、flask-admin

 4 years ago
source link: http://www.cnblogs.com/guyouyin123/p/12534856.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

一、偏函数

当函数的参数个数太多,需要简化时,使用 functools.partial 可以创建一个新的函数,这个新函数可以固定住原函数的部分参数,从而在调用时更简单。

from functools import partial
def func(a1,a2,a3):
    print(a1,a2,a3)


new_func1 = partial(func,a1=1,a2=2)
new_func1(a3=3)  # 额外传值

new_func2 = partial(func,1,2)
new_func2(3)

new_func3 = partial(func,a1=1)
new_func3(a2=2,a3=3)

二、g对象

专门用来存储用户信息的g对象,g的全称的为global

g对象在一次请求中的所有的代码的地方,都是可以使用的

作用:防止和request内部的属性混乱,比如不知道request里面有没有name属性,可以直接g.name设值。只对这个请求设值

from flask import Flask, g

app = Flask(__name__)


@app.before_request
def be():
    g.name = 'jeff'

    print('befor1')


@app.route('/')
def index():
    print(g.name)
    return 'ok'


@app.route('/login')
def login():
    print(g.name)
    return 'ok'


if __name__ == '__main__':
    app.run()

g对象和session的区别

session对象是可以跨request的,只要session还未失效,不同的request的请求会获取到同一个session,但是g对象不是,g对象不需要管过期时间,请求一次就g对象就改变了一次,或者重新赋值了一次

三、flask-session

作用:将默认保存的签名cookie中的值 保存到 redis/memcached/file/Mongodb/SQLAlchemy

安装:pip3 install flask-session

使用1:

from flask import Flask,session
from flask_session import RedisSessionInterface
import redis
app = Flask(__name__)
conn=redis.Redis(host='127.0.0.1',port=6379)
#use_signer是否对key签名
app.session_interface=RedisSessionInterface(conn,key_prefix='lqz')
@app.route('/')
def hello_world():
    session['name']='lqz'
    return 'Hello World!'

if __name__ == '__main__':
    app.run()

使用2:

from redis import Redis
from flask.ext.session import Session
app.config['SESSION_TYPE'] = 'redis'
app.config['SESSION_REDIS'] = Redis(host='192.168.0.94',port='6379')
Session(app)

问题:设置cookie时,如何设定关闭浏览器则cookie失效。

response.set_cookie('k','v',exipre=None)#这样设置即可
#在session中设置
app.session_interface=RedisSessionInterface(conn,key_prefix='lqz',permanent=False)
#一般不用,我们一般都设置超时时间,多长时间后失效

问题:cookie默认超时时间是多少?如何设置超时时间

#源码expires = self.get_expiration_time(app, session)
'PERMANENT_SESSION_LIFETIME':           timedelta(days=31),#这个配置文件控制

四、数据库连接池

pymsql链接数据库

import pymysql

conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123456', db='s8day127db')
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
# cursor.execute("select id,name from users where name=%s and pwd=%s",['lqz','123',])
cursor.execute("select id,name from users where name=%(user)s and pwd=%(pwd)s",{'user':'lqz','pwd':'123'})
obj = cursor.fetchone()
conn.commit()
cursor.close()
conn.close()

print(obj)

数据库连接池版

setting.py

from datetime import timedelta
from redis import Redis
import pymysql
from DBUtils.PooledDB import PooledDB, SharedDBConnection

class Config(object):
    DEBUG = True
    SECRET_KEY = "umsuldfsdflskjdf"
    PERMANENT_SESSION_LIFETIME = timedelta(minutes=20)
    SESSION_REFRESH_EACH_REQUEST= True
    SESSION_TYPE = "redis"
    PYMYSQL_POOL = PooledDB(
        creator=pymysql,  # 使用链接数据库的模块
        maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
        mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
        maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
        maxshared=3,
        # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
        blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
        maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
        setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
        ping=0,
        # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
        host='127.0.0.1',
        port=3306,
        user='root',
        password='123456',
        database='s8day127db',
        charset='utf8'
    )

class ProductionConfig(Config):
    SESSION_REDIS = Redis(host='192.168.0.94', port='6379')



class DevelopmentConfig(Config):
    SESSION_REDIS = Redis(host='127.0.0.1', port='6379')


class TestingConfig(Config):
    pass

utils/sql.py

import pymysql
from settings import Config
class SQLHelper(object):

    @staticmethod
    def open(cursor):
        POOL = Config.PYMYSQL_POOL
        conn = POOL.connection()
        cursor = conn.cursor(cursor=cursor)
        return conn,cursor

    @staticmethod
    def close(conn,cursor):
        conn.commit()
        cursor.close()
        conn.close()

    @classmethod
    def fetch_one(cls,sql,args,cursor =pymysql.cursors.DictCursor):
        conn,cursor = cls.open(cursor)
        cursor.execute(sql, args)
        obj = cursor.fetchone()
        cls.close(conn,cursor)
        return obj

    @classmethod
    def fetch_all(cls,sql, args,cursor =pymysql.cursors.DictCursor):
        conn, cursor = cls.open(cursor)
        cursor.execute(sql, args)
        obj = cursor.fetchall()
        cls.close(conn, cursor)
        return obj

五、信号

Flask框架中的信号基于blinker,其主要就是让开发者可是在flask请求过程中定制一些用户行为

安装: pip3 install blinker

内置信号:

request_started = _signals.signal('request-started')                # 请求到来前执行
request_finished = _signals.signal('request-finished')              # 请求结束后执行
 
before_render_template = _signals.signal('before-render-template')  # 模板渲染前执行
template_rendered = _signals.signal('template-rendered')            # 模板渲染后执行
 
got_request_exception = _signals.signal('got-request-exception')    # 请求执行出现异常时执行
 
request_tearing_down = _signals.signal('request-tearing-down')      # 请求执行完毕后自动执行(无论成功与否)
appcontext_tearing_down = _signals.signal('appcontext-tearing-down')# 应用上下文执行完毕后自动执行(无论成功与否)
 
appcontext_pushed = _signals.signal('appcontext-pushed')            # 应用上下文push时执行
appcontext_popped = _signals.signal('appcontext-popped')            # 应用上下文pop时执行
message_flashed = _signals.signal('message-flashed')                # 调用flask在其中添加数据时,自动触发

使用信号:

from flask import Flask,signals,render_template

app = Flask(__name__)

# 往信号中注册函数
def func(*args,**kwargs):
    print('触发型号',args,kwargs)
signals.request_started.connect(func)

# 触发信号: signals.request_started.send()
@app.before_first_request
def before_first1(*args,**kwargs):
    pass
@app.before_first_request
def before_first2(*args,**kwargs):
    pass

@app.before_request
def before_first3(*args,**kwargs):
    pass

@app.route('/',methods=['GET',"POST"])
def index():
    print('视图')
    return render_template('index.html')


if __name__ == '__main__':
    app.wsgi_app
    app.run()

一个流程中的信号触发点(了解)

a. before_first_request
b. 触发 request_started 信号
c. before_request
d. 模板渲染
    渲染前的信号 before_render_template.send(app, template=template, context=context)
        rv = template.render(context) # 模板渲染
    渲染后的信号 template_rendered.send(app, template=template, context=context)
e. after_request
f. session.save_session()
g. 触发 request_finished信号        
    如果上述过程出错:
        触发错误处理信号 got_request_exception.send(self, exception=e)
            
h. 触发信号 request_tearing_down

自定义信号(了解):

from flask import Flask, current_app, flash, render_template
from flask.signals import _signals
app = Flask(import_name=__name__)

# 自定义信号
xxxxx = _signals.signal('xxxxx')
 
def func(sender, *args, **kwargs):
    print(sender)
# 自定义信号中注册函数
xxxxx.connect(func)
@app.route("/x")
def index():
    # 触发信号
    xxxxx.send('123123', k1='v1')
    return 'Index' 
 
if __name__ == '__main__':
    app.run()

六、命令flask-script

用于实现类似于django中 python3 manage.py runserver ...类似的命令

安装:pip3 install flask-script

使用

from flask_script import Manager
app = Flask(__name__)
manager=Manager(app)
...
if __name__ == '__main__':
    manager.run()
#以后在执行,直接:python3 manage.py runserver
#python3 manage.py runserver --help

自定制命令

@manager.command
def custom(arg):
    """
    自定义命令
    python manage.py custom 123
    :param arg:
    :return:
    """
    print(arg)


@manager.option('-n', '--name', dest='name')
#@manager.option('-u', '--url', dest='url')
def cmd(name, url):
    """
    自定义命令(-n也可以写成--name)
    执行: python manage.py  cmd -n lqz -u http://www.oldboyedu.com
    执行: python manage.py  cmd --name lqz --url http://www.oldboyedu.com
    :param name:
    :param url:
    :return:
    """
    print(name, url)
#有什么用?
#把excel的数据导入数据库,定制个命令,去执行

七、flask-admin

安装

pip3 install flask_admin

简单使用

from flask import Flask
from flask_admin import Admin

app = Flask(__name__)
#将app注册到adminzhong 
admin = Admin(app)

if __name__=="mian":

    app.run()
#访问
#127.0.0.1:5000/admin端口,会得到一个空白的页面

将表模型注册到admin中

#在将表注册之前应该对app进行配置
SQLALCHEMY_DATABASE_URI = "mysql+pymysql://root:@127.0.0.1:3307/py9api?charset=utf8mb4"
SQLALCHEMY_POOL_SIZE = 5
SQLALCHEMY_POOL_TIMEOUT = 30
SQLALCHEMY_POOL_RECYCLE = -1

#导入models文件的中的表模型
from flask_admin.contrib.sqla import ModelView
from api.models import Stock,Product,Images,Category,Wxuser,Banner

admin.add_view(ModelView(Stock, db.session))
admin.add_view(ModelView(Product, db.session))

admin.add_view(ModelView(Category, db.session))

如果有个字段是图片指端

#配置上传文件的路径
#导入from flask_admin.contrib.fileadmin import FileAdmin
from flask_admin.contrib.fileadmin import FileAdmin,form
file_path = op.join(op.dirname(__file__), 'static')
admin = Admin(app)
admin.add_view(FileAdmin(file_path, '/static/', name='文件'))

#如果有个字段要是上传文件重写该方法的modleView类,假设imgae_url是文件图片的字段
class ImagesView(ModelView):

    form_extra_fields = {
        'image_url': form.ImageUploadField('Image',
                                          base_path=file_path,
                                          relative_path='uploadFile/'
                                          )
    }

admin.add_view(ImagesView(Images, db.session))

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK