基于FastAPI和Streamlit的AI应用开发

6/8/2021 FlaskFastAPIStreamlit全局解释器锁内存占用分析执行耗时分析

# 1. Flask及FastAPI服务封装

# 1.1 Flask简介

Flask是一个使用Python编写的轻量级Web应用框架。Flask最显著的特点是它是一个“微”框架,轻便灵活,但同时又易于扩展。默认情况下,Flask 只相当于一个内核,不包含数据库抽象层、用户认证、表单验证、发送邮件等其它Web框架经常包含的功能。Flask依赖用各种灵活的扩展来给Web应用添加额外功能。

与Django的对比:Django是一个开源的Python Web应用框架,采用了MVT的框架模式,即模型M,视图V和模版T。Django是一个"大而全"的重量级Web框架,其自带大量的常用工具和组件,甚至还自带了管理后台Admin,适合快速开发功能完善的企业级网站。

Flask项目地址:https://github.com/pallets/flask (opens new window)

# 1.2 Flask通用模板

为了方便日常功能开发,这里放一个自己平时用的 Flask 通用模板,专注于业务逻辑的编写即可。

完整示例代码已在Github上开源:https://github.com/Logistic98/flask-demo (opens new window)

# 1.2.1 常规POST请求

server.py

# -*- coding: utf-8 -*-

from flask import Flask, jsonify
from flask_cors import CORS
from pre_request import pre, Rule

from log import logger
from code import ResponseCode, ResponseMessage

# 创建一个服务
app = Flask(__name__)
CORS(app, supports_credentials=True)

"""
# 方法功能说明
"""
@app.route(rule='/api/moduleName/methodName', methods=['POST'])
def methodName():

    # 参数校验
    rule = {
        "text": Rule(type=str, required=True, gte=3, lte=255),
        "type": Rule(type=int, required=True, gte=1, lte=1)
    }
    try:
        params = pre.parse(rule=rule)
    except Exception as e:
        logger.error(e)
        fail_response = dict(code=ResponseCode.PARAM_FAIL, msg=ResponseMessage.PARAM_FAIL, data=None)
        logger.error(fail_response)
        return jsonify(fail_response)

    # 获取参数
    text = params.get("text")

    # 业务处理模块
    result = text + ",hello world!"
    logger.info("测试日志记录")

    # 成功的结果返回
    success_response = dict(code=ResponseCode.SUCCESS, msg=ResponseMessage.SUCCESS, data=result)
    logger.info(success_response)
    return jsonify(success_response)


if __name__ == '__main__':
    # 解决中文乱码问题
    app.config['JSON_AS_ASCII'] = False
    # 启动服务,指定主机和端口
    app.run(host='0.0.0.0', port=5000, debug=False, threaded=True)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50

log.py

# -*- coding: utf-8 -*-

import logging

logger = logging.getLogger(__name__)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# 输出到控制台
console = logging.StreamHandler()
console.setLevel(logging.INFO)
console.setFormatter(formatter)
logger.addHandler(console)

# 输出到文件
logger.setLevel(level=logging.INFO)
handler = logging.FileHandler("./server.log")
handler.setLevel(logging.INFO)
handler.setFormatter(formatter)
logger.addHandler(handler)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

code.py

# -*- coding: utf-8 -*-


class ResponseCode(object):
    SUCCESS = 200
    PARAM_FAIL = 400
    BUSINESS_FAIL = 500


class ResponseMessage(object):
    SUCCESS = "请求成功"
    PARAM_FAIL = "参数校验失败"
    BUSINESS_FAIL = "业务处理失败"
1
2
3
4
5
6
7
8
9
10
11
12
13

response.py

# -*- coding: utf-8 -*-

from code import ResponseMessage, ResponseCode


class ResMsg(object):
    """
    封装响应文本
    """
    def __init__(self, data=None, code=ResponseCode.SUCCESS, msg=ResponseMessage.SUCCESS):
        self._data = data
        self._msg = msg
        self._code = code

    def update(self, code=None, data=None, msg=None):
        """
        更新默认响应文本
        :param code:响应状态码
        :param data: 响应数据
        :param msg: 响应消息
        :return:
        """
        if code is not None:
            self._code = code
        if data is not None:
            self._data = data
        if msg is not None:
            self._msg = msg

    def add_field(self, name=None, value=None):
        """
        在响应文本中加入新的字段,方便使用
        :param name: 变量名
        :param value: 变量值
        :return:
        """
        if name is not None and value is not None:
            self.__dict__[name] = value

    @property
    def data(self):
        """
        输出响应文本内容
        :return:
        """
        body = self.__dict__
        body["data"] = body.pop("_data")
        body["msg"] = body.pop("_msg")
        body["code"] = body.pop("_code")
        return body
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50

# 1.2.2 常规GET请求

如果是GET请求,修改两处即可

# 声明处
@app.route(rule='/moduleName/methodName', methods=['GET'])
# 接参处
id = request.args.get("id")
1
2
3
4

# 1.2.3 以base64格式传输图片

server.py

# -*- coding: utf-8 -*-

from uuid import uuid1

from flask import Flask, jsonify
from flask_cors import CORS
from pre_request import pre, Rule

from code import ResponseCode, ResponseMessage
from log import logger
from utils import base64_to_img, create_date_dir

# 创建一个服务
app = Flask(__name__)
CORS(app, supports_credentials=True)

"""
# 方法功能说明
"""
@app.route(rule='/api/moduleName/methodName', methods=['POST'])
def methodName():

    # 参数校验
    rule = {
        "img": Rule(type=str, required=True),
        "file_name": Rule(type=str, required=False)
    }
    try:
        params = pre.parse(rule=rule)
    except Exception as e:
        logger.error(e)
        fail_response = dict(code=ResponseCode.PARAM_FAIL, msg=ResponseMessage.PARAM_FAIL, data=None)
        logger.error(fail_response)
        return jsonify(fail_response)

    # 获取参数
    image_b64 = params.get("img")
    file_name = params.get("file_name")
    if file_name is None:
        file_name = '{}.jpg'.format(uuid1())

    # 将base64字符串解析成图片保存
    img_root_path = "./img"
    img_base_path = create_date_dir(img_root_path)
    img_path = img_base_path + file_name
    try:
        base64_to_img(image_b64, img_path)
    except Exception as e:
        logger.error(e)
        fail_response = dict(code=ResponseCode.PARAM_FAIL, msg=ResponseMessage.PARAM_FAIL, data=None)
        logger.error(fail_response)
        return jsonify(fail_response)

    # 下面对保存的图片进行若干处理
    result = image_b64
    logger.info("测试日志记录")

    # 处理完成后删除生成的图片文件
    # os.remove(img_path)

    # 成功的结果返回
    success_response = dict(code=ResponseCode.SUCCESS, msg=ResponseMessage.SUCCESS, data=result)
    logger.info(success_response)
    return jsonify(success_response)


if __name__ == '__main__':
    # 解决中文乱码问题
    app.config['JSON_AS_ASCII'] = False
    # 启动服务 指定主机和端口
    app.run(host='0.0.0.0', port=5000, debug=False, threaded=True)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71

utils.py

# -*- coding: utf-8 -*-

import base64
import os
import time


# 解析base64生成图像文件
def base64_to_img(image_b64, img_path):
    imgdata = base64.b64decode(image_b64)
    file = open(img_path, 'wb')
    file.write(imgdata)
    file.close()


# 在指定目录下创建当前日期为目录名的子目录
def create_date_dir(file_root_path):
    now_str = time.strftime("%Y%m%d", time.localtime())
    file_base_path = file_root_path + '/' + now_str + '/'
    if not os.path.exists(file_root_path):
        os.makedirs(file_root_path)
    if not os.path.exists(file_base_path):
        os.makedirs(file_base_path)
    return file_base_path
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

log.py、code.py与response.py同上

client.py

# -*- coding: utf-8 -*-

import base64
import json
import requests


def img_flask_test():
    # 测试请求
    url = 'http://{0}:{1}/api/moduleName/methodName'.format("127.0.0.1", "5000")
    f = open('./test_img/test.png', 'rb')
    # base64编码
    base64_data = base64.b64encode(f.read())
    f.close()
    base64_data = base64_data.decode()
    # 传输的数据格式
    data = {'img': base64_data, 'file_name': '测试中文.png'}
    # post传递数据
    headers = {'Content-Type': 'application/json'}
    r = requests.post(url, headers=headers, data=json.dumps(data))
    print(r.text)


if __name__ == '__main__':
    img_flask_test()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

注:如果出现中文乱码,可以尝试使用r.text.encode().decode('unicode_escape')获取

# 1.2.4 以文件的形式传输

server.py

# -*- coding: utf-8 -*-

import os
from flask import Flask, jsonify, request
from flask_cors import CORS

from code import ResponseCode, ResponseMessage
from log import logger
from utils import create_date_dir


# 创建一个服务
app = Flask(__name__)
CORS(app, supports_credentials=True)

"""
# 方法功能说明
"""
@app.route(rule='/api/moduleName/methodName', methods=['POST'])
def methodName():

    # 获取参数并参数校验
    file = request.files.get('file')
    if file is None:
        fail_response = dict(code=ResponseCode.PARAM_FAIL, msg=ResponseMessage.PARAM_FAIL, data=None)
        logger.error(fail_response)
        return jsonify(fail_response)
    file_name = file.filename
    suffix = os.path.splitext(file_name)[-1]  # 获取文件扩展名

    # 校验扩展名
    suffix_list = [".avi", ".mov", ".rmvb", ".rm", ".flv", ".mp4", ".3gp", ".mpeg", ".mpg", ".dat", ".asf", ".navi", ".mkv", ".webm", ".ra", ".wmv"]
    if suffix not in suffix_list:
        fail_response = dict(code=ResponseCode.PARAM_FAIL, msg=ResponseMessage.PARAM_FAIL, data=None)
        logger.error(fail_response)
        return jsonify(fail_response)

    # 保存上传的文件
    file_root_path = "./file"
    file_base_path = create_date_dir(file_root_path)
    upload_path = file_base_path + file_name
    file.save(upload_path)

    # 下面对保存的文件进行若干处理
    result = upload_path
    logger.info("测试日志记录")

    # 处理完成后删除上传的文件
    # os.remove(upload_path)

    # 成功的结果返回
    success_response = dict(code=ResponseCode.SUCCESS, msg=ResponseMessage.SUCCESS, data=result)
    logger.info(success_response)
    return jsonify(success_response)


if __name__ == '__main__':
    # 解决中文乱码问题
    app.config['JSON_AS_ASCII'] = False
    # 启动服务 指定主机和端口
    app.run(host='0.0.0.0', port=5000, debug=False, threaded=True)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61

utils.py

# -*- coding: utf-8 -*-

import os
import time


# 在指定目录下创建当前日期为目录名的子目录
def create_date_dir(file_root_path):
    now_str = time.strftime("%Y%m%d", time.localtime())
    file_base_path = file_root_path + '/' + now_str + '/'
    if not os.path.exists(file_root_path):
        os.makedirs(file_root_path)
    if not os.path.exists(file_base_path):
        os.makedirs(file_base_path)
    return file_base_path
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

log.py、code.py与response.py同上

client.py

# -*- coding: utf-8 -*-

import requests


def file_flask_test():
    # 测试请求
    url = 'http://{0}:{1}/api/moduleName/methodName'.format("127.0.0.1", "5000")
    # post传递数据
    file_path = './test_file/test.mp4'
    files = {'file': open(file_path, "rb")}
    r = requests.post(url, files=files)
    print(r.text)


if __name__ == '__main__':
    file_flask_test()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# 1.3 Flask常见问题

# 1.3.1 Flask跨域问题

Step1:引入flask-cors库

$ pip install flask-cors
1

Step2:配置CORS

flask-cors 有两种用法,一种为全局使用,一种对指定的路由使用。

其中CORS提供了一些参数,常用的我们可以配置 originsmethodsallow_headerssupports_credentials

[1] 全局使用

from flask import Flask, request
from flask_cors import CORS

app = Flask(__name__)
CORS(app, supports_credentials=True)
1
2
3
4
5

[2] 局部使用

from flask import Flask, request
from flask_cors import cross_origin

app = Flask(__name__)

@app.route('/')
@cross_origin(supports_credentials=True)
def hello():
    name = request.args.get("name", "World")
    return f'Hello, {name}!'
1
2
3
4
5
6
7
8
9
10

# 1.3.2 Flask中文乱码问题

[1] 发送请求乱码

不管是dump还是dumps,中文乱码加入ensure_ascii=False即可。

json.dump(content, f, ensure_ascii=False)
1

[2] 接收返回值乱码

接收返回值乱码问题,给app配置app.config[‘JSON_AS_ASCII’] = False即可。

if __name__ == "__main__":
    app.config['JSON_AS_ASCII'] = False
    app.run(host='0.0.0.0', port='5000')
1
2
3

# 1.3.3 JSON解析问题

	request_body = request.get_json()
1

这种方法获取请求体中的JSON,有时会因为空格出现问题,导致请求400。为了避免这种情况,接参之后,可以对其去除全部空格。

    request_data = request.get_data(as_text=True)
    request_data = ''.join(request_data.split())  
    request_body = json.loads(request_data)
1
2
3

注:如果入参里要保留空格,则不能通过此方式来处理。

# 1.3.4 Flask并发调用问题

服务端:通过设置app.run()的参数,来达到多线程的效果。多进程或多线程只能选择一个,不能同时开启。

# 1.threaded : 多线程支持,默认为False,即不开启多线程;
app.run(threaded=True)
# 2.processes:进程数量,默认为1.
app.run(processes=True)
1
2
3
4

客户端:通过grequests进行并发请求。

requests是发送接口请求非常主流的库,但是requests发送请求是串行的,即阻塞的。发送完一条请求才能发送另一条请求。为了提升测试效率,一般我们需要并行发送请求。这里可以使用多线程,或者协程,gevent或者aiohttp,然而使用起来,都相对麻烦。grequests是一个基于gevent+requests编写的一个并发发送请求的库,使用起来非常简单。

项目地址:https://github.com/spyoungtech/grequests (opens new window)

依赖安装:

$ pip install grequests
1

示例代码:

# -*- coding: utf-8 -*-

import time
import grequests

start = time.time()
req_list = [grequests.post('http://httpbin.org/post', data={'a':1, 'b':2}) for i in range(10)]
res_list = grequests.map(req_list)
result_list = []
for res in res_list:
    result_list.append(res.text)
print(result_list)
print(len(result_list))
print(time.time()-start)
1
2
3
4
5
6
7
8
9
10
11
12
13
14

# 1.3.5 请求时出现ProxyError问题

使用request请求时有时会遇到requests.exceptions.ProxyError报错,请求时禁用系统代理即可解决此问题。

proxies = { "http": None, "https": None}
requests.get("url", proxies=proxies)
1
2

# 1.3.6 启动时无法导入server报错

情景描述:在 Pychram 里右键点击 Run 'Flask (server.py)',出现 Error: While importing 'server', an ImportError was raised 报错,但没有具体原因。

解决办法:该用 python3 server.py 运行,出现了具体的报错,是因为导包出现问题了。

# 1.3.7 调试时开启Debug模式热部署

Flask自带Debug模式,它可以支持热部署,调试时可以将它开启,代码有变更的话,可以不用重启服务就更新生效变动情况。

   app.run(host='0.0.0.0', port=5000, debug=True)
1

# 1.3.8 限制并发调用使其排队

情景描述:服务器的计算资源不足,部署的算法服务在并发情况下会出现服务挂掉的情形。

解决办法:计算资源不足的情况,可以限制并发调用使其排队来临时解决该问题,当然最好还是通过优化代码让它支持并发。(计算资源充足的情况,可以配置Nginx负载均衡来解决)。

# -*- coding: utf-8 -*-

from flask import Flask, request, jsonify
from flask_cors import CORS
import threading
import time
from queue import Queue

app = Flask(__name__)
CORS(app, supports_credentials=True)

request_queue = Queue()


def handle_request():
    while True:
        request_data, event, response_data = request_queue.get()

        # 模拟处理请求所需的时间
        print(f"处理请求: {request_data}")
        time.sleep(2)

        # 模拟处理结果
        response_data["response"] = "测试输出"
        response_data["status"] = 200
        response_data["time"] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

        event.set()  # 处理完成,设置事件
        request_queue.task_done()


threading.Thread(target=handle_request, daemon=True).start()


@app.route(rule='/native', methods=['POST'])
def index():
    request_data = request.json
    event = threading.Event()  # 为这个请求创建一个事件
    response_data = {}
    request_queue.put((request_data, event, response_data))  # 将请求数据和事件放入队列
    event.wait()  # 等待事件被处理线程设置
    return jsonify(response_data)


if __name__ == '__main__':
    app.config['JSON_AS_ASCII'] = False
    app.run(host='0.0.0.0', port=5000, debug=False, threaded=True)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

测试脚本:执行如下脚本进行测试,可以看到虽然是并发请求,但依旧是排队输出结果。

# -*- coding: utf-8 -*-


import threading
import requests
import json


def send_post_request(url, payload):
    """
    向指定的URL发送POST请求。
    :param url: API的URL。
    :param payload: 要发送的数据。
    :return: None
    """
    headers = {
        "Content-Type": "application/json"
    }

    response = requests.post(url, headers=headers, data=json.dumps(payload))
    try:
        response_json = response.json()
        print(response_json)
    except ValueError:
        print("Response could not be decoded as JSON:", response.text)


def threaded_requests(url, payload, num_threads, total_requests):
    """
    创建并启动多线程以达到指定的请求总量。
    """
    rounds = (total_requests + num_threads - 1) // num_threads  # 计算需要的轮数
    for _ in range(rounds):
        threads = []
        for _ in range(num_threads):
            if total_requests <= 0:
                break  # 如果已经达到请求总量,停止创建新线程
            thread = threading.Thread(target=send_post_request, args=(url, payload))
            thread.start()
            threads.append(thread)
            total_requests -= 1

        for thread in threads:
            thread.join()


if __name__ == '__main__':
    api_url = 'http://127.0.0.1:5000/native'
    payload = {
        "prompt": "测试输入",
        "history": []
    }
    num_threads = 5      # 并发数
    total_requests = 10  # 指定要执行的请求总数

    threaded_requests(api_url, payload, num_threads, total_requests)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56

# 1.4 Flask全局配置

# 1.4.1 打印日志到控制台并写入文件

方式一:只将日志写入文件,控制台不打印的话,在文件开头加上这个即可

import logging

# 日志记录
logging.basicConfig(filename='server.log', level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
1
2
3
4
5
6

方式二:打印日志到控制台并写入文件,可以写一个日志输出配置类 log.py

# -*- coding: utf-8 -*-

import logging

logger = logging.getLogger(__name__)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# 输出到控制台
console = logging.StreamHandler()
console.setLevel(logging.INFO)
console.setFormatter(formatter)
logger.addHandler(console)

# 输出到文件
logger.setLevel(level=logging.INFO)
handler = logging.FileHandler("./server.log")
handler.setLevel(logging.INFO)
handler.setFormatter(formatter)
logger.addHandler(handler)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

使用时直接调用即可。

logger.info("logger.info")
1

# 1.4.2 Flask全局统一封装返回值格式

当前主流的 Web 应用开发通常采用前后端分离模式,前端和后端各自独立开发,然后通过数据接口沟通前后端,完成项目。定义一个统一的数据下发格式,有利于提高项目开发效率,减少各端开发沟通成本。对Flask全局统一封装返回值格式可以减少大量重复代码。

code.py

# -*- coding: utf-8 -*-

class ResponseCode(object):
    SUCCESS = 200
    FAIL = 500

class ResponseMessage(object):
    SUCCESS = "请求成功"
    FAIL = "请求失败"
1
2
3
4
5
6
7
8
9

response.py

# -*- coding: utf-8 -*-

from code import ResponseMessage, ResponseCode


class ResMsg(object):
    """
    封装响应文本
    """
    def __init__(self, data=None, code=ResponseCode.SUCCESS, msg=ResponseMessage.SUCCESS):
        self._data = data
        self._msg = msg
        self._code = code

    def update(self, code=None, data=None, msg=None):
        """
        更新默认响应文本
        :param code:响应状态码
        :param data: 响应数据
        :param msg: 响应消息
        :return:
        """
        if code is not None:
            self._code = code
        if data is not None:
            self._data = data
        if msg is not None:
            self._msg = msg

    def add_field(self, name=None, value=None):
        """
        在响应文本中加入新的字段,方便使用
        :param name: 变量名
        :param value: 变量值
        :return:
        """
        if name is not None and value is not None:
            self.__dict__[name] = value

    @property
    def data(self):
        """
        输出响应文本内容
        :return:
        """
        body = self.__dict__
        body["data"] = body.pop("_data")
        body["msg"] = body.pop("_msg")
        body["code"] = body.pop("_code")
        return body
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50

test_server.py

# -*- coding: utf-8 -*-

from flask import Flask, jsonify
from flask_cors import CORS

from code import ResponseCode, ResponseMessage

# 创建一个服务
app = Flask(__name__)
CORS(app, supports_credentials=True)


@app.route("/test",methods=["GET"])
def test():
	test_dict = dict(name="zhang",age=18)
	data = dict(code = ResponseCode.SUCCESS, msg = ResponseMessage.SUCCESS, data = test_dict)
	return jsonify(data)


if __name__ == '__main__':
    # 解决中文乱码问题
    app.config['JSON_AS_ASCII'] = False
    # 启动服务,指定主机和端口
    app.run(host='0.0.0.0', port=5000, debug=False, threaded=True)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

# 1.4.3 使用pre-request校验Flask入参

项目介绍:用于验证请求参数的 python 框架,专为 Flask 设计

项目地址:https://github.com/Eastwu5788/pre-request (opens new window)

官方文档:https://pre-request.readthedocs.io/en/master/index.html (opens new window)

依赖安装:pip install pre-request

使用示例:先定义一个 rule 字典,然后使用 params = pre.parse(rule=rule) 校验参数,之后取值使用 params.get("xxx") 即可。

# -*- coding: utf-8 -*-

from flask import Flask
from flask_cors import CORS
from pre_request import pre, Rule

# 创建一个服务
app = Flask(__name__)
CORS(app, supports_credentials=True)

rule = {
    "userName": Rule(type=str, required=True, gte=3, lte=20, dest="user_name"),
    "gender": Rule(type=int, required=True, enum=[1, 2]),
    "age": Rule(type=int, required=True, gte=18, lte=60),
    "country": Rule(type=str, required=False, gte=2, default="中国")
}

@app.route("/user/info", methods=["POST"])
def user_info_handler():
    params = pre.parse(rule=rule)
    userName = params.get("userName")
    gender = params.get("gender")
    age = params.get("age")
    country = params.get("country")
    return "success"


if __name__ == '__main__':
    # 解决中文乱码问题
    app.config['JSON_AS_ASCII'] = False
    # 启动服务 指定主机和端口
    app.run(host='0.0.0.0', port=5000, debug=False, threaded=True)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

# 1.4.4 Flask-Doc生成接口文档

基本介绍:Flask-Doc 可以根据代码注释生成文档页面,支持Markdown、离线文档下载、在线调试。

项目地址:https://github.com/kwkwc/flask-docs (opens new window)

官方文档:https://github.com/kwkwc/flask-docs/blob/master/README.zh-CN.md (opens new window)

依赖安装:pip install Flask-Docs

使用示例:examples目录里有官方示例,参照里面的sample_app.py编写即可,以下是一些配置项。

# 使用 CDN
# app.config["API_DOC_CDN"] = True

# 禁用文档页面
# app.config["API_DOC_ENABLE"] = False

# SHA256 加密的授权密码,例如这里是 admin
# echo -n admin | shasum -a 256
# app.config["API_DOC_PASSWORD_SHA2"] = "8c6976e5b5410415bde908bd4dee15dfb167a9c873fc4bb8a81f6f2ab448a918"

# 允许显示的方法
# app.config["API_DOC_METHODS_LIST"] = ["GET", "POST", "PUT", "DELETE", "PATCH"]

# 自定义 url_prefix
# app.config["API_DOC_URL_PREFIX"] = "/docs/api"

# 需要排除的 RESTful Api 类名
# app.config["API_DOC_RESTFUL_EXCLUDE"] = ["Todo"]

# 需要显示的 Api 蓝图名称
# app.config["API_DOC_MEMBER"] = ["api", "platform"]

# 需要排除的子成员 Api 函数名称
# app.config["API_DOC_MEMBER_SUB_EXCLUDE"] = ["delete_data"]

# 自动生成请求参数 markdown
# app.config["API_DOC_AUTO_GENERATING_ARGS_MD"] = True

# 禁止以 markdown 处理所有文档
# app.config["API_DOC_ALL_MD"] = False
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

文档效果:

Flask-Doc

另注:也可以使用 flasgger (opens new window) 来生成 Swagger 风格的接口文档。

# 1.4.5 跨文件全局变量的定义与使用

global关键字可以定义一个变量为全局变量,但是这个仅限于在一个文件中调用全局变量,跨文件就会报错。 既然在一个文件里面可以生效的话,那么我们就专门为全局变量定义一个“全局变量管理模块”就好了。

gol.py

# -*- coding: utf-8 -*-

def _init():  
    global _global_dict
    _global_dict = {}

def set_value(key, value):
    """ 定义一个全局变量 """
    _global_dict[key] = value

def get_value(key, defValue=None):
    """ 获得一个全局变量,不存在则返回默认值 """
    try:
        return _global_dict[key]
    except KeyError:
        return defValue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

定义处

import gol

gol._init() # 先必须在主模块初始化(只需要一次即可)
gol.set_value('name', 'zhangsan')
gol.set_value('age', 23)
1
2
3
4
5

调用处

import gol

name = gol.get_value('name')
age = gol.get_value('age')
1
2
3
4

# 1.5 Flask服务打包部署

一般使用Docker来部署Flask项目,它的基本概念及使用就不再赘述了,不会的话见我的另一篇博客:Docker容器化及项目环境管理 (opens new window)

# 1.5.1 导出项目依赖

方法一:使用pip freeze命令导出所有依赖,再进行筛选。

$ pip freeze > requirements.txt
1

注:建议对项目单独建一个conda虚拟环境,再导出依赖,这样导出的依赖就这一个项目的,就不用手动删除无用的了。

方法二:使用pipreqs库导出本项目的依赖,生成的也是requirements.txt文件。

$ pip install pipreqs
$ cd /root/test-project          // 切换到项目根目录
$ pipreqs ./ --encoding=utf8     // 需要带上编码的指定,否则会报GBK编码错误
1
2
3

注意这里还有个坑如下,这是因为本机开了翻墙代理导致的,把代理软件关了就好了。

requests.exceptions.SSLError: HTTPSConnectionPool(host='pypi.python.org', port=443): Max retries exceeded with url: /pypi/cv2/json (Caused by SSLError(SSLEOFError(8, 'EOF occurred in violation of protocol (_ssl.c:1125)')))`
1

另注:可以使用 pipdeptree 工具分析依赖关系

$ pip install pipdeptree
$ pipdeptree
1
2

pipdeptree分析依赖关系

# 1.5.2 使用Docker部署Flask项目

编写Dockerfile,示例如下:

# 基于python3.7镜像创建新镜像
FROM python:3.7
# 创建容器内部目录
RUN mkdir /code
# 将项目复制到内部目录
ADD test-project /code/
# 切换到工作目录
WORKDIR /code
# 安装项目依赖
RUN pip install -r requirements.txt
# 安装vim命令
RUN apt-get update && apt-get install vim -y 
# 放行端口
EXPOSE 5000
# 启动项目
ENTRYPOINT ["nohup","python","server.py","&"]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

Step2:将项目和Dockerfile上传到服务器并制作镜像运行容器,示例如下:

$ cd /root/deploy                                                       // 切换到存放项目和Dockerfile的目录
$ docker build -t test-flask-image .                                    // 使用Dockerfile构建镜像
$ docker run -d -p 5000:5000 --name test-flask -e TZ="Asia/Shanghai" test-flask-image:latest  // 通过镜像运行容器
1
2
3

我们可以打包导出镜像,方便迁移到其他服务器上部署。

$ docker save test-image > test-image.v1.dockerimage  
1

# 1.6 FastAPI基本介绍

# 1.6.1 FastAPI简介

FastAPI是一个现代、快速(高性能)的 Web 框架,用于构建基于 Python 的 API。它基于 Starlette 和 Pydantic 库构建而成,提供了强大的功能和高效的性能。

FastAPI的关键特性包括:

  • 快速:性能非常高,可与NodeJS和Go媲美(得益于Starlette和Pydantic),是目前最快的Python框架之一。
  • 编码速度快:开发功能的速度提高约200%到300%。
  • 更少的错误:减少约40%的人为引入的错误。
  • 直观:优秀的编辑器支持,随处都有自动补全功能,减少调试时间。
  • 易用:设计为易于使用和学习,减少阅读文档的时间。
  • 简洁:尽量减少代码重复,每个参数声明都有多种特性。
  • 健壮:构建可用于生产环境的服务,并带有自动交互式文档。
  • 基于标准:基于API的开放标准:OpenAPI(以前称为Swagger)和JSON Schema。

FastAPI官方文档

# 1.6.2 FastAPI与Flask对比

在 Flask 的那个时代,代码执行是单线程、同步的,这意味着要依次处理请求,前一个请求完成前,其他请求消耗在等待 I/O 操作上。而 asyncio 是最好的解决方案,它让 I/O 成为异步操作,无需等待任务完成即可获取任务结果并继续处理其他任务请求。

而 FastAPI 天生支持并发和异步代码,代码写对了就可以在效率上达到极致,所以它被认为是目前最快的 Python 框架,效率和 NodeJS 或 Go 接近。当速度和性能至关重要时,FastAPI 是最佳选择。

下图是 Python 官方的开发者调查中 Web 框架占比的变化,需要注意这个占比数据包含了现有系统,所以 Django 和 Flask 不容易跌得很快,但是趋势是很明显的。

  • 最近 1-2 年,Python 相关的知名新项目中只要有 Web 的,基本都使用 FastAPI。
  • 在 Github 上,FastAPI 的 Star 数已经超过了 Flask。

Python官方的开发者调查中Web框架占比的变化

# 1.7 FastAPI服务封装

# 1.7.1 服务封装

这里以部署一个LLM服务为例,使用CPU来运行Qwen2-0.5B模型,实际部署应该用CUDA环境来运行。

# -*- coding: utf-8 -*-

from fastapi import FastAPI, Request
from transformers import AutoTokenizer, AutoModelForCausalLM
import uvicorn
import json
import datetime
import torch


DEVICE = "cpu"  
app = FastAPI()


@app.post("/")
async def create_item(request: Request):
    global model, tokenizer
    json_post_raw = await request.json()
    json_post = json.dumps(json_post_raw)
    json_post_list = json.loads(json_post)
    prompt = json_post_list.get('prompt')  

    messages = [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": prompt}
    ]

    input_ids = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    model_inputs = tokenizer([input_ids], return_tensors="pt").to(DEVICE)
    generated_ids = model.generate(model_inputs.input_ids, max_new_tokens=512)
    generated_ids = [
        output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
    ]
    response = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]
    now = datetime.datetime.now()
    time = now.strftime("%Y-%m-%d %H:%M:%S")
    answer = {
        "response": response,
        "status": 200,
        "time": time
    }
    log = "[" + time + "] " + '", prompt:"' + prompt + '", response:"' + repr(response) + '"'
    print(log) 
    return answer 


if __name__ == '__main__':
    model_name_or_path = './models/Qwen2-0_5B'
    tokenizer = AutoTokenizer.from_pretrained(model_name_or_path, use_fast=False)
    model = AutoModelForCausalLM.from_pretrained(model_name_or_path, device_map=None, torch_dtype=torch.float32).to(DEVICE)
    uvicorn.run(app, host='0.0.0.0', port=6006, workers=4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

注:workers配置是为了指定启动多少个独立的工作进程来处理请求。每个工作进程是独立的,它会处理自己的请求,避免单个进程因处理某个请求而导致阻塞。这个参数的主要目的是提高并发性能,尤其是在 CPU 密集型任务或需要处理大量请求时。

# 1.7.2 服务测试

FastAPI自带Swagger风格的接口文档,访问地址为: http://localhost:6006/docs,与服务的端口号一致。

# -*- coding: utf-8 -*-

import requests
import json

def get_completion(prompt):
    headers = {'Content-Type': 'application/json'}
    data = {"prompt": prompt}
    response = requests.post(url='http://127.0.0.1:6006', headers=headers, data=json.dumps(data))
    return response.json()['response']

if __name__ == '__main__':
    print(get_completion('解释一下量子计算'))
1
2
3
4
5
6
7
8
9
10
11
12
13

# 2. Streamlit框架

# 2.1 Streamlit基本介绍

# 2.1.1 Streamlit简介

Streamlit是一个用于快速创建数据应用程序的Python库。它提供了一种简单而直观的方式来构建交互式Web应用,这些应用可以展示数据可视化、接受用户输入,并实时更新显示结果。使用Streamlit,你无需具备Web开发或前端开发经验,只需使用Python就能创建出功能强大的AI应用程序。

Streamlit官方文档

# 2.1.2 Streamlit与Gradio对比

选型建议:如果需要快速制作demo且功能简单,Gradio是不错的选择。但如果有复杂的可视化和自定义需求,Streamlit更为合适。

Gradio:案例列表 (opens new window)

  • 优点:组件封装好,适合机器学习模型展示,快速创建可共享链接,并且可以在Jupyter Notebook中直接展示。
  • 缺点:扩展性差,定制组件困难,不适合复杂的数据可视化需求。如果官方组件无法完全满足需求,就不建议使用了。

Streamlit:案例列表 (opens new window)

  • 优点:默认组件库更为精细化,并且组件的类型比较多。支持自定义组件和复杂可视化,灵活性强,社区支持好。
  • 缺点:Streamlit由于更加灵活,并且内置功能较多,完全熟练使用需要一定的时间上手,难度相对大一些。

# 2.2 Streamlit实验环境

# 2.2.1 安装Streamlit库

$ pip3 install streamlit
1

# 2.2.2 运行入门示例

Streamlit 提供了一些入门示例,执行如下命令即可。

$ streamlit hello
1

执行后 streamlit 会自动打开浏览器加载一个本地页面 http://localhost:8501,这里面有一些demo和配套的代码。

Streamlit入门示例

# 2.3 Streamlit组件使用

# 2.3.1 Markdown支持

导入 streamlit 后,就可以直接使用 st.markdown() 初始化,调用不同的方法,就可以往文档对象中填入内容。

  • st.title():文章大标题
  • st.header():一级标题
  • st.subheader():二级标题
  • st.text():文本
  • st.code():代码,同时可设置代码的语言,显示的时候会高亮
  • st.latex():latex 公式
  • st.caption():小字体文本

streamlit_demo.py

# -*- coding: utf-8 -*-

import streamlit as st

# markdown
st.markdown('Streamlit Demo')

# 设置网页标题
st.title('一个傻瓜式构建可视化 web的 Python 神器 -- streamlit')

# 展示一级标题
st.header('1. 安装')

st.text('和安装其他包一样,安装 streamlit 非常简单,一条命令即可')
code1 = '''pip3 install streamlit'''
st.code(code1, language='bash')

# 展示一级标题
st.header('2. 使用')

# 展示二级标题
st.subheader('2.1 生成 Markdown 文档')

# 纯文本
st.text('导入 streamlit 后,就可以直接使用 st.markdown() 初始化')

# 展示代码,有高亮效果
code2 = '''import streamlit as st
st.markdown('Streamlit Demo')'''
st.code(code2, language='python')
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

需要注意的是它的运行方式与普通Python脚本有所不同,应该使用streamlit命令去运行。

$ streamlit run streamlit_demo.py
1

Streamlit组件-Markdown支持

# 2.3.2 数据图表支持

[1] 表格组件

关于数据的表格展示,Streamlit 由两个组件进行支持。

  • table:普通的表格,用于静态数据的展示
  • dataframe:高级的表格,可以进行数据的操作,比如排序
# -*- coding: utf-8 -*-

import numpy as np
import pandas as pd
import streamlit as st

df = pd.DataFrame(
    np.random.randn(10, 5),
    columns=('第%d列' % (i+1) for i in range(5))
)

st.table(df)
1
2
3
4
5
6
7
8
9
10
11
12

Streamlit组件-表格

[2] 监控组件

若你需要做一个监控面板, Streamlit 也为你提供了 metric 组件。

# -*- coding: utf-8 -*-

import streamlit as st

col1, col2, col3 = st.columns(3)
col1.metric("Temperature", "70 °F", "1.2 °F")
col2.metric("Wind", "9 mph", "-8%")
col3.metric("Humidity", "86%", "4%")
1
2
3
4
5
6
7
8

Streamlit组件-监控

[3] 原生图表组件

Streamlit 原生支持多种图表,例如:

1)st.line_chart:折线图

# -*- coding: utf-8 -*-

import numpy as np
import pandas as pd
import streamlit as st

chart_data = pd.DataFrame(
    np.random.randn(20, 3),
    columns=['a', 'b', 'c'])

st.line_chart(chart_data)
1
2
3
4
5
6
7
8
9
10
11

Streamlit组件-折线图

2)st.map:地图

# -*- coding: utf-8 -*-

import numpy as np
import pandas as pd
import streamlit as st

df = pd.DataFrame(
    np.random.randn(1000, 2) / [50, 50] + [37.76, -122.4],
    columns=['lat', 'lon']
)
st.map(df)
1
2
3
4
5
6
7
8
9
10
11

Streamlit组件-地图

[4] 外部图表组件

Streamlit 的一些原生图表组件,虽然做到了傻瓜式,但仅能输入数据、高度和宽度,如果你想更漂亮的图表,就像 matplotlib.pyplot、Altair、vega-lite、Plotly、Bokeh、PyDeck、Graphviz 那样,Streamlit 也提供了支持:st.pyplot、st.bokeh_chart、st.altair_chart、st.altair_chart、st.vega_lite_chart、st.plotly_chart、st.pydeck_chart、st.graphviz_chart。

# 2.3.3 用户操作支持

平时在网页上、APP 上能看到的交互组件,Streamlit 几乎都能支持。

  • download_button:文件下载、file_uploader:文件上传
  • checkbox:复选框、radio:单选框
  • selectbox:下拉单选框、multiselect:下拉多选框
  • slider:滑动条、select_slider:选择条
  • button:按钮、color_picker:颜色选择器
  • text_input:文本输入框、text_area:文本展示框
  • number_input:数字输入框,支持加减按钮
  • date_input:日期选择框、time_input:时间选择框

Streamlit组件-用户操作支持

# 2.3.4 多媒体组件

想要在页面上播放图片、音频和视频,可以使用 streamlit 的这三个组件:st.image、st.audio、st.video

Streamlit组件-多媒体

# 2.3.5 状态组件

状态组件用来向用户展示当前程序的运行状态,包括:

  • progress:进度条,如游戏加载进度
  • spinner:等待提示
  • balloons:页面底部飘气球,表示祝贺
  • error:显示错误信息
  • warning:显示报警信息
  • info:显示常规信息
  • success:显示成功信息
  • exception:显示异常信息(代码错误栈)

Streamlit组件-状态

# 2.3.6 页面布局

Streamlit 是自上而下渲染的,组件在页面上的排列顺序与代码的执行顺序一致,实际上 Streamlit 还提供了多种多样的布局。

Streamlit组件-布局

# 2.3.7 流程控制系统

Streamlit 是自上而下逐步渲染出来的,若你的应用场景需要对渲染做一些控制,Streamlit 也有提供对应的方法。

  • st.stop:可以让 Streamlit 应用停止而不向下执行,如:验证码通过后,再向下运行展示后续内容。
  • st.form:表单,Streamlit 在某个组件有交互后就会重新执行页面程序,而有时候需要等一组组件都完成交互后再刷新(如:登录填用户名和密码),这时候就需要将这些组件添加到 form 中。
  • st.form_submit_button:在 form 中使用,提交表单。

# 2.3.8 缓存特性提升速度

当用户在页面上做一些操作的时候,比如输入数据,都会触发整个 Streamlit 应用代码的重新执行,如果其中有读取外部数据的步骤(如 GB量级的数据),那这种性能损耗是非常可怕的。

但 Streamlit 提供了一个缓存装饰器,当要重新执行代码渲染页面的时候,就会先去缓存里查一下,如果代码或者数据没有发生变化,就直接调用缓存的结果即可。

Streamlit缓存特性

# 2.4 Streamlit制作页面

以接入前面使用FastAPI封装的大模型服务为例,制作一个简易的大模型问答页,Streamlit代码:

# -*- coding: utf-8 -*-

import streamlit as st
import requests

st.title("💬 Qwen2 LLM Chat")

if "messages" not in st.session_state:
    st.session_state["messages"] = [{"role": "assistant", "content": "有什么可以帮您的?"}]

for msg in st.session_state.messages:
    st.chat_message(msg["role"]).write(msg["content"])


def get_response_from_api(prompt):
    api_url = "http://localhost:6006/"
    headers = {"Content-Type": "application/json"}
    payload = {"prompt": prompt}
    response = requests.post(api_url, json=payload, headers=headers)
    if response.status_code == 200:
        return response.json().get("response", "无响应")
    else:
        return f"Error: {response.status_code}"


if prompt := st.chat_input():
    st.session_state.messages.append({"role": "user", "content": prompt})
    st.chat_message("user").write(prompt)
    response = get_response_from_api(prompt)
    st.session_state.messages.append({"role": "assistant", "content": response})
    st.chat_message("assistant").write(response)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

页面效果:

Streamlit制作的LLM问答页

# 3. Python常用工具函数

# 3.1 系统与文件目录的基本操作

# 3.1.1 基本文件和目录操作

import os
import shutil

os.getcwd()                # 获取当前路径
os.listdir()               # 将当前目录的文件及目录保存成一个列表
os.path.exists(dir_path)   # 检查文件或目录是否存在
os.makedirs(dir_path)      # 创建目录
os.chdir(dir_path)         # 切换目录
os.remove(file_path)       # 删除指定文件
os.removedirs(dir_path)    # 删除空目录
shutil.rmtree(dir_path)    # 递归删除目录(可为空,也可不为空)
os.path.abspath(dir_path)  # 相对路径对应的绝对路径
os.chdir(dir_path)         # 更改路径
os.path.isdir(path)        # 判断路径是否是目录
os.path.isfile(path)       # 判断路径是否是文件
os.system('open {}'.format(path)) # 打开指定目录
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 3.1.2 复制某个文件并重命名

# 复制某个文件并重命名:sample-原始文件路径、new_path-新文件目录、file_name-新文件名称
def copy_rename_file(sample,new_path,file_name):
    if not os.path.exists(new_path):
        os.makedirs(new_path)
    new_file = os.path.join(new_path, file_name)
    shutil.copy(sample, new_file)
    return new_file

# 单纯的只是文件重命名
os.rename(old_file_name, new_file_name) 
1
2
3
4
5
6
7
8
9
10

# 3.1.3 获取文件大小及创建、修改时间

# -*- coding: utf-8 -*-

import time
import os

# 把时间戳转化为时间
def TimeStampToTime(timestamp):
    timeStruct = time.localtime(timestamp)
    return time.strftime('%Y-%m-%d %H:%M:%S', timeStruct)

# 获取文件的大小,结果保留两位小数,单位为KB
def get_FileSize(filePath):
    fsize = os.path.getsize(filePath)
    fsizeFormat = round(fsize / float(1024), 2)
    return fsizeFormat

# 获取文件的创建时间
def get_FileCreateTime(filePath):
    t = os.path.getctime(filePath)
    return TimeStampToTime(t)

# 获取文件的修改时间
def get_FileModifyTime(filePath):
    t = os.path.getmtime(filePath)
    return TimeStampToTime(t)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

# 3.1.4 检查路径是否有中文

# 检查路径是否有中文
zhmodel = re.compile(u'[\u4e00-\u9fa5]')
match = zhmodel.search(path)
if match:
	print("The path cannot contain Chinese!")
1
2
3
4
5

# 3.1.5 读取指定目录下的子目录列表

# 读取指定目录下的所有文件夹保存成列表
def read_dir_to_list(file_dir_path):
    file_dir_list = os.listdir(file_dir_path)
    return file_dir_list
1
2
3
4

# 3.1.6 级联遍历目录获取所有文件路径

import os

# 级联遍历目录,获取目录下的所有文件路径
def find_filepaths(dir):
    result = []
    for root, dirs, files in os.walk(dir):
        for name in files:
            filepath = os.path.join(root, name)
            if os.path.exists(filepath):
                result.append(filepath)
    return result
1
2
3
4
5
6
7
8
9
10
11

# 3.1.7 从文件路径列表筛选出指定后缀文件

从文件路径列表筛选出指定后缀的文件

import fnmatch

# 从文件路径列表中筛选出指定后缀的文件(suffix需要带上通配符,如*.py)
def getSufFilePath(fileList, suffix):
    result = fnmatch.filter(fileList, suffix)
    return result
  
# 从文件路径中筛选出多种指定后缀的文件名(suffixList取值示例:['jpg','jpeg', 'bmp', 'png', 'gif'])
def getSufListFilePath(dirPath, suffixList):
    result = [fn for fn in os.listdir(dirPath)
                  if any(fn.endswith(ext) for ext in suffixList)]
    return result
1
2
3
4
5
6
7
8
9
10
11
12

注:也可使用glob库来实现

import glob

img_path_list = glob.glob('./input/*.jpg')
1
2
3

筛选出扩展名符合条件的文件路径列表

# -*- coding: utf-8 -*-

import os

# 级联遍历⽬录,获取⽬录下的所有⽂件路径
def find_filepaths(dir):
    result = []
    for root, dirs, files in os.walk(dir):
        for name in files:
            filepath = os.path.join(root, name)
            if os.path.exists(filepath):
                result.append(filepath)
    return result


# 筛选出扩展名符合条件的文件路径列表(扩展名不带通配符,extList例如['.jpg', '.png'])
def checkDirOrFilePath(path, extList):

    file_path_list = []
    if os.path.isdir(path):  # 判断路径是否是目录
        file_path_list = find_filepaths(path)
    elif os.path.isfile(path):  # 判断路径是否是文件
        file_path_list.append(path)
    elif path.find(",") != -1:  # 判断路径是否是逗号分隔的多选文件
        file_path_list = path.split(",")
    elif path.find(";") != -1:  # 判断路径是否是分号分隔的多选文件
        file_path_list = path.split(";")

    result_list = []
    for file_path in file_path_list:
        file_dir, file_full_name = os.path.split(file_path)
        file_name, file_ext = os.path.splitext(file_full_name)
        if file_ext in extList:
            result_list.append(file_path)

    return result_list
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

# 3.1.8 递归获取某目录下某后缀的文件路径

程序分为两步,第一步,采用递归的方式获得文件夹下所有文件的路径列表;第二步,从文件路径列表中根据后缀利用.endswith(后缀)的方法筛选指定文件。

import os

# 从指定path下递归获取所有文件
def getAllFile(path, fileList):
    dirList = []    # 存放文件夹路径的列表
    for ff in os.listdir(path):
        wholepath = os.path.join(path, ff)
        if os.path.isdir(wholepath):
            dirList.append(wholepath)   # 如果是文件添加到结果文件列表中
        if os.path.isfile(wholepath):
            fileList.append(wholepath)  # 如果是文件夹,存到文件夹列表中
    for dir in dirList:
        getAllFile(dir, fileList)   # 对于dirList列表中的文件夹,递归提取其中的文件,fileList一直往下传,所有的文件路径都会被保存在这个列表中

# 从文件路径列表中筛选出指定后缀的文件
def getSufFilePath(fileList, suffix):
    for ff in fileList[:]:
        if not ff.endswith(suffix):
                fileList.remove(ff)
                
if __name__ == '__main__':
    flist = []
    findpath = r'./testdir'
    getAllFile(findpath, flist)
    print('allfile:', len(flist))	# filepath下的文件总数
    getSufFilePath(flist, '.txt')

    print('Docfile:', len(flist))	# filepath下的指定类型文件总数(这里是.txt文件的数量)
    for ff in flist:
        print(ff)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

# 3.1.9 根据md5进行文件去重

import os
import hashlib

# 获取md5值
def get_md5(file):
    file = open(file,'rb')
    md5 = hashlib.md5(file.read())
    file.close()
    md5_values = md5.hexdigest()
    return md5_values

if __name__ == '__main__':
    file_path = "./data"
    os.chdir(file_path)
    file_list = os.listdir(file_path)
    md5_list =[]
    for file in file_list:
        md5 = get_md5(file)
        if md5 not in md5_list:
            md5_list.append(md5)
        else:
            os.remove(file)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

# 3.1.10 将文件转成文件流提供下载

# 检验是否含有中文字符
def is_contains_chinese(strs):
    for _char in strs:
        if '\u4e00' <= _char <= '\u9fa5':
            return True
    return False


# 将文件转成文件流提供下载
def download_file(file_path):

    # 文件路径、文件名、后缀分割
    file_dir, file_full_name = os.path.split(file_path)
    file_name, file_ext = os.path.splitext(file_full_name)

    # 文件名如果包含中文则进行编码
    if is_contains_chinese(file_name):
        file_name = urllib.parse.quote(file_name)
    new_file_name = file_name + file_ext

    # 流式读取下载
    def send_file():
        with open(file_path, 'rb') as targetfile:
            while 1:
                data = targetfile.read(20 * 1024 * 1024)   # 每次读取20M
                if not data:
                    break
                yield data
    response = Response(send_file(), content_type='application/octet-stream')
    response.headers["Content-disposition"] = 'attachment; filename=%s' % new_file_name
    return response
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

# 3.1.11 将网络图片转存成base64

import base64
import requests as req
from io import BytesIO

# 将网络图片转存成base64
def urltobase64(url):
    # 发送请求并将图片保存在内存
    try:
        response = req.get(url)
        http_code = response.status_code
        if http_code == 200:
            # 得到图片的base64编码
            ls_f = base64.b64encode(BytesIO(response.content).read())
            imgdata = str(ls_f, 'utf-8')
        else:
            imgdata = ""
    except Exception as e:
        print(e)
        imgdata = ""
    return imgdata
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

# 3.1.12 起始时间及执行时间统计

import time
from decimal import Decimal

start_time = time.time()
start_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

time.sleep(2.22222)

end_time = time.time()
end_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

time_consuming_str = str(Decimal(str((end_time - start_time) * 1000)).quantize(Decimal('0.00'))) + 'ms'

print(start_time_str)
print(end_time_str)
print(time_consuming_str)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

获取指定日期是星期几

import datetime
import calendar

date_week = datetime.date(2022, 2, 22).weekday()
print(calendar.day_name[date_week])
1
2
3
4
5

获取当前日期是星期几

from datetime import date
import calendar

curr_date = date.today()
print(calendar.day_name[curr_date.weekday()])
1
2
3
4
5

# 3.1.13 设置超时操作

import time
import eventlet

eventlet.monkey_patch()
# 设置超时时间为2秒
with eventlet.Timeout(2, False):
   time.sleep(3)
   print('没有跳过这条输出')
   
print('End')
1
2
3
4
5
6
7
8
9
10

# 3.1.14 检查文件编码

import chardet

# 检查文件编码
def check_charset(file_path):
    with open(file_path, "rb") as f:
        data = f.read(4)
        charset = chardet.detect(data)['encoding']
    return charset
1
2
3
4
5
6
7
8

# 3.1.15 文件路径、文件名、后缀分割

import os

file_path = "/root/tmp/test.pdf"
file_dir, file_full_name = os.path.split(file_path)
print(file_dir)         # /root/tmp
print(file_full_name)   # test.pdf
file_name, file_ext = os.path.splitext(file_full_name)
print(file_name)        # test
print(file_ext)         # .pdf
1
2
3
4
5
6
7
8
9

# 3.1.16 引用内部目录代码找不到相对路径

场景描述:外层代码做服务封装,内层代码是核心算法代码,但内层代码是相对路径写的,在外层调用时找不到该文件,将相对路径改写成如下格式即可解决。

current_dir = os.path.dirname(os.path.abspath(__file__))
file_path = os.path.join(current_dir, 'demo_file.txt')
1
2

# 3.1.17 URL解析及获取HTTP请求的状态码

URL解析

from urllib.parse import urlparse

o = urlparse("https://www.imiao.top/avatar.png")

print(o)
print(o.path)

>>> 输出结果
ParseResult(scheme='https', netloc='www.imiao.top', path='/avatar.png', params='', query='', fragment='')
/avatar.png
1
2
3
4
5
6
7
8
9
10

获取HTTP请求的状态码

def get_status(url):
    r = requests.get(url, allow_redirects=False)
    return r.status_code
1
2
3

# 3.1.18 执行系统命令及脚本

方式一:使用 os.system("command")

这是python自带的执行shell命令的方法,返回值为0则表示命令执行成功,但是使用system()无法将执行的结果保存起来。

import os

print(os.system("touch test.txt"))
1
2
3

方式二:使用os.popen("command")

上面的 os.system() 方法无法查看 shell 命令返回的结果,通过 os.popen() 返回的是 file 对象,对其进行 read() 操作可以看到执行的输出。

import os

f = os.popen("ls -l")  # 返回的是一个文件对象
print(f.read())        # 通过文件的read()读取所返回的内容
1
2
3
4

注:对于无返回值的命令,依然可以用该方法。

# 3.1.19 监听系统剪贴板

安装pyperclip模块

$ pip install pyperclip
1

基本使用

pyperclip.copy(text)       # 把text字符串中的字符复制到剪切板
text = pyperclip.paste()   # 把剪切板上的字符串复制到text
1
2

监听剪切板:

# -*- coding: utf-8 -*-

import pyperclip
import time


class monitor():
    def clipboard_get(self):
        """获取剪贴板数据"""
        data = pyperclip.paste()
        return data

    def main(self):
        """后台脚本:每隔0.1秒,读取剪切板文本"""
        recent_txt = self.clipboard_get()  # 存放最近一次剪切板文本
        while True:
            txt = self.clipboard_get()  # txt 存放当前剪切板文本
            if txt != recent_txt:
                recent_txt = txt
                return recent_txt
            # 检测间隔(延迟0.1秒)
            time.sleep(0.1)


if __name__ == '__main__':
    while True:
        t = monitor().main()
        print(t)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

# 3.1.20 ZIP格式压缩与解压文件

# -*- coding: utf-8 -*-

import zipfile
import os
from tqdm import tqdm


def zip_file(src_dir, save_name='default'):
    '''
    压缩文件夹下所有文件及文件夹
    默认压缩文件名:文件夹名
    默认压缩文件路径:文件夹上层目录
    '''
    if save_name == 'default':
        zip_name = src_dir + '.zip'
    else:
        if save_name is None or save_name == '':
            zip_name = src_dir + '.zip'
        else:
            zip_name = save_name + '.zip'

    z = zipfile.ZipFile(zip_name, 'w', zipfile.ZIP_DEFLATED)
    for dirpath, dirnames, filenames in os.walk(src_dir):
        fpath = dirpath.replace(src_dir, '')
        fpath = fpath and fpath + os.sep or ''
        for filename in filenames:
            z.write(os.path.join(dirpath, filename), fpath + filename)
    z.close()
    return True


def zip_file_by_extension(src_dir, extension, save_name='default'):
    '''
    根据文件扩展名筛选需要压缩的文件到压缩包
    注:针对单层目录情况,文件夹下的子文件不会被压缩
    '''
    if save_name == 'default':
        zip_name = src_dir + '.zip'
    else:
        if save_name is None or save_name == '':
            zip_name = src_dir + '.zip'
        else:
            zip_name = save_name + '.zip'

    z = zipfile.ZipFile(zip_name, 'w', zipfile.ZIP_DEFLATED)
    file_list = os.listdir(src_dir)
    for filename in tqdm(file_list):
        if not os.path.isfile(os.path.join(src_dir, filename)):
            continue

        if filename.split('.')[-1] == extension:
            print(r'Compressing file:', filename, end='\n')
            z.write(os.path.join(src_dir, filename), filename)
    z.close()
    return True


def get_zip_file_list(src_dir, extension):
    '''
    根据扩展名获取需要压缩的文件列表
    '''
    file_list = os.listdir(src_dir)
    zip_list = []
    for filename in file_list:
        if not os.path.isfile(os.path.join(src_dir, filename)):
            continue
        if filename.split('.')[-1] == extension:
            zip_list.append(filename)
    print(zip_list)


def unzip_file(zip_src, dst_dir):
    '''
    解压缩
    '''
    r = zipfile.is_zipfile(zip_src)
    if r:
        fz = zipfile.ZipFile(zip_src, 'r')
        for file in fz.namelist():
            fz.extract(file, dst_dir)
    else:
        print('This is not zip')
        return False
    return True
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84

注:它支持解密 ZIP 中的加密文件,但是目前不能创建一个加密的文件,而且解密非常慢,因为它是使用原生 Python 而不是 C 实现的。详见:zipfile官方文档 (opens new window)

# 3.1.21 获取系统信息

可以使用 platform 模块来获取系统信息

import platform


def showinfo(tip, info):
    print("{}:{}".format(tip,info))


if __name__ == '__main__':
    showinfo("操作系统及版本信息", platform.platform())
    showinfo('获取系统版本号', platform.version())
    showinfo('获取系统名称', platform.system())
    showinfo('系统位数', platform.architecture())
    showinfo('计算机类型', platform.machine())
    showinfo('计算机名称', platform.node())
    showinfo('处理器类型', platform.processor())
    showinfo('计算机相关信息', platform.uname())
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

输出结果:

操作系统及版本信息:macOS-13.1-arm64-arm-64bit
获取系统版本号:Darwin Kernel Version 22.2.0: Fri Nov 11 02:03:51 PST 2022; root:xnu-8792.61.2~4/RELEASE_ARM64_T6000
获取系统名称:Darwin
系统位数:('64bit', '')
计算机类型:arm64
计算机名称:MacBookPro2021.local
处理器类型:arm
计算机相关信息:uname_result(system='Darwin', node='MacBookPro2021.local', release='22.2.0', version='Darwin Kernel Version 22.2.0: Fri Nov 11 02:03:51 PST 2022; root:xnu-8792.61.2~4/RELEASE_ARM64_T6000', machine='arm64')
1
2
3
4
5
6
7
8

# 3.1.22 创建当前日期为目录名的子目录

import os
import time

# 在指定目录下创建当前日期为目录名的子目录
def create_date_dir(file_root_path):
    now_str = time.strftime("%Y%m%d", time.localtime())
    file_base_path = file_root_path + '/' + now_str + '/'
    if not os.path.exists(file_root_path):
        os.makedirs(file_root_path)
    if not os.path.exists(file_base_path):
        os.makedirs(file_base_path)
    return file_base_path
1
2
3
4
5
6
7
8
9
10
11
12

# 3.1.23 保持结构复制文件及子目录至新目录

import os
import shutil


def copy_dir_tree(src_dir, dest_dir):
    """
    复制源目录中的所有文件和子目录到目标目录中,
    保留原有的目录结构,只复制那些最近修改过的文件。
    """
    for root, dirs, files in os.walk(src_dir):
        # 获取当前目录相对于源目录的相对路径
        rel_path = os.path.relpath(root, src_dir)
        # 创建目标目录中的相应子目录
        dest_root = os.path.join(dest_dir, rel_path)
        os.makedirs(dest_root, exist_ok=True)
        # 复制该目录下最近修改过的文件到目标目录中
        for name in files:
            src_file = os.path.join(root, name)
            dest_file = os.path.join(dest_root, name)
            if os.path.exists(dest_file):
                # 如果目标文件已经存在,只复制那些最近修改过的文件
                src_mtime = os.stat(src_file).st_mtime
                dest_mtime = os.stat(dest_file).st_mtime
                if src_mtime <= dest_mtime:
                    continue
            shutil.copy2(src_file, dest_file)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

# 3.1.24 获取git.log的代码提交信息

# -*- coding: utf-8 -*-

from git.repo import Repo
import datetime


# 拉取git.log信息
def get_git_log(start_date, code_path):
    repo = Repo(path=code_path)
    # 执行git命令,参数用逗号隔开
    repo = str(
        repo.git.log("--since='%s'" % str(start_date) + " 00:00:00", "--until='%s'" % str(start_date) + " 23:59:59",
                     "--no-merges", "--branches", "--shortstat", "--pretty=startLog%an-%cd"))
    # 去掉换行符
    repo = repo.replace('\n', '')
    # 由于git.log命名中,故意增加了startLog字符,用于分割提交记录
    repo = repo.split("startLog")
    # 将得到的list去除空字符串
    repo = [x.strip() for x in repo if x.strip() != '']
    return repo


if __name__ == "__main__":
    code_path = '/root/your_project'
    begin = datetime.date(2022, 1, 1)
    end = datetime.date(2022, 12, 31)
    # 循环begin和end期间的每一天
    for d in range((end - begin).days + 1):
        day = begin + datetime.timedelta(d)
        a = get_git_log(day, code_path)
        print(a)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

输出结果的格式如下:

['yoyo-Thu Jan 26 20:49:50 2022 +0800 406 files changed, 3279 insertions(+), 3279 deletions(-)', 'yoyo-Thu Jan 26 19:50:02 2022 +0800 404 files changed, 3223 insertions(+), 3223 deletions(-)', 'yoyo-Thu Jan 26 11:14:30 2022 +0800 410 files changed, 4641 insertions(+), 4638 deletions(-)']
['yoyo-Fri Jan 27 19:24:06 2022 +0800 405 files changed, 4697 insertions(+), 4697 deletions(-)', 'yoyo-Fri Jan 27 10:19:28 2022 +0800 406 files changed, 5398 insertions(+), 5392 deletions(-)']
1
2

# 3.1.25 过滤html代码提取内容文本

import re

def del_html_tag(html_str):
    # 定义script的正则表达式
    reg_ex_script = re.compile(r"<script[^>]*?>[\s\S]*?<\/script>", re.IGNORECASE)
    # 定义style的正则表达式
    reg_ex_style = re.compile(r"<style[^>]*?>[\s\S]*?<\/style>", re.IGNORECASE)
    # 定义HTML标签的正则表达式
    reg_ex_html = re.compile(r"<[^>]+>", re.IGNORECASE)
    # 定义HTML标签的正则表达式
    reg_ex_html2 = re.compile(r"\{[^\}]+\}", re.IGNORECASE)
    # 过滤script标签
    html_str = re.sub(reg_ex_script, '', html_str)
    # 过滤style标签
    html_str = re.sub(reg_ex_style, '', html_str)
    # 过滤html标签
    html_str = re.sub(reg_ex_html, '', html_str)
    # 过滤html标签
    html_str = re.sub(reg_ex_html2, '', html_str)
    # 返回文本字符串
    return html_str.strip()

# 使用示例
html_content = """<html><head><style type="text/css">body {font-size:12px;}</style></head>
<body><p>Hello, World!</p><script type="text/javascript">alert('Hello, World!');</script></body></html>"""

text_content = del_html_tag(html_content)
print(text_content)  # 输出: Hello, World!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

# 3.2 解析Excel和CSV文件

需要安装的依赖库

$ pip install xlrd==1.2.0
$ pip install xlwt 
$ pip install pandas
1
2
3

注意事项:

[1] 新版 xlrd 报 Excel xlsx file; not supported错误(原因:xlrd更新到了2.0.1版本,只支持.xls文件,不支持.xlsx)

[2] Python3.9使用xlrd时报错:AttributeError: 'ElementTree' object has no attribute 'getiterator'

找到xlrd依赖源码里的 xlsx.py 文件,将两个地方的 getiterator() 修改成 iter()。

# 3.2.1 Excel与CSV转字典列表

# Excel转字典列表
def excel_to_dict(path):

    # 判断是否为文件路径
    if os.path.exists(path):
        workbook = xlrd.open_workbook(path)
    else:
        workbook = xlrd.open_workbook(filename=path.name, file_contents=path.read())

    # 根据sheet索引或者名称获取sheet内容
    data_sheet = workbook.sheets()[0]
    # 获取sheet名称,行数,列数据
    sheet_nrows = data_sheet.nrows
    sheet_ncols = data_sheet.ncols

    # excel转dict
    get_data = []
    for i in range(1, sheet_nrows):
        # 定义一个空字典
        sheet_data = {}
        for j in range(sheet_ncols):
            # 获取单元格数据
            c_cell = data_sheet.cell_value(i, j)
            # 循环每一个有效的单元格,将字段与值对应存储到字典中
            sheet_data[data_sheet.row_values(0)[j]] = c_cell
        # 再将字典追加到列表中
        get_data.append(sheet_data)
    # 返回从excel中获取到的数据:以列表存字典的形式返回
    return get_data


# CSV转字典列表
def csv_to_dict(path):
    get_data = []
    with open(path, 'r',encoding="GBK") as f:
        reader = csv.reader(f)
        fieldnames = next(reader)
        csv_reader = csv.DictReader(f, fieldnames=fieldnames)
        for row in csv_reader:
            d = {}
            for k, v in row.items():
                d[k] = v
            get_data.append(d)
    return get_data
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

# 3.2.2 字典列表转Excel与CSV

# 将字典列表导出xls
def export_xls(path, dic_data):
    data_list = []
    # 循环得到每一个data
    for data in dic_data:
        # 循环得到data字典里的所有键值对的值
        for value in data.values():
            # 将得到的值放入空列表中
            data_list.append(value)
    # 创建一个新的列表生成式并赋给一个变量new_list.
    # 这个列表生成式主要是将数据每3个为一个新的元素存入新的列表中,即列表套列表
    new_list = [data_list[i:i + 3] for i in range(0, len(data_list), 3)]
    # 生成一个xlwt.Workbook对象
    xls = xlwt.Workbook()
    # 调用对象的add_sheet方法
    sheet = xls.add_sheet('Sheet1', cell_overwrite_ok=True)
    # 创建我们需要的第一行的标头数据
    heads = ['id', 'message', 'result']
    ls = 0
    # 将标头循环写入表中
    for head in heads:
        sheet.write(0, ls, head)
        ls += 1
    i = 1
    # 将数据分两次循环写入表中 外围循环行
    for list in new_list:
        j = 0
        # 内围循环列
        for data in list:
            sheet.write(i, j, data)
            j += 1
        i += 1
    # 最后将文件save保存
    xls.save(path)


# 将字典列表导出xlsx
def export_xlsx(path, dic_data):
    # 将字典列表转换为DataFrame
    pf = pd.DataFrame(list(dic_data))
    # 指定字段顺序
    order = ['id', 'message', 'result']
    pf = pf[order]
    # 指定生成的Excel表格名称
    file_path=pd.ExcelWriter(path)
    # 替换空单元格
    pf.fillna(' ', inplace=True)
    # 输出
    pf.to_excel(file_path, encoding='utf-8', index=False)
    # 保存表格
    file_path.save()


# 将字典列表导出csv
def export_csv(path, dic_data):
    with open(path, 'w', newline='') as f:
        fieldnames = ['id', 'message', 'result']
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        for item in dic_data:
            writer.writerow(item)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61

# 3.2.3 其他操作Excel和CSV的示例

[1] 读写操作xlsx示例

# -*- coding: utf-8 -*-

import openpyxl
import xlrd

# 将数据写入到xlsx文件里(新创建文件)
def write_xlsx(path, sheetname, value):
    index = len(value)
    workbook = openpyxl.Workbook()
    sheet = workbook.active
    sheet.title = sheetname
    for i in range(0, index):
        for j in range(0, len(value[i])):
            sheet.cell(row=i+1, column=j+1, value=str(value[i][j]))
    workbook.save(path)

# 将数据追加写入到xlsx文件里(已有文件追加写入)
def append_write_xlsx(path, sheetname, value):
    workbook = openpyxl.load_workbook(path)
    sheet = workbook[sheetname]
    sheet.append(value)
    workbook.save(path)

# 读取xlsx文件信息
def read_xlsx(path, sheetname):
    wb = xlrd.open_workbook(path)
    sh = wb.sheet_by_name(sheetname)
    result = {}
    for i in range(1, sh.nrows):
        result[sh.row_values(i)[0]] = sh.row_values(i)[1]
    return result

if __name__ == '__main__':
    
    path = './test.xlsx'
    sheetname = '测试'
    head_value = [['id', 'name']]
    body_value = ['001', 'zhangsan']

    write_xlsx(path, sheetname, head_value)
    append_write_xlsx(path, sheetname, body_value)
    result = read_xlsx(path, sheetname)
    print(result)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

[2] 新建csv文件并写入数据

import csv
def create_csv():
    csv_path = "./test.csv"
    with open(csv_path,'w', newline='', encoding='GBK') as f:
        csv_write = csv.writer(f)
        csv_head = ["good","bad"]
        csv_write.writerow(csv_head)
1
2
3
4
5
6
7

注:newline=''是为了解决csv的隔行空行问题。选择GBK编码,否则使用Excel打开会出现乱码问题。

[3] 操作csv文件实现对特定列排序

def sort_csv(csv_path):
    datas = []  # 用于存放排序过的数据
    with open(csv_path, 'r', encoding='GBK') as f:
        table = []
        index = 0
        for line in f:
            index = index + 1
            if index == 1:
                continue
            col = line.split(',')
            col[1] = int(col[1].strip("\n"))
            table.append(col)
        table_sorted = sorted(table, key=itemgetter(1), reverse=True)  # 精确的按照第2列排序
        for row in table_sorted:
            datas.append(row)
    f.close()
    with open(csv_path, "w", newline='', encoding='GBK') as csvfile:
        writer = csv.writer(csvfile)
        csv_head = ["关键词", "词频"]
        writer.writerow(csv_head)
        for data in datas:
            writer.writerow(data)
    csvfile.close()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

# 3.3 读写配置文件

# 3.3.1 读写JSON文件里的配置信息

配置文件config.json:

 {
     "DB_URL": "127.0.0.1:1521/orcl",
     "DB_USER": "test",
     "DB_PASSWORD": "123456"
 }
1
2
3
4
5

工具函数:

import json

# 将字典写入json
def dict_to_json_file(dict, path):
    with open(path, "w", encoding="utf-8") as f:
        json.dump(dict, f)

# 读取json为字典
def read_json_to_dict(path):
    with open(path, "r", encoding="utf-8") as f:
        confstr = f.read()
        conf = json.loads(confstr)
    return conf
1
2
3
4
5
6
7
8
9
10
11
12
13

调用示例:

conf_path = './config/config.json'
conf = read_json_to_dict(conf_path)
conn = cx_Oracle.connect(conf['DB_USER'], conf['DB_PASSWORD'], conf['DB_URL'])
1
2
3

注意事项:

1)JSON文件不要同时进行读写,写入时可能会出现无法解析导致读取失败的情况。

2)可以使用如下方式格式化 JSON 输出。

import json
print(json.dumps(json_data, sort_keys=True, indent=4, ensure_ascii=False))
1
2

# 3.3.2 读写INI文件里的配置信息

配置文件config.ini:

[SOURCE_ES]
host = 111.111.111.111
port = 9200
user = elastic
password = elastic
timeout = 60
1
2
3
4
5
6

读取Section内容:

from configparser import ConfigParser

def read_config():
    cfg = ConfigParser()
    cfg.read('./config.ini', encoding='utf-8')
    host = cfg.get('TARGET_ES', 'host')
    port = cfg.get('TARGET_ES', 'port')
    user = cfg.get('TARGET_ES', 'user')
    password = cfg.get('TARGET_ES', 'password')
    timeout = cfg.get('TARGET_ES', 'timeout')
    es_dict = {}
    es_dict['host'] = host
    es_dict['port'] = port
    es_dict['user'] = user
    es_dict['password'] = password
    es_dict['timeout'] = timeout
    return es_dict
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

批量读取ini文件里的所有配置项(建议用这个来读取):

# -*- coding: utf-8 -*-

from configparser import ConfigParser


def read_config(config_path):
    cfg = ConfigParser()
    cfg.read(config_path, encoding='utf-8')
    section_list = cfg.sections()
    config_dict = {}
    for section in section_list:
        section_item = cfg.items(section)
        for item in section_item:
            config_dict[item[0]] = item[1]
    return config_dict


if __name__ == '__main__':
    config_path = "./config.ini"
    print(read_config(config_path))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

修改Section内容:

    cfg = ConfigParser()
    cfg.read('./config.ini', encoding='utf-8')
    cfg.set("SOURCE_ES", "timeout", "3600")
    cfg.write(open('./config.ini', "r+", encoding='utf-8'))
1
2
3
4

新增Section内容:

    cfg = ConfigParser()
    cfg.add_section("TARGET_ES")
    cfg.set("TARGET_ES", "host", "222.222.222.222")
    cfg.set("TARGET_ES", "port", "9201")
    # cfg.write(open('./config.ini', "w"))  # 删除原文件重新写入
    cfg.write(open('./config.ini', "a"))  # 追加模式写入
1
2
3
4
5
6

删除Section内容:

    cfg = ConfigParser()
    cfg.read('./config.ini', encoding='utf-8')
    # cfg.remove_option('TARGET_ES', "host")  # 删除Section下的某项
    cfg.remove_section('TARGET_ES')           # 删除整个Section模块
    cfg.write(open('./config.ini', "w"))
1
2
3
4
5

# 3.3.3 读写txt文件

# -*- coding: utf-8 -*-

# 按行追加写入txt文件(没有文件会新创建文件)
def write_content_to_txt(txt_path, content):
    a = open(txt_path, 'a')
    a.write(content + '\n')
    a.close()

# 按行读取txt文件的内容,保存成列表
def read_txt_to_list(txt_path):
    result = []
    with open(txt_path, 'r') as f:
        for line in f:
            result.append(line.strip('\n'))
    return result

if __name__ == '__main__':
    txt_path = './test.txt'
    write_content_to_txt(txt_path, 'zhangsan')
    write_content_to_txt(txt_path, 'lisi')
    result = read_txt_to_list(txt_path)
    print(result)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

注:还有个列表推导式的写法

with open('test.txt', 'r',encoding='utf8') as f:
	for i in f:
		data.append([j for j in i.split()])

>>> 输出结果
[['field_1','field_2'],['1','2'],['11','22']]
1
2
3
4
5
6

# 3.3.4 生成xml文件

generate_xml.py

# -*- coding: utf-8 -*-

from xml.etree.ElementTree import Element
from xml.etree.ElementTree import SubElement
from xml.etree.ElementTree import ElementTree

# 美化xml:elemnt为传进来的Elment类,参数indent用于缩进,newline用于换行
def pretty_xml(element, indent, newline, level=0):
    # 判断element是否有子元素
    if element:
        # 如果element的text没有内容
        if element.text == None or element.text.isspace():
            element.text = newline + indent * (level + 1)
        else:
            element.text = newline + indent * (level + 1) + element.text.strip() + newline + indent * (level + 1)
    temp = list(element)  # 将elemnt转成list
    for subelement in temp:
        # 如果不是list的最后一个元素,说明下一个行是同级别元素的起始,缩进应一致
        if temp.index(subelement) < (len(temp) - 1):
            subelement.tail = newline + indent * (level + 1)
        else:  # 如果是list的最后一个元素, 说明下一行是母元素的结束,缩进应该少一个
            subelement.tail = newline + indent * level
        # 对子元素进行递归操作
        pretty_xml(subelement, indent, newline, level=level + 1)


if __name__ == '__main__':
    
    # generate root node
    root = Element('root')
    # generate first child-node head
    head = SubElement(root, 'head')
    # child-node of head node
    title = SubElement(head, 'title')
    title.text = "Title"
    # generate second child-node body
    body = SubElement(root, 'body')
    body.text = "Content"
    tree = ElementTree(root)

    root = tree.getroot()  # 得到根元素,Element类
    pretty_xml(root, '\t', '\n')  # 执行美化方法

    # write out xml data
    tree.write('result.xml', encoding = 'utf-8')
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45

生成效果:

<root>
	<head>
		<title>Title</title>
	</head>
	<body>Content</body>
</root>
1
2
3
4
5
6

# 3.3.5 解析yaml格式文件

将yaml文件转字典

import yaml

f = open('./config.yaml', 'r')
yaml_str = f.read()
config_dict = yaml.load(yaml_str, Loader=yaml.FullLoader)
1
2
3
4
5

将字典转成对象

class Dict(dict):
    __setattr__ = dict.__setitem__
    __getattr__ = dict.__getitem__

def dict2obj(dictObj):
    if not isinstance(dictObj, dict):
        return dictObj
    d = Dict()
    for k, v in dictObj.items():
        d[k] = dict2obj(v)
    return d
    
# 测试数据    
params = {
    "name": "login",
    "params": {
        "transactionId": "cc258bdb3dd4d6bba2",
        "platformType": "第三方平台",
        "uid": 9
    }
}

# 转换字典成为对象,可以用"."方式访问对象属性
res = dict2obj(params)
print(res.name)
print(res.params.uid)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

# 3.4 列表数组字符串的处理

# 3.4.1 列表元素去重及统计出现次数

两个列表求差集并去重

# 两个列表求差集,在B中但不在A中(注:重复元素也会被去除)
def list_diff(listA, listB):
    result = list(set(listB).difference(set(listA)))
    return result
1
2
3
4

列表元素直接去重

# 列表元素直接去重
old_list = [2, 1, 3, 4, 1]
new_list = list(set(old_list))
print(new_list)

>>> [1,2,3,4]
1
2
3
4
5
6

统计列表中各个元素出现的次数

from collections import Counter

# 统计列表中各个元素出现的次数
test_list = [1, 2, 3, 1, 1, 2]
result = Counter(test_list)
print(result)

>>>{1: 3, 2: 2, 3: 1}
1
2
3
4
5
6
7
8

# 3.4.2 逗号分隔的字符串与列表互转

逗号分隔字符串转列表

>>> mStr = '192.168.1.1,192.168.1.2,192.168.1.3'
>>> mStr.split(",")
['192.168.1.1', '192.168.1.2', '192.168.1.3']
1
2
3

列表转逗号分隔字符串

result = ",".join(str(i) for i in result_list)
1

# 3.4.3 比较数组是否完全相等

import numpy as np

a = np.array([1,2,3])
b = np.array([1,2,3])
print((a==b).all())

>>> True
1
2
3
4
5
6
7

# 3.4.4 实现replaceAll功能

# 实现replaceAll的功能
def replaceAll(input, toReplace, replaceWith):
    while (input.find(toReplace) > -1):
        input = input.replace(toReplace, replaceWith)
    return input
1
2
3
4
5

处理空白字符:

# 处理空白字符
text = replaceAll(replaceAll(replaceAll(replaceAll(text, '\r', ' '), '\n', ' '), '\u3000', ' '), '\x01', ' ')
1
2

# 3.4.5 将List拆分成多个指定长度的小List

#  将List拆分成若干个指定长度的小List
def list_of_groups(list, length):
    return [list[i:i + length] for i in range(0, len(list), length)]
    
list = [i for i in range(15)]
length = 2
result = list_of_groups(list, length)
print(result)

>>> [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9], [10, 11], [12, 13], [14]]
1
2
3
4
5
6
7
8
9
10

# 3.4.6 按指定长度分段切割字符串或列表

# 按指定长度分段切割字符串或列表
def cut(obj, sec):
    return [obj[i:i+sec] for i in range(0,len(obj),sec)]
1
2
3

# 3.4.7 字符串四舍五入保留两位小数

from decimal import Decimal

# 字符串四舍五入保留两位小数
def str_get_two_decimal(str):
    return Decimal(str).quantize(Decimal('0.00'))
1
2
3
4
5

# 3.4.8 将两个相同长度的List转字典

keys = ['a', 'b', 'c']
values = [1, 2, 3]
dictionary = dict(zip(keys, values))
print(dictionary)

>>> {'a': 1, 'c': 3, 'b': 2}
1
2
3
4
5
6

# 3.4.9 检查字符串里的中文字符

# 检验是否全是中文字符
def is_all_chinese(strs):
    for _char in strs:
        if not '\u4e00' <= _char <= '\u9fa5':
            return False
    return True

# 检验是否含有中文字符
def is_contains_chinese(strs):
    for _char in strs:
        if '\u4e00' <= _char <= '\u9fa5':
            return True
    return False
1
2
3
4
5
6
7
8
9
10
11
12
13

# 3.4.10 浏览器URL编码以及反编码

import urllib.parse

test_str = '测试 文本'
# 首先模仿浏览器生产的编码格式,不管是中文或者空格都会转码
str_encode = urllib.parse.quote(test_str)
print(str_encode)
# 使用 unquote 进行反编码,这个步骤基本都是服务端接受浏览器传递的数据时候处理
str_decode = urllib.parse.unquote(str_encode)
print(str_decode)
1
2
3
4
5
6
7
8
9

# 3.4.11 去除列表的最后一个元素

pop方法和del方法如果对空列表进行操作,会报错中断执行,切片方法不会因此报错,继续保持空列表向下运行

1)pop方法

list = [1,2,3,4]
list.pop()
print(list)

>>> [1, 2, 3]
1
2
3
4
5

2)del方法

list = [1,2,3,4]
del(list[-1])
print(list)

>>> [1, 2, 3]
1
2
3
4
5

3)切片

list = [1,2,3,4]
list = list[0:-1]
print(list)

>>> [1, 2, 3]
1
2
3
4
5

# 3.4.12 查找字符串里所有子串位置

# 查找所有子串位置
def find_all(sub_str, str):
    index_list = []
    if str is not None and str != "" and sub_str is not None and sub_str != "":
        index = str.find(sub_str)
        while index != -1:
            index_list.append(index)
            index = str.find(sub_str, index + 1)
    return index_list
1
2
3
4
5
6
7
8
9

# 3.4.13 对列表里字典根据key进行排序

# 对列表的字典根据某一个key进行排序
def sort_list_dict_by_key(input_list, key):
    sorted_list = sorted(input_list, key=lambda dict: int(dict[key]))
    sorted_list.reverse()
    return sorted_list


if __name__ == '__main__':
    input_list = [
        {'follower_num': '111', 'user_name': 'zhangsan'},
        {'follower_num': '1', 'user_name': 'lisi'},
        {'follower_num': '11', 'user_name': 'wanger'}
    ]
    sorted_list = sort_list_dict_by_key(input_list, 'follower_num')
    for i in sorted_list:
        print(i)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 3.4.14 从列表中随机取一个值

import random

list = ['佛山', '南宁', '北海', '杭州', '南昌', '厦门', '温州']
a = random.choice(list)
print(a)
1
2
3
4
5

# 3.4.15 复制列表

# -*- coding: utf-8 -*-

list1 = ['Google', 'Taobao', 'Baidu']
list2 = list1.copy()
print(list1)
print(list2)
list1.append('Github')
print(list1)
print(list2)

输出结果
['Google', 'Taobao', 'Baidu']
['Google', 'Taobao', 'Baidu']
['Google', 'Taobao', 'Baidu', 'Github']
['Google', 'Taobao', 'Baidu']
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

注:复制列表不能list2=list1,这样是个引用地址,list1发生改变时,list2也会随之更改。

# 3.5 字符编码问题

# 3.5.1 base64编码出现b的问题

去除的方法:[1] decode为utf-8编码、[2] str转化为utf-8编码

# -*- coding: utf-8 -*-

import base64

# 生成测试数据
before_base64 = 'abc'.encode()
after_base64 = base64.b64encode(before_base64)
print(after_base64)

# 方式一:decode为utf-8编码
method_one_base64 = after_base64.decode('utf-8')
print(method_one_base64)

# 方式二:str转化为utf-8编码
method_two_base64 = str(after_base64, 'utf-8')
print(method_two_base64)

>>> b'YWJj'
>>> YWJj
>>> YWJj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

# 3.5.2 字典中文转码问题

尝试过.encode().decode('unicode_escape')# -*- coding: utf-8 -*-str()等方式仍然不行,最后将字典改成json格式,使用.get("key")方式取值解决了问题。

# -*- coding: utf-8 -*-

import json

d = {'s': '测试', 'd': u'\u4ea4\u6362\u673a'}
d1 = json.dumps(d1)
print(d1)
d2 = json.loads(j)
print(d2)

>>> {"s": "\u6d4b\u8bd5", "d": "\u4ea4\u6362\u673a"}
>>> {'s': '测试', 'd': '交换机'}
1
2
3
4
5
6
7
8
9
10
11
12

# 3.5.3 系统字符编码问题

情景描述:在本地可以运行,但程序放到某个docker容器内,执行就出现字符编码问题。

报错信息:UnicodeEncodeError: 'ascii' codec can't encode characters in position 21-27: ordinal not in range(128)

解决方案:执行程序前先执行一下如下命令即可

$ export LC_ALL="en_US.utf8"
1

# 3.5.4 unicode编码转换

# 中文转unicode
s1 = u"你好"
print(s1.encode("unicode_escape"))

# unicode转中文
s2 = r'\u4f60\u597d'
print(s2.encode().decode("unicode_escape"))
1
2
3
4
5
6
7

可以写一个简单的脚本,用于将文件中的unicode转中文

# -*- coding: utf-8 -*-

import os


# 按行读取file文件的内容,保存成列表
def read_file_to_list(file_path):
    result = []
    with open(file_path, 'r') as f:
        for line in f:
            result.append(line.strip('\n'))
    return result


# 按行追加写入file文件(没有文件会新创建文件)
def write_content_to_file(file_path, content):
    a = open(file_path, 'a')
    a.write(content + '\n')
    a.close()


# 将文件里的unicode编码转换成中文
def unicode2chinese(file_path):
    file_dir, file_full_name = os.path.split(file_path)
    out_file_path = file_dir + "/convert_" + file_full_name
    content_list = read_file_to_list(file_path)
    for content in content_list:
        write_content_to_file(out_file_path,content.encode().decode("unicode_escape"))


if __name__ == '__main__':
    file_path = '/root/main.py'
    unicode2chinese(file_path)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

# 3.6 加密解密算法

依赖安装:

$ pip install pycryptodome
1

存在的坑:在使用的时候导入模块是有问题的,这时只要修改一下文件夹名称就可以解决这个问题,找到这个路径安装位置\Lib\site-packages,下面有一个文件夹叫做crypto,将小写c改成大写C就可以了。

# 3.6.1 RSA加密解密

RSA加密算法是一种非对称加密算法,所谓非对称,就是指该算法加密和解密使用不同的密钥,即使用加密密钥进行加密、解密密钥进行解密。在RSA算法中,加密密钥PK是公开信息,而解密密钥SK是需要保密的。加密算法E和解密算法D也都是公开的。虽然解密密钥SK是由公开密钥PK决定的,由于无法计算出大数n的欧拉函数phi(N),所以不能根据PK计算出SK。

# -*- coding: utf-8 -*-

from Crypto.Cipher import PKCS1_OAEP, AES
from Crypto.PublicKey import RSA
from Crypto.Random import get_random_bytes


def create_rsa_keys(code):
    """
    生成RSA私钥和公钥
    :param code: 密码
    :return:
    """
    # 生成 2048 位的 RSA 密钥
    key = RSA.generate(2048)
    encrypted_key = key.exportKey(passphrase=code, pkcs=8, protection="scryptAndAES128-CBC")
    # 生成私钥
    with open('private_rsa_key.bin', 'wb') as f:
        f.write(encrypted_key)
    # 生成公钥
    with open('rsa_public.pem', 'wb') as f:
        f.write(key.publickey().exportKey())


def file_encryption(file_name, public_key):
    """
    文件加密
    :param file_name: 文件路径名
    :param public_key: 公钥
    :return:
    """
    # 二进制只读打开文件,读取文件数据
    with open(file_name, 'rb') as f:
        data = f.read()
    file_name_new = file_name + '.rsa'
    with open(file_name_new, 'wb') as out_file:
        # 收件人秘钥 - 公钥
        recipient_key = RSA.import_key(open(public_key).read())
        # 一个 16 字节的会话密钥
        session_key = get_random_bytes(16)
        # Encrypt the session key with the public RSA key
        cipher_rsa = PKCS1_OAEP.new(recipient_key)
        out_file.write(cipher_rsa.encrypt(session_key))
        # Encrypt the data with the AES session key
        cipher_aes = AES.new(session_key, AES.MODE_EAX)
        cipher_text, tag = cipher_aes.encrypt_and_digest(data)
        out_file.write(cipher_aes.nonce)
        out_file.write(tag)
        out_file.write(cipher_text)
    return file_name_new


def file_decryption(file_name, code, private_key):
    """
    文件解密
    :param file_name: 文件路径名
    :param code: 密码
    :param private_key: 私钥
    :return:
    """
    with open(file_name, 'rb') as f_in:
        # 导入私钥
        private_key = RSA.import_key(open(private_key).read(), passphrase=code)
        # 会话密钥, 随机数, 消息认证码, 机密的数据
        enc_session_key, nonce, tag, cipher_text = [f_in.read(x) for x in (private_key.size_in_bytes(), 16, 16, -1)]
        cipher_rsa = PKCS1_OAEP.new(private_key)
        session_key = cipher_rsa.decrypt(enc_session_key)
        cipher_aes = AES.new(session_key, AES.MODE_EAX, nonce)
        # 解密
        data = cipher_aes.decrypt_and_verify(cipher_text, tag)
    # 文件重命名
    out_file_name = file_name.replace('.rsa', '')
    with open(out_file_name, 'wb') as f_out:
        f_out.write(data)
    return out_file_name


if __name__ == '__main__':
    create_rsa_keys("test_rsa_key")
    file_encryption("test.txt", "rsa_public.pem")
    file_decryption("test.txt.rsa", "test_rsa_key", "private_rsa_key.bin")
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81

# 3.6.2 AES加密解密

AES加密为最常见的对称加密算法(微信小程序的加密传输就是用的这个加密算法)。对称加密算法也就是加密和解密用相同的密钥,具体的加密流程如下图:

AES加密传输流程

# -*- coding: utf-8 -*-

import base64
from Crypto.Cipher import AES

'''
采用AES对称加密算法
'''

# str不是16的倍数那就补足为16的倍数
def add_to_16(value):
    while len(value) % 16 != 0:
        value += '\0'
    return str.encode(value)  

# 加密方法
def encrypt_file(key, input_file_path, encoding, output_file_path):
    # 一次性读取文本内容
    with open(input_file_path, 'r', encoding=encoding) as f:
        # print(text) 测试打印读取的数据
        # 待加密文本
        mystr = f.read()
    text = base64.b64encode(mystr.encode('utf-8')).decode('ascii')
    # 初始化加密器
    aes = AES.new(add_to_16(key), AES.MODE_ECB)
    # 先进行aes加密
    encrypt_aes = aes.encrypt(add_to_16(text))
    # 用base64转成字符串形式
    encrypted_text = str(base64.encodebytes(encrypt_aes), encoding='utf-8')  # 执行加密并转码返回bytes
    # print(encrypted_text) 测试打印加密数据
    # 写入加密数据到文件
    with open(output_file_path, "w") as bankdata:
        bankdata.write(encrypted_text)

# 解密方法
def decrypt_file(key, file_path, encoding):
    # 密文
    with open(file_path, 'r', encoding=encoding) as f:
        # print(text) 测试打印读取的加密数据
        # 待解密文本
        text = f.read()
    # 初始化加密器
    aes = AES.new(add_to_16(key), AES.MODE_ECB)
    # 优先逆向解密base64成bytes
    base64_decrypted = base64.decodebytes(text.encode(encoding='utf-8'))
    # bytes解密
    decrypted_text = str(aes.decrypt(base64_decrypted),encoding='utf-8')
    decrypted_text = base64.b64decode(decrypted_text.encode('utf-8')).decode('utf-8')
    print(decrypted_text)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49

# 4. Python文件处理及实用功能

# 4.1 根据IP或域名获取地理位置信息

# 4.1.1 获取本机IP地址

def get_host_ip():
    """
    查询本机ip地址
    :return: ip
    """
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.connect(('8.8.8.8', 80))
        ip = s.getsockname()[0]
    finally:
        s.close()
    return ip
1
2
3
4
5
6
7
8
9
10
11
12

# 4.1.2 从日志中正则提取IP地址

# -*- coding: utf-8 -*-

import re


# 查找匹配正则表达式的字符串
def find_by_regex(string, regex):
    match_list = re.findall(regex, string)
    return match_list


# 按行读取文件的内容,保存成列表
def read_file_to_list(file_path):
    result = []
    with open(file_path, 'r', encoding='gbk') as f:
        for line in f:
            result.append(line.strip('\n'))
    return result


# 按行追加写入txt文件(没有文件会新创建文件)
def write_content_to_txt(txt_path, content):
    a = open(txt_path, 'a')
    a.write(content + '\n')
    a.close()


if __name__ == '__main__':
    ipv4_regex = "(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)"
    content_list = read_file_to_list("./test.log")
    # 遍历日志文件,正则匹配IP地址进行提取
    ip_match_list = []
    for content in content_list:
        match_list = find_by_regex(content, ipv4_regex)
        if match_list != None and len(match_list) > 0:
            for match_str in match_list:
                if match_str not in ip_match_list:
                    ip_match_list.append(match_str)
                    print(match_str)
    # 将结果写入到文件中
    output_path = './ipv4_address.txt'
    for ip_match in ip_match_list:
        write_content_to_txt(output_path, ip_match)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

# 4.1.3 获取地理位置信息

可以借助GeoIP2-python和GeoLite.mmdb两个开源项目来获取。

GeoIP2-python:https://github.com/maxmind/GeoIP2-python (opens new window)(GeoIP2 web 服务客户端和数据库阅读器的 Python 代码)

GeoLite.mmdb:https://github.com/P3TERX/GeoLite.mmdb (opens new window)(MaxMind 的 GeoIP2 GeoLite2 国家、城市和 ASN 数据库)

依赖安装:

$ pip install geoip2
$ 把GeoLite2-City.mmdb下载下来,放到项目目录里
1
2

示例代码:

# -*- coding: utf-8 -*-

import socket
import geoip2.database

reader = geoip2.database.Reader('GeoLite2-City.mmdb')


# 通过域名获取IP(输入为IP的话保持不变)
def get_ip_by_domain(domain):
    address = socket.getaddrinfo(domain, None)
    return address[0][4][0]


# 查询IP地址对应的地理信息
def ip_get_location(ip):
    # 载入指定IP相关数据
    response = reader.city(ip)
    # 读取国家代码
    country_iso_code = str(response.country.iso_code)
    # 读取国家名称
    country_name = str(response.country.name)
    # 读取国家名称(中文显示)
    country_name_cn = str(response.country.names['zh-CN'])
    # 读取州(国外)/省(国内)名称
    country_specific_name = str(response.subdivisions.most_specific.name)
    # 读取州(国外)/省(国内)代码
    country_specific_iso_code = str(response.subdivisions.most_specific.iso_code)
    # 读取城市名称
    city_name = str(response.city.name)
    # 获取纬度
    location_latitude = str(response.location.latitude)
    # 获取经度
    location_longitude = str(response.location.longitude)
    # 返回结果
    result_dic = {}
    result_dic['ip'] = ip
    result_dic['country_iso_code'] = country_iso_code
    result_dic['country_name'] = country_name
    result_dic['country_name_cn'] = country_name_cn
    result_dic['country_specific_name'] = country_specific_name
    result_dic['country_specific_iso_code'] = country_specific_iso_code
    result_dic['city_name'] = city_name
    result_dic['location_latitude'] = location_latitude
    result_dic['location_longitude'] = location_longitude
    return result_dic
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46

# 4.1.4 获取手机号归属地、区号、运营商

依赖安装:

$ pip install phone 
1

示例代码:

# -*- coding: utf-8 -*-

from phone import Phone

if __name__ == "__main__":
    phoneNum = 'your_phone_number(11)'
    info = Phone().find(phoneNum)
    print(info)
    try:
        phone = info['phone']
        province = info['province']
        city = info['city']
        zip_code = info['zip_code']
        area_code = info['area_code']
        phone_type = info['phone_type']
    except:
        print('none')
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# 4.2 二维码相关操作

# 4.2.1 二维码基本结构

二维码结构整体划分为功能图形和编码区两大部分,功能图形又细分为:空白区、位置探测图形、位置探测图形分隔符、定位图形、校正图形,而编码区细分为:格式信息、版本信息、数据和纠错码字,每一部分的功能如下:

  • 空白区:留白,不需要做任何处理
  • 位置探测图形:协助扫描软件定位二维码码
  • 位置探测图形分隔符:区分功能图形和编码区
  • 定位图形:指示标识密度和确定坐标系
  • 校正图形:校正图形的数量和位置
  • 格式信息:存放格式化数据的信息
  • 版本信息:二维码的规格,二维码符号共有 40 种规格的矩阵
  • 数据和纠错码字:实际保存的二维码信息和纠错码字(用于修正二维码损坏带来的错误)

二维码结构

# 4.2.2 二维码生成解析与美化

引入myqr与zxing库

$ pip install myqr
$ pip install zxing
1
2

二维码生成解析与美化的示例:

# -*- coding: utf-8 -*-

from MyQR import myqr
import zxing

# 生成普通二维码
myqr.run(words='https://www.imiao.top/', version=1, save_name='myqr.png')

# 生成图片背景二维码(colorized:False 为黑白,True 为彩色)
myqr.run(words='https://www.imiao.top/', version=1, picture='bg.png',  colorized=True, save_name='pic_myqr.png')

# 生成动图二维码
myqr.run(words='https://www.imiao.top/', version=1, picture='bg.gif', colorized=True, save_name='gif_myqr.gif')

# 识别二维码
reader = zxing.BarCodeReader()
barcode = reader.decode('gif_myqr.gif')
print(barcode.parsed)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# 4.3 写入与读取docx文档

依赖引入:使用 python-docx (opens new window) 库写入与读取docx文档

$ pip install python-docx
1

# 4.3.1 写入docx文档

write_docx.py

# -*- coding: utf-8 -*-

from docx import Document
from docx.shared import Pt
from docx.shared import Inches
from docx.oxml.ns import qn

# 打开文档
document = Document()
# 加入不同等级的标题
document.add_heading(u'MS WORD写入测试', 0)
document.add_heading(u'一级标题', 1)
document.add_heading(u'二级标题', 2)
# 添加文本
paragraph = document.add_paragraph(u'我们在做文本测试!')
# 设置字号
run = paragraph.add_run(u'设置字号、')
run.font.size = Pt(24)
# 设置英文字体
run = paragraph.add_run('Set Font,')
run.font.name = 'Consolas'
# 设置中文字体
run = paragraph.add_run(u'设置中文字体、')
run.font.name = u'宋体'
r = run._element
r.rPr.rFonts.set(qn('w:eastAsia'), u'宋体')
# 设置斜体
run = paragraph.add_run(u'斜体、')
run.italic = True
# 设置粗体
run = paragraph.add_run(u'粗体').bold = True
# 增加引用
document.add_paragraph('Intense quote', style='Intense Quote')
# 增加无序列表
document.add_paragraph(
    u'无序列表元素1', style='List Bullet'
)
document.add_paragraph(
    u'无序列表元素2', style='List Bullet'
)
# 增加有序列表
document.add_paragraph(
    u'有序列表元素1', style='List Number'
)
document.add_paragraph(
    u'有序列表元素2', style='List Number'
)
# 增加图像
document.add_picture('img.jpg', width=Inches(5))
# 增加表格
table = document.add_table(rows=1, cols=3)
hdr_cells = table.rows[0].cells
hdr_cells[0].text = 'Name'
hdr_cells[1].text = 'Id'
hdr_cells[2].text = 'Desc'
# 再增加3行表格元素
for i in range(3):
    row_cells = table.add_row().cells
    row_cells[0].text = 'test'+str(i)
    row_cells[1].text = str(i)
    row_cells[2].text = 'desc'+str(i)
# 增加分页
document.add_page_break()
# 保存文件
document.save(u'测试.docx')
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65

写入效果:

pyhton-docx写入docx文档

# 4.3.2 读取docx文档

read_docx.py

# -*- coding: utf-8 -*-

import docx

# 获取文档对象
file = docx.Document("测试.docx")
print("段落数:"+str(len(file.paragraphs)))

# 输出段落编号及段落内容
for i in range(len(file.paragraphs)):
    print("第"+str(i)+"段的内容是:"+file.paragraphs[i].text)
1
2
3
4
5
6
7
8
9
10
11

读取效果:

段落数:11
第0段的内容是:MS WORD写入测试
第1段的内容是:一级标题
第2段的内容是:二级标题
第3段的内容是:我们在做文本测试!设置字号、Set Font,设置中文字体、斜体、粗体
第4段的内容是:Intense quote
第5段的内容是:无序列表元素1
第6段的内容是:无序列表元素2
第7段的内容是:有序列表元素1
第8段的内容是:有序列表元素2
第9段的内容是:
第10段的内容是:
1
2
3
4
5
6
7
8
9
10
11
12

# 4.4 图片文件处理

# 4.4.1 压缩图片大小

以下是python+opncv实现图片压缩的示例代码:

# -*- coding: utf-8 -*-

import os
import cv2

"""
# Features:使用 opencv 实现图片压缩,compress_config为图片压缩配置,说明如下:
# [cv2.IMWRITE_PNG_COMPRESSION, 9]  无损压缩(取值范围:0~9,数值越小,压缩比越低)
# [cv2.IMWRITE_JPEG_QUALITY, 80]  有损压缩(取值范围:0~100,数值越小,压缩比越高,图片质量损失越严重)
"""
class Compress_img:

    def __init__(self, img_path, compress_config):
        self.img_path = img_path
        self.img_name = img_path.split('/')[-1]
        self.compress_config = compress_config

    def compress_img_CV(self, show=False):
        old_fsize = os.path.getsize(self.img_path)
        # 读取并压缩图片
        img_resize = cv2.imread(self.img_path)
        cv2.imwrite(self.img_path, img_resize, self.compress_config)
        new_fsize = os.path.getsize(self.img_path)
        # 计算压缩率
        compress_rate = str(round(new_fsize / old_fsize * 100, 2)) + "%"
        print("%s 图片已压缩," % (self.img_path), "压缩率为:", compress_rate)
        # 查看压缩后的图片
        if show:
            cv2.imshow(self.img_name, img_resize)
            cv2.waitKey(0)

if __name__ == '__main__':

    img_path = './test.jpg'
    compress_para = [cv2.IMWRITE_PNG_COMPRESSION, 9]
    compress = Compress_img(img_path, compress_para)
    compress.compress_img_CV()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

注:OpenCV无法读取中文路径文件,请使用全英文路径。

# 4.4.2 图片添加盲水印

如果你想保护自己的原创图片,那最好的方式就是为图片添加盲水印,盲水印就是图片有水印但人眼看不出来,需要通过程序才能提取水印,相当于隐形“盖章”,可以用在数据泄露溯源、版权保护等场景。下面使用阿里巴巴安全团队出品的 blind_watermark (opens new window) 库对图片添加盲水印。

[1] 添加文本水印

from blind_watermark import WaterMark

# 设置密码,默认是 1
bwm1 = WaterMark(password_img=1, password_wm=1)
# 读取原始图片
bwm1.read_img('input/001.jpg')
# 定义水印文本
wm = '@imiao.top'
# 合并文本并输出新的图片
bwm1.read_wm(wm, mode='str')
bwm1.embed('output/001.jpg')
# 输出结果
len_wm = len(bwm1.wm_bit)
print('Put down the length of wm_bit {len_wm}'.format(len_wm=len_wm))
1
2
3
4
5
6
7
8
9
10
11
12
13
14

[2] 提取文本水印

bwm1 = WaterMark(password_img=1, password_wm=1)
wm_extract = bwm1.extract('output/001.jpg', wm_shape=len_wm, mode='str')
print(wm_extract)
1
2
3

注:该库还支持添加和提取图片形式的盲水印,而能添加多大的盲水印图片取决于原始图片,不可超过其大小,不便于批量处理,在此就不放示例了。

# 4.4.3 获取图片缩略图

# -*- coding: utf-8 -*-

from PIL import Image


def get_thumbnail_pic(input_img_path, output_img_path):
    im = Image.open(input_img_path)
    im.thumbnail((80, 80))
    print(im.format, im.size, im.mode)
    im.save(output_img_path, 'JPEG')


if __name__=='__main__':
    input_img_path = './input/001.jpg'
    output_img_path = './output/001.jpeg'
    get_thumbnail_pic(input_img_path, output_img_path)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 4.4.4 使用cv2库绘制图形

引入cv2库

$ pip install opencv-python
1

绘制矩形和直线示例:

# -*- coding: utf-8 -*-

import numpy as np
import cv2 as cv

img = np.zeros((320, 320, 3), np.uint8)  # 生成一个空灰度图像

# 绘制矩形
ptLeftTop = (60, 60)
ptRightBottom = (260, 260)
point_color = (0, 255, 0)
thickness = 1
lineType = 4
cv.rectangle(img, ptLeftTop, ptRightBottom, point_color, thickness, lineType)

# 绘制直线
ptStart = (60, 60)
ptEnd = (260, 260)
point_color = (0, 0, 255)
thickness = 1
lineType = 4
cv.line(img, ptStart, ptEnd, point_color, thickness, lineType)

cv.namedWindow("CV Test")
cv.imshow('CV Test', img) # 显示绘图
cv.waitKey(5000)  # 显示5000ms后消失,设置为0永不消失
cv.destroyAllWindows()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

# 4.4.5 将多张图片合并成一张图片

# -*- coding: utf-8 -*-

import os
import PIL.Image as Image


def resize_by_width(infile, image_size):
    """按照宽度进行所需比例缩放"""
    im = Image.open(infile)
    (x, y) = im.size
    lv = round(x / image_size, 2) + 0.01
    x_s = int(x // lv)
    y_s = int(y // lv)
    print("x_s", x_s, y_s)
    out = im.resize((x_s, y_s))
    return out


def get_new_img_xy(infile, image_size):
    """返回一个图片的宽、高像素"""
    im = Image.open(infile)
    (x, y) = im.size
    lv = round(x / image_size, 2) + 0.01
    x_s = x // lv
    y_s = y // lv
    return x_s, y_s


# 定义图像拼接函数
def image_compose(image_colnum, image_size, image_rownum, image_names, image_save_path, x_new, y_new):
    to_image = Image.new('RGB', (image_colnum * x_new, image_rownum * y_new))  # 创建一个新图
    # 循环遍历,把每张图片按顺序粘贴到对应位置上
    total_num = 0
    for y in range(1, image_rownum + 1):
        for x in range(1, image_colnum + 1):
            from_image = resize_by_width(image_names[image_colnum * (y - 1) + x - 1], image_size)
            to_image.paste(from_image, ((x - 1) * x_new, (y - 1) * y_new))
            total_num += 1
            if total_num == len(image_names):
                break
    return to_image.save(image_save_path)  # 保存新图


def get_image_list_fullpath(dir_path):
    file_name_list = os.listdir(dir_path)
    image_fullpath_list = []
    for file_name_one in file_name_list:
        file_one_path = os.path.join(dir_path, file_name_one)
        if os.path.isfile(file_one_path):
            image_fullpath_list.append(file_one_path)
        else:
            img_path_list = get_image_list_fullpath(file_one_path)
            image_fullpath_list.extend(img_path_list)
    return image_fullpath_list


def merge_images(image_dir_path, image_size, image_colnum):
    # 获取图片集地址下的所有图片名称
    image_fullpath_list = get_image_list_fullpath(image_dir_path)
    print("image_fullpath_list", len(image_fullpath_list), image_fullpath_list)
    image_save_path = r'{}.jpg'.format(image_dir_path)  # 图片转换后的地址
    image_rownum_yu = len(image_fullpath_list) % image_colnum
    if image_rownum_yu == 0:
        image_rownum = len(image_fullpath_list) // image_colnum
    else:
        image_rownum = len(image_fullpath_list) // image_colnum + 1

    x_list = []
    y_list = []
    for img_file in image_fullpath_list:
        img_x, img_y = get_new_img_xy(img_file, image_size)
        x_list.append(img_x)
        y_list.append(img_y)

    print("x_list", sorted(x_list))
    print("y_list", sorted(y_list))
    x_new = int(x_list[len(x_list) // 5 * 4])
    y_new = int(y_list[len(y_list) // 5 * 4])
    print(" x_new, y_new", x_new, y_new)
    image_compose(image_colnum, image_size, image_rownum, image_fullpath_list, image_save_path, x_new, y_new)  # 调用函数


if __name__ == '__main__':
    image_dir_path = r'/root/images'  # 图片集地址
    image_size = 128  # 每张小图片的大小
    image_colnum = 10  # 合并成一张图后,一行有几个小图
    merge_images(image_dir_path, image_size, image_colnum)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87

# 4.4.6 读取图片元数据

# -*- coding: utf-8 -*-

from PIL import Image
from PIL.ExifTags import TAGS
from prettytable import PrettyTable

image_filename = "test.jpg"

# initialiting prettytable object
table = PrettyTable()

# setting table feilds name
table.field_names = ["MetaTags", "Values"]

# load image
my_image = Image.open(image_filename)

# get EXIF standared Data of the image
img_exif_data = my_image.getexif()
for id in img_exif_data:
    tag_name = TAGS.get(id, id)
    data = img_exif_data.get(id)
    # if data in bytes
    if isinstance(data, bytes):
        data = data.decode()
    table.add_row([tag_name, data])

print(table)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

返回结果(有多少MetaTags就显示多少)

+----------------+---------------------+
|    MetaTags    |        Values       |
+----------------+---------------------+
| ResolutionUnit |          2          |
|   ExifOffset   |         110         |
|    DateTime    | 2022:10:20 17:16:30 |
|  XResolution   |         72.0        |
|  YResolution   |         72.0        |
+----------------+---------------------+
1
2
3
4
5
6
7
8
9

# 4.4.7 调整图片的透明度

# -*- coding: UTF-8 -*-

from PIL import Image


# 调整图片的透明度,用于制作背景
def addTransparency(img, factor):
    img = img.convert('RGBA')
    img_blender = Image.new('RGBA', img.size, (0, 0, 0, 0))
    img = Image.blend(img_blender, img, factor)
    return img


if __name__ == '__main__':
    img = Image.open("original_background.jpeg")
    img = addTransparency(img, factor=0.7)
    img.save("background.png")
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# 4.4.8 修改图片分辨率大小

# -*- coding: utf-8 -*-

from PIL import Image


def transfer(input_file, output_file):
    im = Image.open(input_file)
    reim=im.resize((16, 16))  # 宽*高
    reim.save(output_file, dpi=(200.0, 200.0))  # 200.0,200.0分别为想要设定的dpi值


if __name__ == '__main__':
    input_file = "logo.ico"
    output_file = "logo_16x16.ico"
    transfer(input_file, output_file)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 4.4.9 绘制网络拓扑图

# -*- coding: utf-8 -*-

import networkx as nx
import matplotlib.pyplot as plt

# 创建一个空的图形对象
G = nx.Graph()

# 添加节点到图形对象
G.add_node("Router")
G.add_node("Computer1")
G.add_node("Computer2")
G.add_node("Printer")

# 添加边连接节点
G.add_edge("Router", "Computer1")
G.add_edge("Router", "Computer2")
G.add_edge("Router", "Printer")
G.add_edge("Computer1", "Printer")
G.add_edge("Computer2", "Printer")

# 绘制拓扑图
nx.draw(G, with_labels=True)

# 显示拓扑图
plt.savefig('./network.jpg')
plt.show()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

# 4.4.10 视频关键帧抽取

# -*- coding: utf-8 -*-

import cv2
import operator
import numpy as np
import os
from scipy.signal import argrelextrema

def smooth(x, window_len=13, window='hanning'):
    """使用具有所需大小的窗口使数据平滑。
    This method is based on the convolution of a scaled window with the signal.
    The signal is prepared by introducing reflected copies of the signal
    (with the window size) in both ends so that transient parts are minimized
    in the begining and end part of the output signal.
    该方法是基于一个标度窗口与信号的卷积。
    通过在两端引入信号的反射副本(具有窗口大小)来准备信号,
    使得在输出信号的开始和结束部分中将瞬态部分最小化。
    input:
        x: the input signal输入信号
        window_len: the dimension of the smoothing window平滑窗口的尺寸
        window: the type of window from 'flat', 'hanning', 'hamming', 'bartlett', 'blackman'
            flat window will produce a moving average smoothing.
            平坦的窗口将产生移动平均平滑
    output:
        the smoothed signal平滑信号

    example:
    import numpy as np
    t = np.linspace(-2,2,0.1)
    x = np.sin(t)+np.random.randn(len(t))*0.1
    y = smooth(x)

    see also:
    numpy.hanning, numpy.hamming, numpy.bartlett, numpy.blackman, numpy.convolve
    scipy.signal.lfilter

    TODO: 如果使用数组而不是字符串,则window参数可能是窗口本身
    """
    print(len(x), window_len)
    s = np.r_[2 * x[0] - x[window_len:1:-1],
              x, 2 * x[-1] - x[-1:-window_len:-1]]

    if window == 'flat':  # moving average平移
        w = np.ones(window_len, 'd')
    else:
        w = getattr(np, window)(window_len)
    y = np.convolve(w / w.sum(), s, mode='same')
    return y[window_len - 1:-window_len + 1]


class Frame:
    """class to hold information about each frame
    用于保存有关每个帧的信息
    """

    def __init__(self, id, diff):
        self.id = id
        self.diff = diff

    def __lt__(self, other):
        if self.id == other.id:
            return self.id < other.id
        return self.id < other.id

    def __gt__(self, other):
        return other.__lt__(self)

    def __eq__(self, other):
        return self.id == other.id and self.id == other.id

    def __ne__(self, other):
        return not self.__eq__(other)


def rel_change(a, b):
    x = (b - a) / max(a, b)
    print(x)
    return x


def getEffectiveFrame(videopath, dir):
    # 如果文件目录不存在则创建目录
    if not os.path.exists(dir):
        os.makedirs(dir)
    (filepath, tempfilename) = os.path.split(videopath)  # 分离路径和文件名
    (filename, extension) = os.path.splitext(tempfilename)  # 区分文件的名字和后缀
    # Setting fixed threshold criteria设置固定阈值标准
    USE_THRESH = False
    # fixed threshold value固定阈值
    THRESH = 0.8
    # Setting fixed threshold criteria设置固定阈值标准
    USE_TOP_ORDER = False
    # Setting local maxima criteria设置局部最大值标准
    USE_LOCAL_MAXIMA = True
    # Number of top sorted frames排名最高的帧数
    NUM_TOP_FRAMES = 50
    # smoothing window size平滑窗口大小
    len_window = int(50)
    print("target video :" + videopath)
    print("frame save directory: " + dir)
    # load video and compute diff between frames加载视频并计算帧之间的差异
    cap = cv2.VideoCapture(str(videopath))
    prev_frame = None
    frame_diffs = []
    frames = []
    success, frame = cap.read()
    i = 0
    while (success):
        luv = cv2.cvtColor(frame, cv2.COLOR_BGR2LUV)
        curr_frame = luv
        if curr_frame is not None and prev_frame is not None:
            # logic here
            diff = cv2.absdiff(curr_frame, prev_frame)  # 获取差分图
            diff_sum = np.sum(diff)
            diff_sum_mean = diff_sum / (diff.shape[0] * diff.shape[1])  # 平均帧
            frame_diffs.append(diff_sum_mean)
            frame = Frame(i, diff_sum_mean)
            frames.append(frame)
        prev_frame = curr_frame
        i = i + 1
        success, frame = cap.read()
    cap.release()

    # compute keyframe
    keyframe_id_set = set()
    if USE_TOP_ORDER:
        # sort the list in descending order以降序对列表进行排序
        frames.sort(key=operator.attrgetter("diff"), reverse=True)  # 排序operator.attrgetter
        for keyframe in frames[:NUM_TOP_FRAMES]:
            keyframe_id_set.add(keyframe.id)
    if USE_THRESH:
        print("Using Threshold")  # 使用阈值
        for i in range(1, len(frames)):
            if (rel_change(np.float(frames[i - 1].diff), np.float(frames[i].diff)) >= THRESH):
                keyframe_id_set.add(frames[i].id)
    if USE_LOCAL_MAXIMA:
        print("Using Local Maxima")  # 使用局部极大值
        diff_array = np.array(frame_diffs)
        sm_diff_array = smooth(diff_array, len_window)  # 平滑
        frame_indexes = np.asarray(argrelextrema(sm_diff_array, np.greater))[0]  # 找极值
        for i in frame_indexes:
            keyframe_id_set.add(frames[i - 1].id)  # 记录极值帧数

    # save all keyframes as image将所有关键帧另存为图像
    cap = cv2.VideoCapture(str(videopath))
    success, frame = cap.read()
    idx = 0
    num = 0
    while (success):
        if idx in keyframe_id_set:
            num = num + 1
            name = filename + '_' + str(num) + ".jpg"
            cv2.imwrite(dir + name, frame)
            keyframe_id_set.remove(idx)
        idx = idx + 1
        success, frame = cap.read()
    cap.release()

    
if __name__ == "__main__":
    videopath = './data/demo.mp4'    # Video path of the source file源文件的视频路径
    dir = './data/keyframe/'  # Directory to store the processed frames存储已处理帧的目录
    getEffectiveFrame(videopath, dir)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163

# 4.5 发送Gmail邮件通知

# 4.5.1 Gmail开启IMAP并创建单独应用密码

登录Gmail——点击设置按钮(齿轮样式)——查看所有设置——转发和POP/IMAP——启用IAMP

IMAP设置

登录你的 Google账号管理 (opens new window),选择”安全性“,”登录Google“模块里有个”应用专用密码设置“,点开之后选择应用、选择设备,生成一个16位的单独应用密码。

# 4.5.2 发送普通文本邮件及带附件邮件

发送普通文本邮件

import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

username = "你的gmail邮箱"
password = "你的单独应用密码"
mail_from = "你的gmail邮箱"
mail_to = "用于接收消息的目标邮箱"
mail_subject = "Test Subject"
mail_body = "This is a test message"

mimemsg = MIMEMultipart()
mimemsg['From']=mail_from
mimemsg['To']=mail_to
mimemsg['Subject']=mail_subject
mimemsg.attach(MIMEText(mail_body, 'plain'))
connection = smtplib.SMTP(host='smtp.gmail.com', port=587)
connection.starttls()
connection.login(username,password)
connection.send_message(mimemsg)
connection.quit()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

发送带附件邮件

import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders

username = "你的gmail邮箱"
password = "你的单独应用密码"
mail_from = "你的gmail邮箱"
mail_to = "用于接收消息的目标邮箱"
mail_subject = "Test Subject"
mail_body = "This is a test message"
mail_attachment="./test.txt"
mail_attachment_name="test.txt"

mimemsg = MIMEMultipart()
mimemsg['From']=mail_from
mimemsg['To']=mail_to
mimemsg['Subject']=mail_subject
mimemsg.attach(MIMEText(mail_body, 'plain'))

with open(mail_attachment, "rb") as attachment:
    mimefile = MIMEBase('application', 'octet-stream')
    mimefile.set_payload((attachment).read())
    encoders.encode_base64(mimefile)
    mimefile.add_header('Content-Disposition', "attachment; filename= %s" % mail_attachment_name)
    mimemsg.attach(mimefile)
    connection = smtplib.SMTP(host='smtp.gmail.com', port=587)
    connection.starttls()
    connection.login(username,password)
    connection.send_message(mimemsg)
    connection.quit()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

# 5. Python常用进阶知识

# 5.1 Python程序性能优化

# 5.1.1 进程、线程、协程对比

进程是资源分配的单位,线程是操作系统调度的单位。进程切换需要的资源很最大,效率很低;线程切换需要的资源一般,效率一般;协程切换任务资源很小,效率高。

  • [1] 进程:一个程序运行起来后,代码及用到的资源称之为进程,它是操作系统分配资源的基本单元。

  • [2] 线程:一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。

  • [3] 协程:协程是Python中另外一种实现多任务的方式,只不过比线程更小占用更小执行单元。

以下是一个体现进程、线程和协程区别的通俗举例。

方式 1:单进程多线程

  • 进程(生产线): 有一条生产线,拥有所有需要生产产品的资源和设备。
  • 线程(工人): 在这一条生产线上,多个工人(线程)共享这些资源,同时工作以提高效率。
  • 优点: 资源共享,提高效率。
  • 缺点: 线程之间可能需要同步和协调,否则可能导致资源竞争。

方式 2:多进程多线程

  • 进程(生产线): 有多条生产线,每条生产线都有自己的资源和设备。
  • 线程(工人): 每条生产线上都有多个工人。
  • 优点: 高度并行,资源互不干扰。
  • 缺点: 消耗更多的资源和财力,可能需要更复杂的管理和协调。

方式 3:多进程多线程协程

  • 协程(灵活的工人): 在多进程多线程的基础上,工人(线程)在等待某些条件(如等待某个工序完成)时,可以转去做其他事情。
  • 优点: 充分利用CPU时间,提高效率。
  • 缺点: 需要更精细的调度和管理,否则可能导致逻辑复杂。

总体来说,单进程多线程适合资源有限但需要快速响应的场景。多进程多线程适用于高度并行且资源充足的场景。而协程则适用于I/O密集或者需要高度异步的场景,它能更充分地利用系统资源。根据具体需求和资源限制,可以选择最适合的并发模型。

# 5.1.2 全局解释器锁GIL

Python代码的执行由Python解释器来控制。Python在设计之初就考虑要在主循环中,同时只有一个线程在执行,就像单CPU的系统中运行多个进程那样,内存中可以存放多个程序,但任意时刻,只有一个程序在CPU中运行。同样地,虽然Python解释器可以运行多个线程,只有一个线程在解释器中运行。

对Python虚拟机的访问由全局解释器锁(GIL)来控制,正是这个锁能保证同时只有一个线程在运行。在多线程环境中,Python虚拟机按照以下方式执行。1)设置GIL。2)切换到一个线程去执行。3)运行。4)把线程设置为睡眠状态。5)解锁GIL。6)再次重复以上步骤。

对所有面向I/O的程序来说,GIL会在这个I/O调用之前被释放,以允许其他线程在这个线程等待I/O的时候运行。如果某线程并未使用很多I/O操作,它会在自己的时间片内一直占用处理器和GIL。也就是说,I/O密集型的Python程序比计算密集型的Python程序更能充分利用多线程的好处。

比方我有一个4核的CPU,那么这样一来,在单位时间内每个核只能跑一个线程,然后时间片轮转切换。但是Python不一样,它不管你有几个核,单位时间多个核只能跑一个线程,然后时间片轮转。看起来很不可思议?但是这就是GIL搞的鬼。任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。通常我们用的解释器是官方实现的CPython,要真正利用多核,除非重写一个不带GIL的解释器。

因此,这就解释了为什么有些情况多线程比单线程还慢。处理海量数据的时候,要选择多进程去实现,而不是多线程。

实际情景:我需要处理2400亿条基因数据,只有一台性能非常高的服务器。最初写了个多线程的处理程序,性能为1000w/h,而单线程却能达到6000w/h,当时觉得非常不可思议,为什么多线程比单线程还慢,得知全局解释器锁GIL后,知道了Python的多线程是个假多线程。后来把处理程序改成了多进程后,性能达到了60000w/h。

另注:开发者 Sam Gross 在 2022 Python 语言峰会上带来了一个新提案,完全移除 CPython 解释器的 GIL- 全局解释器锁。现在 Python 团队已经正式接受了删除 GIL 的这个提议,并将其设置为可选模式。删除全局解释器锁 GIL,可以真正的解放多线程性能。

# 5.2 使用Python多线程

# 5.2.1 使用threadpool实现多线程并行

可以使用 threadpool 来实现多线程并行。

# -*- coding: utf-8 -*-

from time import sleep
import time
from decimal import Decimal
import threadpool


def func(dic, c):
    sleep(1)
    dic['count'] += c


if __name__ == "__main__":

    start_time = time.time()
    start_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

    d = dict()
    d['count'] = 0
    l = [1, 2, 3]

    pool = threadpool.ThreadPool(3)  # 成立线程池
    params = [([d, x], None) for x in l]  # 拼接 func 的参数
    print(params)
    requests = threadpool.makeRequests(func, params) 
    [pool.putRequest(req) for req in requests]
    pool.wait()
    print(f'dic={d}')

    end_time = time.time()
    end_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    time_consuming_str = str(Decimal(str((end_time - start_time) * 1000)).quantize(Decimal('0.00'))) + 'ms'
    print("===处理结束,总共耗时:" + time_consuming_str)
    
>>> 结果输出:
[([{'count': 0}, 1], None), ([{'count': 0}, 2], None), ([{'count': 0}, 3], None)]
dic={'count': 6}
===处理结束,总共耗时:1005.56ms
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

代码说明:

  • makeRequests():函数原型为makeRequests(callable_, args_list, callback=None, exc_callback=_handle_thread_exception),可以很方便地创建多个工作请求(调用函数相同、参数不同)。
  • putRequest():函数原型为 putRequest(self, request, block=True, timeout=None),将工作请求放入到工作队列中,并执行。
  • wait():阻塞,直到线程池中所有的线程都结束。
  • 内存共享:多线程之间是可以共享任何内存的,因此可以对其进行累加。如果是多进程,则不可以共享内存,如果需要共享,需要其他的手段才可以。

# 5.2.2 使用joblib实现多CPU并行执行

可以用 joblib 来实现多线程并行执行for循环操作。

项目地址:https://github.com/joblib/joblib (opens new window),官方文档:https://joblib.readthedocs.io/en/latest/ (opens new window)

# -*- coding: utf-8 -*-

import time
from decimal import Decimal

from joblib import Parallel, delayed
import multiprocessing


def processInput(i):
    time.sleep(1)
    return i * i


if __name__ == '__main__':

    start_time = time.time()
    start_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

    inputs = range(10)
    num_cores = multiprocessing.cpu_count()
    print(num_cores)
    results = Parallel(n_jobs=num_cores)(delayed(processInput)(i) for i in inputs)
    print(results)

    end_time = time.time()
    end_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    time_consuming_str = str(Decimal(str((end_time - start_time) * 1000)).quantize(Decimal('0.00'))) + 'ms'
    print("===处理结束,总共耗时:" + time_consuming_str)
    
>>> 结果输出
8
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
===处理结束,总共耗时:2439.54ms
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

解释说明:

  • num_cores = multiprocessing.cpu_count()用于获取CPU核心数,Parallel(n_jobs=num_cores) 指定使用几个CPU,默认是分配给不同的CPU。后面的delayed(processInput)表示要用的函数是processInput。

  • 何时选用并行:这个问题其实是最关键的,并行其实也不一定一直会很快。并行处理的流程如下图所示,这里拆分、合并和CPU之间的通信都是会产生时间消耗的。如果本身的任务很小,其实消耗的时间反而会更多。另外,往往实际的并行不能达到几个CPU就有几倍速度也正是这个原因。

    使用Parallel并行执行任务的流程

# 5.2.3 多线程并行下载

[1] 多线程批量并行下载小文件

对于批量下载小文件的情况,可以将url存成列表,多线程批量并行下载。

# -*- coding: utf-8 -*-

import os
import requests
from joblib import Parallel, delayed


def download_file(file_url):
    try:
        num = 0
        while num < retry:
            base_url, file_name = os.path.split(file_url)
            save_path = output_path + "/" + file_name
            res = requests.get(file_url, headers=headers, timeout=timeout)
            if res.status_code == 200:
                with open(file_path, "wb") as f:
                    f.write(res.content)
                break
            else:
                num = num + 1
    except Exception as e:
        print("{}下载失败,异常为{}".format(file_url, e))
        
if __name__ == '__main__':

    # 输出文件夹
    output_path = './output'
    # 线程数
    thread_num = 10
    # 重试次数
    retry = 3
    # http请求headers
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/57.0.2987.133 Safari/537.36'
    }
    # http请求超时设置
    timeout = 20

    # 创建输出文件夹
    if not os.path.exists(output_path):
        os.mkdir(output_path)

    # 文件url下载列表
    file_url_list = ["https://xxx.xxx.xxx/aaa.png",
                     "https://xxx.xxx.xxx/bbb.png",
                     "https://xxx.xxx.xxx/ccc.png"]

    # 多线程并行下载
    Parallel(n_jobs=thread_num)(delayed(download_file)(file_url) for file_url in file_url_list)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49

如果想要按照URL路径创建对应的目录,替换如下函数即可。

from urllib.parse import urlparse


def download_file(file_url):
    try:
        num = 0
        while num < retry:
            # 解析URL
            file_url_parse = urlparse(file_url)
            file_path = output_path + file_url_parse.path
            # 创建目录
            file_base_path, file_name = os.path.split(file_path)
            if not os.path.exists(file_base_path):
                os.makedirs(file_base_path)
            # 保存文件
            res = requests.get(file_url, headers=headers, timeout=timeout)
            if res.status_code == 200:
                with open(file_path, "wb") as f:
                    f.write(res.content)
                break
            else:
                num = num + 1
    except Exception as e:
        print("{}下载失败,异常为{}".format(file_url, e))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

[2] 多线程分片并行下载大文件

想要分片流式下载,可以通过请求头修改需要读取的字节范围来实现,当然也需要先检查目标服务器是否支持范围请求。如果请求一个资源时, HTTP响应中出现Accept-Ranges且其值不是none, 那么服务器支持范围请求。

import requests

url = "https://xxx.xxx.xxx/test.tar.gz"
res = requests.head(url)
head = res.headers
data = res.content
print(head)
print(data)

>>> 输出结果
{'Date': 'Sun, 02 Apr 2023 08:13:22 GMT', 'Content-Type': 'application/x-gzip', 'Content-Length': '199282949', 'Connection': 'keep-alive', 'last-modified': 'Wed, 10 Aug 2022 06:41:18 GMT', 'etag': '"be0d105-5e5dd561afd8e"', 'strict-transport-security': 'max-age=15768000', 'Cache-Control': 'max-age=1800', 'CF-Cache-Status': 'HIT', 'Age': '1068', 'Accept-Ranges': 'bytes', 'Report-To': '{"endpoints":[{"url":"https:\\/\\/a.nel.cloudflare.com\\/report\\/v3?s=W2p1i1Vf%2B0TCowTN3RjumoT4vQI0AQoJ%2FyM%2BmAyfMwylgRMO6nshgMLoj6hsKxJhLxAFKUP7T%2Fpn4pqJbQtVTRoq5mwWn6Li3JN6Br7AIhlyuqdKBKCKar4rjOm9QRPcug%3D%3D"}],"group":"cf-nel","max_age":604800}', 'NEL': '{"success_fraction":0,"report_to":"cf-nel","max_age":604800}', 'Vary': 'Accept-Encoding', 'Server': 'cloudflare', 'CF-RAY': '7b179cb6ddf32112-HKG'}
b''
1
2
3
4
5
6
7
8
9
10
11
12

可以看到head请求只返回的响应头,未返回任何数据。上面的响应头中,'Accept-Ranges': 'bytes' 代表可以使用字节作为单位来定义请求范围。Content-Length 则代表该资源的完整大小。

于是我们可以通过Content-Length 响应头获取文件的大小,这就是当前文件的总大小。

filesize = int(res.headers['Content-Length'])
1

这时我们就可以根据总大小对文件进行分片,例如总共分几部分或者多大的部分作为一个分片。

完整代码如下:

# -*- coding: utf-8 -*-

from concurrent.futures import ThreadPoolExecutor, as_completed
import requests

"""
多线程分片并行下载大文件
"""


# 计算分片范围
def calc_divisional_range(filesize, chuck=10):
    step = filesize//chuck
    arr = list(range(0, filesize, step))
    result = []
    for i in range(len(arr)-1):
        s_pos, e_pos = arr[i], arr[i+1]-1
        result.append([s_pos, e_pos])
    result[-1][-1] = filesize-1
    return result


# 下载方法
def range_download(save_name, s_pos, e_pos):
    headers = {"Range": f"bytes={s_pos}-{e_pos}"}
    res = requests.get(url, headers=headers, stream=True)
    with open(save_name, "rb+") as f:
        f.seek(s_pos)
        for chunk in res.iter_content(chunk_size=64*1024):
            if chunk:
                f.write(chunk)


if __name__ == '__main__':

    url = "https://xxx.xxx.xxx/test.tar.gz"
    res = requests.head(url)
    filesize = int(res.headers['Content-Length'])
    chuck = 20      # 分片数量
    divisional_ranges = calc_divisional_range(filesize, chuck)
    save_path = "./test.tar.gz"
    print("正在下载{}地址的文件...".format(url))
    # 先创建空文件
    with open(save_path, "wb") as f:
        pass
    with ThreadPoolExecutor() as p:
        futures = []
        for s_pos, e_pos in divisional_ranges:
            print(s_pos, e_pos)
            futures.append(p.submit(range_download, save_path, s_pos, e_pos))
        # 等待所有任务执行完毕
        as_completed(futures)
    print("{}地址的文件已下载完毕,将其保存在{}路径".format(url, save_path))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53

实现断点续传的思路:

  • 粗粒度的方法就是以分片为校验单位,某个分片下载失败则重新下载。
  • 细粒度一点的方法是每个分片内部校验已下载的范围,对于下载失败的分布,重新定位起始位置继续下载。

# 5.3 使用Python多进程

# 5.3.1 Python多进程简介

Python中的多进程是通过multiprocessing包来实现的,和多线程的threading.Thread差不多,它可以利用multiprocessing.Process对象来创建一个进程对象。这个进程对象的方法和线程对象的方法差不多也有start(), run(), join()等方法,其中有一个方法不同Thread线程对象中的守护线程方法是setDeamon,而Process进程对象的守护进程是通过设置daemon属性来完成的。

# 5.3.2 使用multiprocessing实现多进程

可以使用multiprocessing实现多进程,相当于开启了n(进程数)个原来的程序去共同执行。需要注意的是,开启了多进程后,在main里定义的一些全局变量就失效了,应当把它们挪到子方法里。默认情况下,各进程之间的内存是不共享的,如果需要共享内存需要单独做特殊配置。

demo1.py

# -*- coding: utf-8 -*-

import time
from decimal import Decimal
from multiprocessing import Process


def fun(name):
    time.sleep(1)
    print('测试多进程%s' %name)


if __name__ == '__main__':

    print("===开始执行===")
    start_time = time.time()
    start_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

    process_list = []
    for i in range(5):  # 开启5个子进程执行fun函数
        p = Process(target=fun, args=(str(i),))  # 实例化进程对象
        p.start()
        process_list.append(p)

    for i in process_list:
        p.join()

    end_time = time.time()
    end_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    time_consuming_str = str(Decimal(str((end_time - start_time) * 1000)).quantize(Decimal('0.00'))) + 'ms'
    print("===执行结束,总共耗时:{}===".format(time_consuming_str))
    
>>> 结果输出
===开始执行===
测试多进程0
测试多进程3
测试多进程4
测试多进程1
测试多进程2
===执行结束,总共耗时:1796.32ms===
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

demo2.py

# -*- coding: utf-8 -*-

from decimal import Decimal
from multiprocessing import Pool
import time


def test(p):
    print(p)
    time.sleep(1)


if __name__ == '__main__':

    print("===开始执行===")
    start_time = time.time()
    start_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

    pool = Pool(processes=2)
    for i in range(6):
        '''
         (1)循环遍历,将6个子进程添加到进程池(相对父进程会阻塞)
         (2)每次执行2个子进程,等一个子进程执行完后,立马启动新的子进程。(相对父进程不阻塞)
        '''
        pool.apply_async(test, args=(i,))   # 维持执行的进程总数为2,当一个进程执行完后启动一个新进程.
    print('test')
    pool.close()
    pool.join()

    end_time = time.time()
    end_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    time_consuming_str = str(Decimal(str((end_time - start_time) * 1000)).quantize(Decimal('0.00'))) + 'ms'
    print("===执行结束,总共耗时:{}===".format(time_consuming_str))
    
>>> 结果输出
===开始执行===
test
0
1
2
3
4
5
===执行结束,总共耗时:3622.63ms===
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

# 5.4 使用Python协程

# 5.4.1 Python协程简介

协程,又称微线程,是运行在单线程中的“并发”,协程相比多线程的一大优势就是省去了多线程之间的切换开销,获得了更高的运行效率。

在Python3.4之前,官方没有对协程的支持,但存在一些第三方库的实现,比如gevent和tornado,3.4之后有了asyncio,官方才真正实现了协程这一特性。

# 5.4.2 使用asyncio实现协程

# -*- coding: utf-8 -*-

import asyncio


async def func():
    print(1)
    await asyncio.sleep(2)
    print(2)
    return "返回值"


async def main():
    print("main开始")
    # 创建协程,将协程封装到一个Task对象中并立即添加到事件循环的任务列表中,等待事件循环去执行(默认是就绪状态)。
    task1 = asyncio.create_task(func())
    # 创建协程,将协程封装到一个Task对象中并立即添加到事件循环的任务列表中,等待事件循环去执行(默认是就绪状态)。
    task2 = asyncio.create_task(func())
    print("main结束")
    # 当执行某协程遇到IO操作时,会自动化切换执行其他任务。
    # 此处的await是等待相对应的协程全都执行完毕并获取结果
    ret1 = await task1
    ret2 = await task2
    print(ret1, ret2)


loop = asyncio.get_event_loop()  # 创建一个事件循环
loop.run_until_complete(main())  # 将协程对象加入到事件循环
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

输出结果:

main开始
main结束
1
1
2
2
返回值 返回值
1
2
3
4
5
6
7

# 5.5 Schedule定时任务

# 5.5.1 定时任务库对比

以下是几个好用的 Python 任务调度库。

  • schedule:Python job scheduling for humans. 轻量级,无需配置的作业调度库
  • python-crontab: 针对系统 Cron 操作 crontab 文件的作业调度库
  • Apscheduler:一个高级的 Python 任务调度库
  • Celery: 是一个简单,灵活,可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类系统所需的工具, 也可用于任务调度

优缺点对比:

  • schedule 优点是简单、轻量级、无需配置、语法简单,缺点是阻塞式调用、无法动态添加或删除任务
  • Python-crontab 优点是针对于系统 crontab 操作,支持定时、定期任务,能够动态添加任务,不能实现一次性任务需求
  • Apscheduler 优点支持定时、定期、一次性任务,支持任务持久化及动态添加、支持配置各种持久化存储源(如 redis、MongoDB),支持接入到各种异步框架(如 gevent、asyncio、tornado)
  • Celery 支持配置定期任务、支持 crontab 模式配置,不支持一次性定时任务

# 5.5.2 schedule的使用

依赖安装:

pip install schedule
1

使用示例:

import schedule
import time

# 定义你要周期运行的函数
def job():
    print("I'm working...")

schedule.every(10).minutes.do(job)               # 每隔 10 分钟运行一次 job 函数
schedule.every().hour.do(job)                    # 每隔 1 小时运行一次 job 函数
schedule.every().day.at("10:30").do(job)         # 每天在 10:30 时间点运行 job 函数
schedule.every().monday.do(job)                  # 每周一 运行一次 job 函数
schedule.every().wednesday.at("13:15").do(job)   # 每周三 13:15 时间点运行 job 函数
schedule.every().minute.at(":17").do(job)        # 每分钟的 17 秒时间点运行 job 函数

while True:
    schedule.run_pending()   # 运行所有可以运行的任务
    time.sleep(1)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# 5.6 Celery异步框架

# 5.6.1 Celery异步框架简介

Celery 是Python语言实现的分布式队列服务,除了支持即时任务,还支持定时任务,Celery 有5个核心角色。

  • Task:就是你要做的事情,例如一个注册流程里面有很多任务,给用户发验证邮件就是一个任务,这种耗时的任务就可以交给Celery去处理,还有一种任务是定时任务,比如每天定时统计网站的注册人数,这个也可以交给Celery周期性的处理。
  • Broker:相当于数据结构中的队列,介于生产者和消费者之间经纪人。例如一个Web系统中,生产者是主程序,它生产任务,将任务发送给 Broker,消费者是 Worker,是专门用于执行任务的后台服务。Celery本身不提供队列服务,一般用Redis或者RabbitMQ来实现队列服务。
  • Worker:那个一直在后台执行任务的,也成为任务的消费者,它会实时地监控队列中有没有任务,如果有就立即取出来执行。
  • Beat:是一个定时任务调度器,它会根据配置定时将任务发送给 Broker,等待 Worker 来消费。
  • Backend:用于保存任务的执行结果,每个任务都有返回值,比如发送邮件的服务会告诉我们有没有发送成功,这个结果就是存在Backend中,当然我们并不总是要关心任务的执行结果。

项目地址:https://github.com/celery/celery (opens new window) 官方文档:https://docs.celeryq.dev/en/stable/ (opens new window)

# 5.6.2 使用Celery异步框架

Step1:安装依赖

$ pip install celery
$ pip install redis
1
2

注:客户端通过消息队列和 Worker 进行通信,Celery 支持多种方式来实现这些队列,最常用的代理就是 RabbitMQ 和 Redis,此处示例使用Redis。

Step2:编写测试程序

模拟一个用户注册的场景,发送邮件的过程用异步的方式去处理,保存完账号密码之后就立刻通知注册成功,而不等待邮件发送过程。

# -*- coding: utf-8 -*-

from celery import Celery
import time

# 创建Celery实例
# app = Celery('tasks', broker='redis://127.0.0.1:6379/0') 
app = Celery('tasks', broker='redis://:[email protected]:6379/0')


# 按行追加写入txt文件
def write_content_to_txt(txt_path, content):
    a = open(txt_path, 'a')
    a.write(content + '\n')
    a.close()


# 创建任务
@app.task
def send_mail(email):
    time.sleep(5)  # 用sleep模拟邮件发送操作
    content = "send mail to {}".format(email)
    print(content)
    write_content_to_txt('./send_mail.txt', content)
    return "send mail success"


# 调用任务
def register():
    start = time.time()
    print("1. 插入记录到数据库")
    print("2. celery异步发送邮件")
    send_mail.delay("[email protected]")
    print("3. 告知用户注册成功")
    print("耗时:%s 秒 " % (time.time() - start))


if __name__ == '__main__':
    register()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

Step3:启动Worker

$ celery -A tasks worker --loglevel=info
1

-A 指定 celery 实例所在哪个模块中,启动成功后,能看到如下信息:

 -------------- [email protected] v5.2.7 (dawn-chorus)
--- ***** ----- 
-- ******* ---- macOS-13.3.1-arm64-arm-64bit 2023-05-06 15:15:21
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x103fd0850
- ** ---------- .> transport:   redis://:**@127.0.0.1:6379/0
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
                

[tasks]
  . tasks.send_mail

[2023-05-06 15:15:23,242: INFO/MainProcess] Connected to redis://:**@127.0.0.1:6379/0
[2023-05-06 15:15:24,635: INFO/MainProcess] mingle: searching for neighbors
[2023-05-06 15:15:28,391: INFO/MainProcess] mingle: all alone
[2023-05-06 15:15:32,290: INFO/MainProcess] [email protected] ready.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

注意:必须要先启动Worker,不然不会执行异步任务,若是修改了异步模块的代码,也需要重新启动Worker才会更新。

Step4:执行测试程序

启动测试程序,程序会依次看到如下日志,可以看到1.67s就走完了整个流程,但此时异步模块还没执行完。

1. 插入记录到数据库
2. celery异步发送邮件
3. 告知用户注册成功
耗时:1.6706061363220215 秒 
1
2
3
4

异步模块在Worker的控制台里可以看到如下日志:

[2023-05-06 15:15:41,840: INFO/MainProcess] Task tasks.send_mail[6dd495fd-d1c4-4459-856b-17ac1dde7902] received
[2023-05-06 15:15:46,845: WARNING/ForkPoolWorker-8] send mail to [email protected]
[2023-05-06 15:15:46,849: INFO/ForkPoolWorker-8] Task tasks.send_mail[6dd495fd-d1c4-4459-856b-17ac1dde7902] succeeded in 5.006026458000001s: 'send mail success'
1
2
3

等待5s后,的确在当前目录里生成了send_mail.txt文件,说明异步模块执行成功了。

# 5.7 使用Python装饰器

# 5.7.1 Python装饰器简介

Python 的装饰器是一种非常便捷的修改函数的方式,不影响原函数的定义而对函数进行一些额外的封装,有点类似 AOP,增加一些小功能却不侵入原有代码,非常简洁强大。

# 5.7.2 与Java注解异同点对比

[1] 对代码块的影响

  • java注解:不会对所修饰的代码产生直接的影响。
  • python装饰器:可以对所修饰的代码产生直接的影响。

[2] 共通处

  • java中注解+反射 可以实现 python装饰器同样的功能,包括面向切面编程、参数校验等。

[3] 从用途看

  • 从用途看注解像是注释文档一样,用于生成javadoc文档(以参数形式标注)、检查等。
  • 装饰器像是为函数提供更多的功能,并装在不同的函数身上。

[4] 从原理看

  • java注解:所有注解本质是继承自接口的接口。
  • python装饰器:被装饰函数的返回值作为参数传给闭包函数执行(这个闭包函数名前面加个@,就是装饰器)。

# 5.7.3 使用装饰器实现权限校验

# -*- coding: utf-8 -*-


# 设置一个类表示个人,有姓名和权限两个属性
class Person():
    def __init__(self, name, permission):
        self.name = name
        self.permission = permission


# 权限校验装饰器
def checkPermission(num):
    def setPermission(func):
        def inner(person):
            # 使用 & 进行按位与运算(只有参与&运算的两个位都为1时,结果才为1,否则为0),当校验通过时,可以执行该操作
            if num & person.permission == num:
                func(person)
            else:
                print(person.name, "无权限")
        return inner
    return setPermission


@checkPermission(1)  # 001
def read(person):
    print(person.name, "读取代码")


@checkPermission(2)  # 010
def write(person):
    print(person.name, "写入代码")


@checkPermission(4)  # 100
def run(person):
    print(person.name, "执行代码")


if __name__ == '__main__':
    p1 = Person("张三", 1)    # 只有读权限(1=2^0,001)
    p2 = Person("李四", 3)    # 读写权限(3=2^1+2^0,011)
    p3 = Person("王五", 6)    # 写与执行权限(6=2^2+2^1,110)
    read(p1), write(p1), run(p1)
    print("===================")
    read(p2), write(p2), run(p2)
    print("===================")
    read(p3), write(p3), run(p3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

输出结果:

张三 读取代码
张三 无权限
张三 无权限
===================
李四 读取代码
李四 写入代码
李四 无权限
===================
王五 无权限
王五 写入代码
王五 执行代码
1
2
3
4
5
6
7
8
9
10
11

# 5.8 使用Memray分析程序内存占用

深度学习算法编写或者调用不当时可能会出现内存叠加、内存溢出等问题,可以使用工具对程序内存占用进行分析。

# 5.8.1 Memray简介

# 5.8.2 Memray基本使用

基本使用:安装依赖——用memray运行程序——转换二进制文件

$ pip install memray                     // 安装memray依赖(仅支持Linux)
$ python -m memray run my_script.py      // 运行单个文件
$ python -m memray run -m my_module      // 运行整个模块
$ memray flamegraph my_script.2369.bin   // 将二进制文件转换成火焰图html文件
1
2
3
4

注意必须是Linux平台,其他平台不支持使用,它生成的是一个二进制文件(如my_script.2369.bin),可通过命令将其转换成直观的火焰图html文件。

Memray火焰图

# 5.9 程序执行耗时分析

# 5.9.1 使用py-spy分析执行耗时

[1] py-spy简介

  • 项目描述:用 top 的方式分析 Python 程序性能的工具。一款 Python 程序性能分析工具,它可以让你在不重启程序或修改代码的情况,直观地看到 Python 程序中每个函数花费的时间。适用于 Linux、OSX、Windows 等系统。
  • 项目地址:https://github.com/benfred/py-spy (opens new window)

[2] py-spy基本使用

基本使用:安装依赖——用py-spy运行程序

$ pip install py-spy                                          // 安装py-spy依赖
$ sudo py-spy record -o profile.svg -- python my_script.py    // 运行程序进行分析
1
2

注:控制台会实时显示资源占用情况,执行完毕后会自动生成一个svg格式的火焰图。

py-spy

# 5.9.2 使用cProfile分析执行耗时

cProfile(语言编写的测试模块)是一个标准库内建的性能分析工具,可以在标准输出中看到每一个函数被调用的次数和运行的时间,从而找到程序的性能瓶颈,从而有针对性的进行性能优化。

[1] cProfile分析执行耗时的基本使用

import cProfile

def func(a):
    sum = 0
    for i in range(a):
        sum += i
    return sum

if __name__ == '__main__':
    cProfile.run("func(10000000)")

>>> 输出结果
   4 function calls in 0.306 seconds
   Ordered by: standard name
   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.000    0.000    0.306    0.306 <string>:1(<module>)
        1    0.306    0.306    0.306    0.306 test2.py:4(func)
        1    0.000    0.000    0.306    0.306 {built-in method builtins.exec}
        1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

第一行是总共调用function次数,和总运行时间。下面是cProfile输出的各个参数说明:

  • ncalls:函数调用次数;
  • tottime:函数的总的运行时间,除掉函数中调用子函数的运行时间;
  • percall:函数运行一次的平均时间等于 tottime/ncalls;
  • cumtime:表示该函数及其所有子函数的调用运行的时间,包含了子函数的运行时间;
  • percall:函数运行一次的平均时间,等于 cumtime/ncalls;
  • filename:lineno(function):每个函数调用的具体信息,函数所在的文件名称、代码行数、函数名称等;

[2] 导出耗时分析文件并对其排序处理

如上示例是直接在代码里使用cProfile的,结果会在控制台里打印,有一定的侵入性,也不利于排序查找,下面将介绍如何导出耗时分析文件并对其排序处理。

$ python -m cProfile -o cpf_out.txt test.py
1

注:这里会生成一个txt格式的分析文件,但这个不是可读的,需要使用程序去查看。

# -*- coding: utf-8 -*-

import pstats

p = pstats.Stats('./cpf_out.txt')
p.print_stats()
# 根据文件名升序
# p.sort_stats('filename').print_stats()
# 根据模块名升序
# p.sort_stats('module').print_stats()
# 根据函数名升序
# p.sort_stats('name').print_stats()
# 根据调用总次数降序
# p.sort_stats('ncalls').print_stats()
# 根据调用总时间(去除子函数)降序
# p.sort_stats('tottime').print_stats()
# 根据调用总时间(包含子函数)降序
p.sort_stats('cumtime').print_stats()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# 6. 参考资料

[1] Python 使用 flask 库传递 JSON 数据 from CSDN (opens new window)

[2] Python | Flask 解决跨域问题 from segmentfault (opens new window)

[3] flask小坑--request.json 无法获取请求body的json数据 from CSDN (opens new window)

[4] python清除字符串中间空格的方法 from CSDN (opens new window)

[5] Python 3 实现定义跨模块的全局变量和使用 from 博客园 (opens new window)

[6] python configparser 读取 ini类型的配置文件 from CSDN (opens new window)

[7] 高性能异步框架Celery入坑指南 from 稀土掘金 (opens new window)

[8] 新版xlrd报 Excel xlsx file;not supported from CSDN (opens new window)

[9] Python md5去重图片文件 from 代码交流 (opens new window)

[10] Python递归获取指定文件夹下所有指定后缀的文件路径 from CSDN (opens new window)

[11] Python3生成XML文件 from 知乎 (opens new window)

[12] python 统计list中各个元素出现的次数 from CSDN (opens new window)

[13] python 读写csv文件(创建,追加,覆盖)from CSDN (opens new window)

[14] 用Python编写的CSV文件每行之间都有空行 from QAStack (opens new window)

[15] python操作csv文件实现对特定列排序 from 简书 (opens new window)

[16] 视频抽帧那点事 from cnblogs (opens new window)

[17] 解决flask接口返回的内容中文乱码的问题 from CSDN (opens new window)

[18] json dump 中文乱码 from CSDN (opens new window)

[19] python格式化输出 json数组 from CSDN (opens new window)

[20] Python读取txt文件数据 每行以空格间隔(列表推导式)from CSDN (opens new window)

[21] 如何使用python生成局域网的网络拓扑图 from 知乎 (opens new window)

[22] python导出项目依赖包与导入项目依赖包 from CSDN (opens new window)

[23] requests.exceptions.SSLError: HTTPSConnectionPool 解决方案 from CSDN (opens new window)

[24] python 使用代理的几种方式 from CSDN (opens new window)

[25] python执行shell脚本的几种方法 from CSDN (opens new window)

[26] python字典转为对象,用"."方式访问对象属性 from 博客园 (opens new window)

[27] 农历和阳历日期互转,Python实现 from CSDN (opens new window)

[28] 保护版权,用 Python 为图片添加盲水印 from 程序员客栈 (opens new window)

[29] python多种方法压缩图片,opencv、PIL、tinypng、pngquant压缩图片 from CSDN (opens new window)

[30] Flask 使用流下载文件 from CSDN (opens new window)

[31] Python中匹配IP的正则表达式 from 博客园 (opens new window)

[32] NumPy 数组不是 JSON 可序列化的 from stackoverflow (opens new window)

[33] 为什么有人说 Python 的多线程是鸡肋呢 from 知乎 (opens new window)

[34] python 多CPU并行执行for循环 from CSDN (opens new window)

[35] Python之并行--基于joblib from CSDN (opens new window)

[36] Python代码性能分析之cProfile from 吾非同 (opens new window)

[37] Python并发、Python多进程(三) 进程间数据共享 from CSDN (opens new window)

[38] 多线程分片并行下载大文件 from CSDN (opens new window)

[39] Python使用grequests并发发送请求 from 北京临渊 (opens new window)

[40] Python 文本加密解密 中文TXT数据 from 简书 (opens new window)

[41] Python 新提案:删除全局解释器锁 GIL,解放多线程性能 from 开源中国 (opens new window)

[42] python 网络图片转base64 from CSDN (opens new window)

[43] python之base64编码出现b的问题 from 知乎 (opens new window)

[44] Flask后端实践 连载三 接口标准化及全球化 from CSDN (opens new window)

[45] 使用 Flask-Docs 自动生成 Api 文档 from segmentfault (opens new window)

[46] 令人不悦的Error--requests.exceptions.ProxyError from CSDN (opens new window)

[47] python制作图片缩略图 from 51CTO (opens new window)

[48] Python 读写 ini 配置文件 from IS-RPA (opens new window)

[49] Python3.9使用pandas读取Excel报错 AttributeError: ‘ElementTree‘ object has no attribute ‘getiterator‘ from CSDN (opens new window)

[50] 使用 pre-request 优化 Flask 入参校验 from 稀土掘金 (opens new window)

[51] python高级使用,包括多线程、协同编程、知识协同、网络、装饰器等 from Github (opens new window)

[52] 浅谈java中注解和python中装饰器的区别 from CSDN (opens new window)

[53] python 装饰器学习 校验权限 from CSDN (opens new window)

[54] python 详解multiprocessing多进程-Pool进程池模块(二)from CSDN (opens new window)

[55] 一篇文章搞定Python多进程(全) from 知乎 (opens new window)

[56] python 获取系统信息 from 知乎 (opens new window)

[57] 在 Python 中获取硬件和系统信息 from CSDN (opens new window)

[58] 使用python基于git log统计开发代码量 from CSDN (opens new window)

[59] python asynio错误 There is no current event loop in thread 'Thread-1' from CSDN (opens new window)

[60] Python统计代码耗时的几种方法 from 腾讯云 (opens new window)

[61] Python通过装饰器并使用cprofile对函数进行性能分析 from 简书 (opens new window)

[62] python多线程比单线程效率低的原因及其解决办法 from CSDN (opens new window)

[63] RuntimeError:尝试反序列化 CUDA 设备上的对象 from stackoverflow (opens new window)

[64] Python监听剪切板的两种方法 from CSDN (opens new window)

[65] Python文件压缩与解压 from 稀土掘金 (opens new window)

[66] UnicodeEncodeError: 'ascii' codec can't encode characters in position 0-1: ordinal not in range(128) from 博客园 (opens new window)

[67] python requests发送json格式数据 from CSDN (opens new window)

[68] 用 Python 生成 & 识别二维码 from Python技术 (opens new window)

[69] Python 定时任务最佳实践 from 知乎 (opens new window)

[70] Python - 使用 Gmail 发送电子邮件 from techexpert (opens new window)

[71] pyhton-docx库解析word文档 from 简书 (opens new window)

[72] python将多张图片合并成一张图片 from CSDN (opens new window)

[73] 基于Flask的高并发部署方案 from CSDN (opens new window)

[74] 大型Python项目依赖树很烦?教你一键理清 from 稀土掘金 (opens new window)

[75] 一个傻瓜式构建可视化 web的 Python 神器 -- streamlit 教程 from 稀土掘金 (opens new window)

[76] 机器学习UI开发框架Streamlit快速教程 from 腾讯云 (opens new window)

[77] 快速生成 AI 应用的框架对比:Gradio、Streamlit 和 Dash from 知乎 (opens new window)

[78] Streamlit和Gradio快速构建和部署机器学习模型的交互式应用 from 腾讯云 (opens new window)

[79] Flask 已死,FastAPI 是未来 from InfoQ (opens new window)

[80] Qwen2-7B-Instruct FastApi 部署调用 from CSDN (opens new window)

Last Updated: 5/4/2025, 4:42:45 PM