2
物联网云平台功能描述-3-28云平台课堂笔记
source link: https://blog.51cto.com/youyeye/10245217
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
安装requests库
C:\Users\admin>pip install requests
Requirement already satisfied: requests in d:\python\python37\lib\site-packages (2.31.0)
Requirement already satisfied: charset-normalizer<4,>=2 in d:\python\python37\lib\site-packages (from requests) (3.3.2)
Requirement already satisfied: idna<4,>=2.5 in d:\python\python37\lib\site-packages (from requests) (3.6)
Requirement already satisfied: urllib3<3,>=1.21.1 in d:\python\python37\lib\site-packages (from requests) (2.0.7)
Requirement already satisfied: certifi>=2017.4.17 in d:\python\python37\lib\site-packages (from requests) (2024.2.2)
说明已经安装完成。但是还是import不星了在vscode中。
查看vscode运行的python编译器路径:C:/Users/admin/AppData/Local/Programs/Python/Python310/python.exe
找到该编译器的script文件夹,进入cmd界面,在输入命令pip install requests。即可安装库。
import requests
url='http://httpbin.org/post'
data={
"username":"admin123",
"password":"p@ssw3rd"
}
header={
"token":"token_123456"
}
param={
"device":"device_123456",
"id":"id_123456"
}
response=requests.post(url,json=data,headers=header,params=param)
print(response.text)
{
"args": {
"device": "device_123456",
"id": "id_123456"
},
"data": "{\"username\": \"admin123\", \"password\": \"p@ssw3rd\"}",
"files": {},
"form": {},
"headers": {
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate",
"Content-Length": "48",
"Content-Type": "application/json",
"Host": "httpbin.org",
"Token": "token_123456",
"User-Agent": "python-requests/2.31.0",
"X-Amzn-Trace-Id": "Root=1-66051453-00abb18710cfb5336839906c"
},
"json": {
"password": "p@ssw3rd",
"username": "admin123"
},
"origin": "121.15.167.161",
"url": "http://httpbin.org/post?device=device_123456&id=id_123456"
}
# -------- token_Compute_Cloud_IOT_tokenCompute.py
# -*- coding: utf-8 -*-
"""
Created on Thu Mar 21 18:14:59 2024
@author: lys
"""
import base64
import hmac
import time
from urllib.parse import quote
def token(access_key : str, method : str = 'shai', \
product_id : str = None, user_id : str = None, device_name : str = 'device_Name_numbers',\
et_time : int = 3600) -> str:
if user_id:
# 通过用户ID访问应用API
version = '2022-05-01'
res = 'userid/{}'.format(user_id)
else:
version = '2018-10-31'
# 通过产品ID产品API
res = 'products/{}'.format(product_id)
if device_name:
# 通过设备名称访问设备API
res = res + '/devices/{}'.format(device_name)
# print(type(res))
# print(res)
if res is None:
print("res:None")
return
# 签名方法,(md5、sha1、sha256)
if method not in 'md5 sha1 sha256':
print('method Error')
return
# et时间戳,token过期时间:3600s
# 为什么要加10分钟呢?因为et是token过期时间所以不可能是获取当前时间戳,所以要加时间,当然这个时间可自由更改
# et = str(int(time.time()) + 3600)
et = str(int(time.time()) + et_time)
# 签名字符串 sign (字符串拼接)
sign_str = et + '\n' + method + '\n' + res + '\n' + version
msg = sign_str.encode('utf-8') # 将签名字符串转为utf-8编码(方便后面进行base64编码)
# 对access_key进行base64编码(base64 decode)
key = base64.b64decode(access_key)
sign_base64 = hmac.new(key = key,msg = msg , digestmod = method).digest()
sign = base64.b64encode(sign_base64).decode('utf-8') # 将转好的base64编码再转为utf-8编码
sign = quote(sign, safe='') # url encode
res = quote(res, safe='') # url encode
# token 参数拼接
token = 'version={}&res={}&et={}&method={}&sign={}'.format(version,res,et,method,sign)
# 函数返回值 -> token [str类型]
return token
if __name__ == '__main__':
'''
用户ID:345827
产品ID:WZom3X5NQR
设备名称:device—1
密钥:1GDrclQe0DJDJYP31tg70OS4zPmpD47Nc9MruvYtplL93EOjGxlbKd1vIIw==
'''
# 用户ID
User_id = '345827'
# 产品ID
pid = 'WZom3X5NQR'
# 设备名称
device_name = input('请输入设备名称:')
# 密钥
Access_key = '1GDrclQe0DJDJYtg70OS4zPmpD47Nc9MruvYtplL93EOjGxlbKd1vIIw=='
# token过期时间
et_Time = eval(input('请输入token的过期时间(单位为秒数,推荐3600):'))
# 加密算法
Method = input('请输入token的加密方法(md5、sha1、sha256)')
# 使用md5,计算生产的token
token = token(access_key=Access_key,method=Method,user_id=User_id,et_time = et_Time)
print('下面是{}算法生成的token: \n'.format(Method) + token)
# ---- api.py
import requests
import token_Compute_Cloud_IOT_tokenCompute as net
'''
url='http://httpbin.org/post'
data={
"username":"admin123",
"password":"p@ssw3rd"
}
header={
"token":"token_123456"
}
param={
"device":"device_123456",
"id":"id_123456"
}
response=requests.post(url,json=data,headers=header,params=param)
with open('req_file.txt', 'w+') as f: # 覆盖写模式w+
f.write( '\nResponse: \n' + str(response))
f.write( '\n\ncotent: \n' + str(response.content)) # response.content 是字节对象(二进制内容)(文件)
f.write( '\n\ntext: \n' + response.text) # response.text 是字符串
print(response.text)
'''
# 用户ID
User_id = '345827'
# 产品ID
pid = 'WZom3X5NQR'
# 设备名称
device_name = input('请输入设备名称:')
# 密钥
Access_key = '1GDrclQe0DJDJYP3qMourewa1Lnv1tg70OS4zPmpD47Nc9MruvYtplL93EOjGxlbKd1vIIw=='
# token过期时间
et_Time = eval(input('请输入token的过期时间(单位为秒数,推荐3600):'))
# 加密算法
Method = input('请输入token的加密方法(md5、sha1、sha256)')
net_token = net.token(access_key=Access_key,method=Method,user_id=User_id,et_time = et_Time)
print('下面是{}算法生成的token: \n'.format(Method) + net_token)
# 主机地址
host = 'https://iot-api.heclouds.com'
# 查询设备详情
apiurl = '/device/detail'
# header参数
headers = {
'Authorization' : net_token, # 身份认证-计算后的token
'Content-type' : 'application/json' # 设置报文类型为json格式
}
# querys请求查询参数
querys = {
'product_id' : pid,
'device_name' : device_name
}
# 报文内容
body = {}
# 向服务器提交请求,获取内容
res = requests.get(url=host+apiurl , headers=headers , params=querys)
# 传入目标主机网址,请求头参数,要查询的参数(集合)
# 打印请求url
print('\n MyUrl: ' + str(res.url) + '\n')
# 打印响应报文
print('\n response.text: \n' + str(res.text) + '\n')
只用改三个地方:
- request的post方法
- post方法里面的params参数去掉,使用json参数。
- 修改apiurl字符串网址为:'/device/create'
params是查询参数,json参数是提交参数。
import requests
import token_Compute_Cloud_IOT_tokenCompute as net
'''
url='http://httpbin.org/post'
data={
"username":"admin123",
"password":"p@ssw3rd"
}
header={
"token":"token_123456"
}
param={
"device":"device_123456",
"id":"id_123456"
}
response=requests.post(url,json=data,headers=header,params=param)
with open('req_file.txt', 'w+') as f: # 覆盖写模式w+
f.write( '\nResponse: \n' + str(response))
f.write( '\n\ncotent: \n' + str(response.content)) # response.content 是字节对象(二进制内容)(文件)
f.write( '\n\ntext: \n' + response.text) # response.text 是字符串
print(response.text)
'''
# 用户ID
User_id = '345827'
# 产品ID
pid = 'WZom3X5NQR'
# 设备名称
device_name = input('请输入设备名称:')
# 密钥
Access_key = '1GDrclQe0DJDJYP3h1Lnv1tg70OS4zPmpD47Nc9MruvYtplL93EOjGxlbKd1vIIw=='
# token过期时间
et_Time = eval(input('请输入token的过期时间(单位为秒数,推荐3600):'))
# 加密算法
Method = input('请输入token的加密方法(md5、sha1、sha256)')
net_token = net.token(access_key=Access_key,method=Method,user_id=User_id,et_time = et_Time)
print('下面是{}算法生成的token: \n'.format(Method) + net_token)
# 主机地址
host = 'https://iot-api.heclouds.com'
# 查询设备详情
apiurl = '/device/create'
# header参数
headers = {
'Authorization' : net_token, # 身份认证-计算后的token
'Content-type' : 'application/json' , # 设置报文类型为json格式
'Accept': 'application/json, text/plain, */*'
}
# bodys提交参数
bodys = {
'product_id' : pid,
'device_name' : device_name
}
# 报文内容
body = {}
# 向服务器提交请求,获取内容
res = requests.post(url=host+apiurl , headers=headers , json=bodys)
# 传入目标主机网址,请求头参数,要查询的参数(集合)
# 打印请求url
print('\n MyUrl: ' + str(res.url) + '\n')
# 打印响应报文
print('\n response.text: \n' + str(res.text) + '\n')
更改三处地方:
import requests
import token_Compute_Cloud_IOT_tokenCompute as net
'''
url='http://httpbin.org/post'
data={
"username":"admin123",
"password":"p@ssw3rd"
}
header={
"token":"token_123456"
}
param={
"device":"device_123456",
"id":"id_123456"
}
response=requests.post(url,json=data,headers=header,params=param)
with open('req_file.txt', 'w+') as f: # 覆盖写模式w+
f.write( '\nResponse: \n' + str(response))
f.write( '\n\ncotent: \n' + str(response.content)) # response.content 是字节对象(二进制内容)(文件)
f.write( '\n\ntext: \n' + response.text) # response.text 是字符串
print(response.text)
'''
# 用户ID
User_id = '345827'
# 产品ID
pid = 'WZom3X5NQR'
# 设备名称
device_name = input('请输入设备名称:')
# 密钥
Access_key = '1GDrclQe0DJDJYP3hXoSm3nv1tg70OS4zPmpD47Nc9MruvYtplL93EOjGxlbKd1vIIw=='
# token过期时间
et_Time = eval(input('请输入token的过期时间(单位为秒数,推荐3600):'))
# 加密算法
Method = input('请输入token的加密方法(md5、sha1、sha256)')
net_token = net.token(access_key=Access_key,method=Method,user_id=User_id,et_time = et_Time)
print('下面是{}算法生成的token: \n'.format(Method) + net_token)
# 主机地址
host = 'https://iot-api.heclouds.com'
# 查询设备详情
apiurl = '/datapoint/history-datapoints'
# header参数
headers = {
'Authorization' : net_token, # 身份认证-计算后的token
'Content-type' : 'application/json' , # 设置报文类型为json格式
'Accept': 'application/json, text/plain, */*'
}
# bodys提交参数
bodys = {
'product_id' : pid,
'device_name' : device_name
}
# 报文内容
body = {}
# 向服务器提交请求,获取内容
res = requests.get(url=host+apiurl , headers=headers , params=bodys)
# 传入目标主机网址,请求头参数,要查询的参数(集合)
# 打印请求url
print('\n MyUrl: ' + str(res.url) + '\n')
# 打印响应报文
print('\n response.text: \n' + str(res.text) + '\n')
import requests
import token_Compute_Cloud_IOT_tokenCompute as net
'''
url='http://httpbin.org/post'
data={
"username":"admin123",
"password":"p@ssw3rd"
}
header={
"token":"token_123456"
}
param={
"device":"device_123456",
"id":"id_123456"
}
response=requests.post(url,json=data,headers=header,params=param)
with open('req_file.txt', 'w+') as f: # 覆盖写模式w+
f.write( '\nResponse: \n' + str(response))
f.write( '\n\ncotent: \n' + str(response.content)) # response.content 是字节对象(二进制内容)(文件)
f.write( '\n\ntext: \n' + response.text) # response.text 是字符串
print(response.text)
'''
# 用户ID
User_id = '345827'
# 产品ID
pid = 'WZom3X5NQR'
# 设备名称
device_name = input('请输入设备名称:')
# 密钥
Access_key = '1GDrclQe0DJDJYPzPmpD47Nc9MruvYtplL93EOjGxlbKd1vIIw=='
# token过期时间
et_Time = eval(input('请输入token的过期时间(单位为秒数,推荐3600):'))
# 加密算法
Method = input('请输入token的加密方法(md5、sha1、sha256)')
net_token = net.token(access_key=Access_key,method=Method,user_id=User_id,et_time = et_Time)
print('下面是{}算法生成的token: \n'.format(Method) + net_token)
# 主机地址
host = 'https://iot-api.heclouds.com'
# 查询设备详情
apiurl = '/device/update'
# header参数
headers = {
'Authorization' : net_token, # 身份认证-计算后的token
'Content-type' : 'application/json' , # 设置报文类型为json格式
'Accept': 'application/json, text/plain, */*'
}
# bodys提交参数
bodys = {
'product_id' : pid,
'device_name' : device_name,
"desc": "深职大测试",
"lat": "22.5",
"imsi": "113.9"
}
# 报文内容
body = {}
# 向服务器提交请求,获取内容
res = requests.post(url=host+apiurl , headers=headers , json=bodys)
# 传入目标主机网址,请求头参数,要查询的参数(集合)
# 打印请求url
print('\n MyUrl: ' + str(res.url) + '\n')
# 打印响应报文
print('\n response.text: \n' + str(res.text) + '\n')
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK