1

乾坤大挪移,如何将同步阻塞(sync)三方库包转换为异步非阻塞(async)模式?Python3.10实...

 1 year ago
source link: https://v3u.cn/a_id_267
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

乾坤大挪移,如何将同步阻塞(sync)三方库包转换为异步非阻塞(async)模式?Python3.10实现。

首页 - Python /2022-12-30
乾坤大挪移,如何将同步阻塞(sync)三方库包转换为异步非阻塞(async)模式?Python3.10实现。

    众所周知,异步并发编程可以帮助程序更好地处理阻塞操作,比如网络 IO 操作或文件 IO 操作,避免因等待这些操作完成而导致程序卡住的情况。云存储文件传输场景正好包含网络 IO 操作和文件 IO 操作,比如业内相对著名的七牛云存储,官方sdk的默认阻塞传输模式虽然差强人意,但未免有些循规蹈矩,不够锐意创新。在全球同性交友网站Github上找了一圈,也没有找到异步版本,那么本次我们来自己动手将同步阻塞版本改造为异步非阻塞版本,并上传至Python官方库。

    首先参见七牛云官方接口文档:https://developer.qiniu.com/kodo,新建qiniu_async.py文件:

# @Author:Liu Yue (v3u.cn)
# @Software:Vscode
# @Time:2022/12/30

import base64
import hmac
import time
from hashlib import sha1
import json
import httpx
import aiofiles

class Qiniu:

def __init__(self, access_key, secret_key):
"""初始化"""
self.__checkKey(access_key, secret_key)
self.__access_key = access_key
self.__secret_key = secret_key.encode('utf-8')

def get_access_key(self):
return self.__access_key

def get_secret_key(self):
return self.__secret_key

def __token(self, data):
hashed = hmac.new(self.__secret_key,data.encode('utf-8'), sha1)
return self.urlsafe_base64_encode(hashed.digest())

def token(self, data):
return '{0}:{1}'.format(self.__access_key, self.__token(data))

def token_with_data(self, data):
data = self.urlsafe_base64_encode(data)
return '{0}:{1}:{2}'.format(
self.__access_key, self.__token(data), data)

def urlsafe_base64_encode(self,data):

if isinstance(data, str):
data = data.encode('utf-8')

ret = base64.urlsafe_b64encode(data)

data = ret.decode('utf-8')

return data


@staticmethod
def __checkKey(access_key, secret_key):
if not (access_key and secret_key):
raise ValueError('invalid key')


def upload_token(
self,
bucket,
key=None,
expires=3600,
policy=None,
strict_policy=True):
"""生成上传凭证

Args:
bucket: 上传的空间名
key: 上传的文件名,默认为空
expires: 上传凭证的过期时间,默认为3600s
policy: 上传策略,默认为空

Returns:
上传凭证
"""
if bucket is None or bucket == '':
raise ValueError('invalid bucket name')

scope = bucket
if key is not None:
scope = '{0}:{1}'.format(bucket, key)

args = dict(
scope=scope,
deadline=int(time.time()) + expires,
)

return self.__upload_token(args)

@staticmethod
def up_token_decode(up_token):
up_token_list = up_token.split(':')
ak = up_token_list[0]
sign = base64.urlsafe_b64decode(up_token_list[1])
decode_policy = base64.urlsafe_b64decode(up_token_list[2])
decode_policy = decode_policy.decode('utf-8')
dict_policy = json.loads(decode_policy)
return ak, sign, dict_policy

def __upload_token(self, policy):
data = json.dumps(policy, separators=(',', ':'))
return self.token_with_data(data)


@staticmethod
def __copy_policy(policy, to, strict_policy):
for k, v in policy.items():
if (not strict_policy) or k in _policy_fields:
to[k] = v

    这里有两个很关键的异步非阻塞三方库,分别是httpx和aiofiles,对应处理网络IO和文件IO阻塞问题:

pip3 install httpx
pip3 install aiofiles

    随后按照文档流程通过加密方法获取文件上传token,这里无须进行异步改造,因为并不涉及IO操作:

q = Qiniu(access_key,access_secret)

token = q.upload_token("空间名称")

print(token)

    程序返回:

➜  mydemo git:(master) ✗ /opt/homebrew/bin/python3.10 "/Users/liuyue/wodfan/work/mydemo/src/test.py"
q06bq54Ps5JLfZyP8Ax-qvByMBdu8AoIVJpMco2m:8RjIo9a4CxHM3009DwjbMxDzlU8=:eyJzY29wZSI6ImFkLWgyMTEyIiwiZGVhZGxpbmUiOjE2NzIzNjg2NTd9

    接着添加文件流推送方法,先看官方原版逻辑:

def put_data(
up_token, key, data, params=None, mime_type='application/octet-stream', check_crc=False, progress_handler=None,
fname=None, hostscache_dir=None, metadata=None):
"""上传二进制流到七牛
Args:
up_token: 上传凭证
key: 上传文件名
data: 上传二进制流
params: 自定义变量,规格参考 https://developer.qiniu.com/kodo/manual/vars#xvar
mime_type: 上传数据的mimeType
check_crc: 是否校验crc32
progress_handler: 上传进度
hostscache_dir: host请求 缓存文件保存位置
metadata: 元数据
Returns:
一个dict变量,类似 {"hash": "<Hash string>", "key": "<Key string>"}
一个ResponseInfo对象
"""
final_data = b''
if hasattr(data, 'read'):
while True:
tmp_data = data.read(config._BLOCK_SIZE)
if len(tmp_data) == 0:
break
else:
final_data += tmp_data
else:
final_data = data

crc = crc32(final_data)
return _form_put(up_token, key, final_data, params, mime_type,
crc, hostscache_dir, progress_handler, fname, metadata=metadata)

def _form_put(up_token, key, data, params, mime_type, crc, hostscache_dir=None, progress_handler=None, file_name=None,
modify_time=None, keep_last_modified=False, metadata=None):
fields = {}
if params:
for k, v in params.items():
fields[k] = str(v)
if crc:
fields['crc32'] = crc
if key is not None:
fields['key'] = key

fields['token'] = up_token
if config.get_default('default_zone').up_host:
url = config.get_default('default_zone').up_host
else:
url = config.get_default('default_zone').get_up_host_by_token(up_token, hostscache_dir)
# name = key if key else file_name

fname = file_name
if not fname or not fname.strip():
fname = 'file_name'

# last modify time
if modify_time and keep_last_modified:
fields['x-qn-meta-!Last-Modified'] = rfc_from_timestamp(modify_time)

if metadata:
for k, v in metadata.items():
if k.startswith('x-qn-meta-'):
fields[k] = str(v)

r, info = http._post_file(url, data=fields, files={'file': (fname, data, mime_type)})
if r is None and info.need_retry():
if info.connect_failed:
if config.get_default('default_zone').up_host_backup:
url = config.get_default('default_zone').up_host_backup
else:
url = config.get_default('default_zone').get_up_host_backup_by_token(up_token, hostscache_dir)
if hasattr(data, 'read') is False:
pass
elif hasattr(data, 'seek') and (not hasattr(data, 'seekable') or data.seekable()):
data.seek(0)
else:
return r, info
r, info = http._post_file(url, data=fields, files={'file': (fname, data, mime_type)})

return r, info

    这里官方使用两个方法,先试用put_data方法将字符串转换为二进制文件流,随后调用_form_put进行同步上传操作,这里_form_put这个私有方法是可复用的,既兼容文件流也兼容文件实体,写法上非常值得我们借鉴,弄明白了官方原版的流程后,让我们撰写文件流传输的异步版本:

# 上传文件流
async def upload_data(self,up_token, key,data,url="http://up-z1.qiniup.com",params=None,mime_type='application/octet-stream',file_name=None,metadata=None):

data.encode('utf-8')

fields = {}
if params:
for k, v in params.items():
fields[k] = str(v)

if key is not None:
fields['key'] = key
fields['token'] = up_token

fname = file_name
if not fname or not fname.strip():
fname = 'file_name'

async with httpx.AsyncClient() as client:

# 调用异步使用await关键字
res = await client.post(url,data=fields,files={'file': (fname,data,mime_type)})

print(res.text)

    这里我们声明异步方法upload_data,通过encode直接转换文件流,并使用异步httpx.AsyncClient()对象将文件流推送到官网接口地址:up-z1.qiniup.com  

    随后进行测试:

import asyncio
q = qiniu_async.Qiniu("accesskey","accesssecret")

token = q.upload_token("空间名称")

#文件流上传
asyncio.run(q.upload_data(token,"3343.txt","123测试"))

    程序返回:

➜  mydemo git:(master) ✗ /opt/homebrew/bin/python3.10 "/Users/liuyue/wodfan/work/mydemo/src/test.py"
{"hash":"FtnQXAXft5AsOH1mrmXGaRzSt-95","key":"33434.txt"}

    接口会返回文件流的hash编码,没有问题。

    接着查看文件上传流程:

def put_file(up_token, key, file_path, params=None,
mime_type='application/octet-stream', check_crc=False,
progress_handler=None, upload_progress_recorder=None, keep_last_modified=False, hostscache_dir=None,
part_size=None, version=None, bucket_name=None, metadata=None):
"""上传文件到七牛
Args:
up_token: 上传凭证
key: 上传文件名
file_path: 上传文件的路径
params: 自定义变量,规格参考 https://developer.qiniu.com/kodo/manual/vars#xvar
mime_type: 上传数据的mimeType
check_crc: 是否校验crc32
progress_handler: 上传进度
upload_progress_recorder: 记录上传进度,用于断点续传
hostscache_dir: host请求 缓存文件保存位置
version: 分片上传版本 目前支持v1/v2版本 默认v1
part_size: 分片上传v2必传字段 默认大小为4MB 分片大小范围为1 MB - 1 GB
bucket_name: 分片上传v2字段 空间名称
metadata: 元数据信息
Returns:
一个dict变量,类似 {"hash": "<Hash string>", "key": "<Key string>"}
一个ResponseInfo对象
"""
ret = {}
size = os.stat(file_path).st_size
with open(file_path, 'rb') as input_stream:
file_name = os.path.basename(file_path)
modify_time = int(os.path.getmtime(file_path))
if size > config.get_default('default_upload_threshold'):
ret, info = put_stream(up_token, key, input_stream, file_name, size, hostscache_dir, params,
mime_type, progress_handler,
upload_progress_recorder=upload_progress_recorder,
modify_time=modify_time, keep_last_modified=keep_last_modified,
part_size=part_size, version=version, bucket_name=bucket_name, metadata=metadata)
else:
crc = file_crc32(file_path)
ret, info = _form_put(up_token, key, input_stream, params, mime_type,
crc, hostscache_dir, progress_handler, file_name,
modify_time=modify_time, keep_last_modified=keep_last_modified, metadata=metadata)
return ret, info

    这里官方使用的是标准库上下文管理器同步读取文件,改写为异步方法:

# 上传文件实体
async def upload_file(self,up_token,key,path,url="http://up-z1.qiniup.com",params=None,mime_type='application/octet-stream',file_name=None,metadata=None):


async with aiofiles.open(path, mode='rb') as f:
contents = await f.read()

fields = {}
if params:
for k, v in params.items():
fields[k] = str(v)

if key is not None:
fields['key'] = key
fields['token'] = up_token

fname = file_name
if not fname or not fname.strip():
fname = 'file_name'

async with httpx.AsyncClient() as client:

# 调用异步使用await关键字
res = await client.post(url,data=fields,files={'file': (fname,contents,mime_type)})

print(res.text)

    通过aiofiles异步读取文件后,在通过httpx.AsyncClient()进行异步传输。

    需要注意的是,这里默认传输到up-z1.qiniup.com接口,如果是不同区域的云存储服务器,需要更改url参数的值,具体服务器接口列表请参照官网文档。

    至此,文件流和文件异步传输就改造好了。

  上传至Python官方库

    为了方便广大七牛云用户使用异步传输版本库,可以将qiniu-async上传到Python官方库,首先注册成为Python官方库的开发者:pypi.org/

    随后在项目根目录下新建setup.py文件:

import setuptools
import pathlib

here = pathlib.Path(__file__).parent.resolve()
long_description = (here / "README.md").read_text(encoding="utf-8")

setuptools.setup(
name="qiniu-async",
version="1.0.1",
author="LiuYue",
author_email="[email protected]",
description="qiniu_async python library",
long_description=long_description,
long_description_content_type="text/markdown",
url="https://github.com/qiniu-async",
packages=setuptools.find_packages(),
license="Apache 2.0",
classifiers=[
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3 :: Only",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",

],
keywords="qiniu, qiniu_async, async",
py_modules=[
'qiniu_async'
],
install_requires=["aiofiles","httpx"],
)

    这是安装文件,主要为了声明该模块的名称、作者、版本以及依赖库。

    随后本地打包文件:

python3 setup.py sdist

    程序会根据setup.py文件生成压缩包:

➜  qiniu_async tree
.
├── README.md
├── dist
│   └── qiniu-async-1.0.1.tar.gz
├── https:
│   └── github.com
│   └── zcxey2911
│   └── qiniu-async.git
├── qiniu_async.egg-info
│   ├── PKG-INFO
│   ├── SOURCES.txt
│   ├── dependency_links.txt
│   ├── requires.txt
│   └── top_level.txt
├── qiniu_async.py
└── setup.py

    接着安装twine库, 准备提交Python官网:

pip3 install twine

    随后在根目录运行命令提交:

twine upload dist/*

    在官网进行查看:https://pypi.org/project/qiniu-async/

    随后本地就可以直接通过pip命令句进行安装了:

pip install qiniu-async -i https://pypi.org/simple

    非常方便。

    云端存储,异步加持,猛虎添翼,未敢拥钵独飨,除了通过pip安装qiniu-async库,也奉上Github项目地址:https://github.com/zcxey2911/qiniu-async  ,与众乡亲同飨。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK