python-utils

python 实用代码

参考文献:

所有带有 `# from d2l` 注释的代码均出自《动手学深度学习》(Dive into Deep Learning,简称 d2l)。

计时器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#from d2l
import time
import numpy as np
class Timer:
    """Record the durations of several timed runs.

    The timer is started on construction. Each ``stop()`` appends the time
    elapsed since the most recent ``start()`` to ``self.times``.
    """

    def __init__(self):
        self.times = []
        self.start()

    def start(self):
        """Start (or restart) the timer."""
        self.tik = time.time()

    def stop(self):
        """Stop the timer, record the elapsed seconds and return them."""
        self.times.append(time.time() - self.tik)
        return self.times[-1]

    def avg(self):
        """Return the mean recorded time, or 0.0 if nothing was recorded yet."""
        if not self.times:
            # Avoid ZeroDivisionError when avg() is called before any stop().
            return 0.0
        return sum(self.times) / len(self.times)

    def sum(self):
        """Return the total of all recorded times."""
        # Inside the method body ``sum`` still resolves to the builtin.
        return sum(self.times)

    def cumsum(self):
        """Return the running totals of the recorded times as a list."""
        return np.array(self.times).cumsum().tolist()

累加器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class Accumulator:  # from d2l
    """Keep running sums for n variables."""

    def __init__(self, n):
        # One float slot per tracked variable.
        self.data = [0.0] * n

    def add(self, *args):
        """Add one value to each slot, position by position.

        Values are converted with ``float``. Extra arguments beyond the
        number of slots are ignored. When fewer arguments than slots are
        given, the remaining slots are left unchanged, so the accumulator
        never loses slots.
        """
        # Work on a copy so a failing float() conversion leaves self.data intact.
        totals = list(self.data)
        for position, value in enumerate(args[:len(totals)]):
            totals[position] += float(value)
        self.data = totals

    def reset(self):
        """Set every slot back to 0.0."""
        self.data = [0.0] * len(self.data)

    def __getitem__(self, idx):
        """Return the running sum stored at ``idx``."""
        return self.data[idx]

使用案例

1
2
3
4
5
6
# Keep three running sums and feed them ten rounds of values.
acc = Accumulator(3)
for i in range(10):
    acc.add(i, i+10, i+100)
# Read a single running sum by index.
print(acc[0])
# Iterating the accumulator yields each running sum in slot order.
for a in acc:
    print(a)

Logger

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
class Logger:
    """Emit levelled log messages to stdout (coloured) or to a file (plain).

    ``level`` is the minimum severity that is emitted; from least to most
    severe the levels are trace, debug, info, warning, error. ``output`` is
    either the string ``"stdout"`` or a file path that messages are
    appended to.
    """

    # Label shown in front of the message for each known level.
    _LABELS = {
        'error': '[X] Error',
        'warning': '[!] Warning',
        'info': '[+] Info',
        'debug': '[#] Debug',
    }
    # ANSI foreground colour code used for each known level.
    _COLORS = {
        'error': '31',
        'warning': '33',
        'info': '34',
        'debug': '36',
    }

    def __init__(self, level="trace", output="stdout"):
        self.output = output
        self.level = level
        # Kept as an assert so callers still see AssertionError for a bad level.
        assert level in ['error', 'warning', 'info', 'debug', 'trace']
        self.level_map = {
            'error': 5,
            'warning': 4,
            'info': 3,
            'debug': 2,
            'trace': 1
        }
        self.level_num = self.level_map[level]

    def _enabled(self, level) -> bool:
        """Return True when messages of ``level`` pass the configured threshold."""
        return self.level_num <= self.level_map[level]

    def get_log_text_color(self, level, text) -> str:
        """Return ``text`` formatted with ANSI colours for terminal output.

        Trace messages are returned undecorated; an unrecognised level is
        rendered as a red "Unknown" entry.
        """
        if level == 'trace':
            return f"{text}"
        label = self._LABELS.get(level, '[X] Unknown')
        color = self._COLORS.get(level, '31')
        return f"\x1b[{color};1;4m{label}\x1b[0m\x1b[{color}m: {text}\x1b[0m"

    def get_log_text(self, level, text) -> str:
        """Return ``text`` formatted without colour codes, for file output."""
        if level == "trace":
            return f"{text}"
        label = self._LABELS.get(level, '[X] Unknown')
        return f"{label}: {text}"

    def print(self, level, text):
        """Write one message to stdout, or append it to the output file."""
        if self.output == "stdout":
            print(self.get_log_text_color(level, text))
        else:
            # Explicit UTF-8 so non-ASCII messages do not depend on the
            # platform's default encoding.
            with open(self.output, "a", encoding="utf-8") as f:
                f.write(self.get_log_text(level, text) + "\n")

    def error(self, text):
        """Log an error; errors are emitted at every configured level."""
        self.print('error', text)

    def warning(self, text):
        """Log a warning if the configured level allows it."""
        if self._enabled('warning'):
            self.print('warning', text)

    def info(self, text):
        """Log an info message if the configured level allows it."""
        if self._enabled('info'):
            self.print('info', text)

    def debug(self, text):
        """Log a debug message if the configured level allows it."""
        if self._enabled('debug'):
            self.print('debug', text)

    def trace(self, text):
        """Log a trace message if the configured level allows it."""
        if self._enabled('trace'):
            self.print('trace', text)

使用案例

1
2
3
4
5
6
7
# Default logger: level "trace" and output to stdout, so all five calls print.
l = Logger()

# Emit the same message once per severity, from most to least severe.
for emit in (l.error, l.warning, l.info, l.debug, l.trace):
    emit("114514")

绘图

静态图片

以下代码默认在 Jupyter Notebook 中运行并绘图。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#from d2l
from matplotlib_inline import backend_inline
from matplotlib import pyplot as plt

def use_svg_display():
    """Ask the inline matplotlib backend to render figures as SVG."""
    output_format = "svg"
    backend_inline.set_matplotlib_formats(output_format)


def set_figsize(figsize=(3.5, 2.5)):
    """Switch to SVG output and set matplotlib's default figure size.

    ``figsize`` is stored under the ``figure.figsize`` rcParams key.
    """
    use_svg_display()
    rc_params = plt.rcParams
    rc_params["figure.figsize"] = figsize


def set_axes(axes, xlabel, ylabel, xlim, ylim, xscale, yscale, legend):
    """Apply labels, scales, limits, an optional legend and a grid to ``axes``."""
    # Labels first, then scales, then limits (limits are set after the scales).
    axes.set_xlabel(xlabel)
    axes.set_ylabel(ylabel)
    axes.set_xscale(xscale)
    axes.set_yscale(yscale)
    axes.set_xlim(xlim)
    axes.set_ylim(ylim)
    # An empty or missing legend is skipped.
    if legend:
        axes.legend(legend)
    axes.grid()


def plot(X, Y=None, xlabel=None, ylabel=None, legend=None, xlim=None,
         ylim=None, xscale='linear', yscale='linear',
         fmts=('-', 'm--', 'g-.', 'r:'), figsize=(3.5, 2.5), axes=None):
    """Plot one or more curves on a single set of axes.

    ``X`` and ``Y`` may each be a single 1-D sequence or a list of them.
    When ``Y`` is omitted, ``X`` is treated as the y-values and plotted
    against its indices. Curves are paired with ``fmts`` by ``zip``, so at
    most ``len(fmts)`` curves are drawn. When ``axes`` is None the current
    matplotlib axes are used.
    """
    # None replaces the former mutable default ``[]``.
    legend = [] if legend is None else legend

    def has_one_axis(X):  # True if X (tensor or list) has 1 axis
        if hasattr(X, "ndim") and X.ndim == 1:
            return True
        # An empty list counts as one (empty) axis instead of raising IndexError.
        return isinstance(X, list) and (not X or not hasattr(X[0], "__len__"))

    if has_one_axis(X):
        X = [X]
    if Y is None:
        X, Y = [[]] * len(X), X
    elif has_one_axis(Y):
        Y = [Y]
    if len(X) != len(Y):
        # Reuse the same x-values for every curve.
        X = X * len(Y)

    set_figsize(figsize)
    if axes is None:
        axes = plt.gca()
    axes.cla()
    for x, y, fmt in zip(X, Y, fmts):
        if len(x):
            axes.plot(x, y, fmt)
        else:
            # No x-values: let matplotlib plot y against its indices.
            axes.plot(y, fmt)
    set_axes(axes, xlabel, ylabel, xlim, ylim, xscale, yscale, legend)

测试代码:

1
2
3
4
5
6
7
import numpy as np

# Sample points on [-7, 7) with step 0.01.
x = np.arange(-7, 7, 0.01)
# The cubic curve, kept under its own name.
z = x**3
# Both curves share the same x-values: the square first, then the cube.
y = [x**2, z]
plot(
    x,
    y,
    xlabel="x",
    ylabel="y",
    legend=["y=x**2", "y=x**3"],
    figsize=(5, 3),
)

image-20240816120958479

保存绘图为文件

1
# Save the current matplotlib figure to the file image.png.
plt.savefig("image.png")

打开窗口预览图片

1
# Open a preview window for the current figure.
plt.show()

动态图片

jupyter 版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# from d2l
from matplotlib import pyplot as plt
from matplotlib_inline import backend_inline
from IPython import display
import numpy as np
def use_svg_display():
    """Ask the inline matplotlib backend to render figures as SVG."""
    output_format = "svg"
    backend_inline.set_matplotlib_formats(output_format)


def set_axes(axes, xlabel, ylabel, xlim, ylim, xscale, yscale, legend):
    """Apply labels, scales, limits, an optional legend and a grid to ``axes``."""
    # Labels first, then scales, then limits (limits are set after the scales).
    axes.set_xlabel(xlabel)
    axes.set_ylabel(ylabel)
    axes.set_xscale(xscale)
    axes.set_yscale(yscale)
    axes.set_xlim(xlim)
    axes.set_ylim(ylim)
    # An empty or missing legend is skipped.
    if legend:
        axes.legend(legend)
    axes.grid()


class Animator:
    """Draw line plots incrementally, redrawing the figure after every update."""

    def __init__(
        self,
        xlabel=None,
        ylabel=None,
        legend=None,
        xlim=None,
        ylim=None,
        xscale="linear",
        yscale="linear",
        fmts=("-", "m--", "g-.", "r:"),
        nrows=1,
        ncols=1,
        figsize=(3.5, 2.5),
    ):
        """Create the figure and remember how its first axes are configured."""
        legend = [] if legend is None else legend
        use_svg_display()
        self.fig, self.axes = plt.subplots(nrows, ncols, figsize=figsize)
        # A single subplot comes back as a bare axes object; wrap it so that
        # self.axes[0] works either way.
        if nrows * ncols == 1:
            self.axes = [self.axes]

        def configure_first_axes():
            # Closure that captures the axis settings given to the constructor.
            set_axes(self.axes[0], xlabel, ylabel, xlim, ylim, xscale, yscale, legend)

        self.config_axes = configure_first_axes
        self.X, self.Y, self.fmts = None, None, fmts

    def add(self, x, y):
        """Append one point per curve and redraw the figure.

        ``y`` may be a scalar or a sequence with one value per curve; a
        scalar ``x`` is shared by all curves. Pairs in which either value is
        None are skipped.
        """
        ys = y if hasattr(y, "__len__") else [y]
        curve_count = len(ys)
        xs = x if hasattr(x, "__len__") else [x] * curve_count
        # Create the per-curve point lists on first use.
        if not self.X:
            self.X = [[] for _ in range(curve_count)]
        if not self.Y:
            self.Y = [[] for _ in range(curve_count)]
        for curve, (x_value, y_value) in enumerate(zip(xs, ys)):
            if x_value is None or y_value is None:
                continue
            self.X[curve].append(x_value)
            self.Y[curve].append(y_value)
        first_axes = self.axes[0]
        first_axes.cla()
        for curve_x, curve_y, fmt in zip(self.X, self.Y, self.fmts):
            first_axes.plot(curve_x, curve_y, fmt)
        self.config_axes()
        display.display(self.fig)
        display.clear_output(wait=True)

测试

1
2
3
4
5
6
# Two curves drawn on fixed axes that span [-7, 7] in both directions.
ani = Animator(
    xlabel="x",
    ylabel="y",
    legend=["y1", "y2"],
    xlim=[-7, 7],
    ylim=[-7, 7],
)
x = np.arange(-7, 7, 0.1)

# Add one point per curve for every sample; the figure is redrawn each time.
for a in x:
    y = [2*np.sin(a), 0.01*a ** 3]
    ani.add(a, y)

神经网络训练

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import torch.nn as nn
import torch

# 计算准确度
def accuracy(y_hat, y):  # from d2l
    """Return the number of correct predictions as a Python float.

    When ``y_hat`` has more than one dimension and more than one column,
    each row is reduced to the index of its largest entry before it is
    compared with ``y``.
    """
    has_class_scores = len(y_hat.shape) > 1 and y_hat.shape[1] > 1
    predictions = y_hat.argmax(axis=1) if has_class_scores else y_hat
    # Compare in y's dtype, then count the matches in that dtype as well.
    matches = predictions.type(y.dtype) == y
    correct = matches.type(y.dtype).sum()
    return float(correct)


def evaluate_accuracy(net, data_iter, device):  # from d2l
    """Return the fraction of correct predictions of ``net`` over ``data_iter``.

    Each batch is moved to ``device`` before the forward pass. Gradients are
    disabled, and a ``torch.nn.Module`` is switched to evaluation mode first.
    """
    if isinstance(net, torch.nn.Module):
        net.eval()
    # Slot 0: correct predictions, slot 1: total number of label elements.
    totals = Accumulator(2)
    with torch.no_grad():
        for features, labels in data_iter:
            features = features.to(device)
            labels = labels.to(device)
            outputs = net(features)
            totals.add(accuracy(outputs, labels), labels.numel())
    correct, seen = totals[0], totals[1]
    return correct / seen
# 训练一个epoch
def train_epoch_ch3(net, train_iter, loss, updater, device):  # from d2l
    """Train ``net`` for one epoch (see d2l chapter 3).

    Returns a tuple ``(summed loss / element count, correct / element count)``.
    ``updater`` is either a ``torch.optim.Optimizer`` or a callable that is
    invoked with the batch size.
    """
    # Put the model into training mode.
    if isinstance(net, nn.Module):
        net.train()
    uses_builtin_optimizer = isinstance(updater, torch.optim.Optimizer)
    # Slot 0: summed loss, slot 1: correct predictions, slot 2: label elements.
    totals = Accumulator(3)
    for features, labels in train_iter:
        features = features.to(device)
        labels = labels.to(device)
        outputs = net(features)
        l = loss(outputs, labels)
        if uses_builtin_optimizer:
            # Built-in PyTorch optimizer: backpropagate the mean loss.
            updater.zero_grad()
            l.mean().backward()
            updater.step()
        else:
            # Custom updater: backpropagate the summed loss and pass the batch size.
            l.sum().backward()
            updater(features.shape[0])
        totals.add(float(l.sum()), accuracy(outputs, labels), labels.numel())
    # Average training loss and training accuracy.
    loss_sum, correct, seen = totals[0], totals[1], totals[2]
    return loss_sum / seen, correct / seen
# 训练全过程
def train_ch3(net, train_iter, test_iter, loss, num_epochs, updater, device):  # from d2l
    """Train ``net`` for ``num_epochs`` epochs and plot progress (see d2l chapter 3).

    After every epoch the training loss, training accuracy and test accuracy
    are added to an ``Animator`` plot at x = epoch number (starting at 1).
    Nothing is returned.
    """
    animator = Animator(
        xlabel="epoch",
        xlim=[1, num_epochs],
        ylim=[0.3, 0.9],
        legend=["train loss", "train acc", "test acc"],
    )
    for epoch in range(num_epochs):
        train_metrics = train_epoch_ch3(net, train_iter, loss, updater, device)
        test_acc = evaluate_accuracy(net, test_iter, device)
        # train_metrics is (loss, accuracy); append the test accuracy as a third curve.
        animator.add(epoch + 1, train_metrics + (test_acc,))
    # The former trailing unpack of train_metrics was unused and raised
    # UnboundLocalError when num_epochs was 0, so it has been removed.

数据库操作

mysql

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import mysql.connector

class database:
    """Thin wrapper around one ``mysql.connector`` connection and cursor."""

    def __init__(self, ip, port, user, pwd, database) -> None:
        """Connect to the MySQL server and open a cursor on the connection."""
        self.conn = mysql.connector.connect(
            host=ip, port=port, user=user, password=pwd, database=database
        )
        self.cursor = self.conn.cursor()

    def exec(self, cmd: str, params=None):
        """Execute one SQL statement, commit, and return its rows.

        ``params`` is optional and is passed to the driver for placeholder
        substitution; prefer it over building ``cmd`` from untrusted input.
        Statements that produce no result set (INSERT, UPDATE, ...) return
        an empty list.
        """
        if params is None:
            self.cursor.execute(cmd)
        else:
            self.cursor.execute(cmd, params)
        # ``description`` is None when the statement produced no result set.
        # Fetching in that case raises, which previously skipped the commit.
        if self.cursor.description is not None:
            result = self.cursor.fetchall()
        else:
            result = []
        self.conn.commit()
        return result

    def close(self):
        """Close the cursor and the connection."""
        self.cursor.close()
        self.conn.close()

示例

1
2
3
4
5
6
7
8
# Connect to a local MySQL server with the demo account.
db = database(ip="127.0.0.1", port=3306, user="node1", pwd="114514", database="mysql")
# List the databases on the server.
print(db.exec("SHOW DATABASES"))

open-gauss

参考:https://opengauss.org/zh/blogs/jingjingwu/01.getting-started-with-python.html

1
# Install the psycopg2 driver that the connector below imports.
pip install psycopg2-binary
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from psycopg2 import connect


class OpenGaussConnector:
    """Thin wrapper around one psycopg2 connection to an openGauss server."""

    def __init__(self, ip, port, user, pwd, database) -> None:
        """Open the connection with the given host, port and credentials."""
        params = {
            'database': database,
            'user': user,
            'password': pwd,
            'host': ip,
            'port': port
        }
        self.conn = connect(**params)

    def exec(self, cmd: str, params=None):
        """Execute one SQL statement and return the first row of its result.

        ``params`` is optional and is passed to the driver for placeholder
        substitution; prefer it over building ``cmd`` from untrusted input.
        Returns None when the statement produces no result set or no rows.
        """
        # Using the connection as a context manager commits on success and
        # rolls back if the body raises.
        with self.conn:
            with self.conn.cursor() as cursor:
                cursor.execute(cmd, params)
                # ``description`` is None for statements without a result
                # set. Fetching then raises, which would roll the write back.
                if cursor.description is None:
                    return None
                return cursor.fetchone()

示例

1
2
3
4
# Connect to a local openGauss server with the demo account.
db = OpenGaussConnector(
    ip='127.0.0.1',
    port=5432,
    user='superuser',
    pwd='OGSql@123',
    database='postgres',
)
# Query the role catalogue; exec() returns a single row.
cmd = 'select * from PG_ROLES;'
res = db.exec(cmd)
print(res)

运行结果

1
('superuser', False, True, False, False, False, True, False, False, False, -1, '********', None, None, 'default_pool', 0, None, None, 16384, False, 'n', None, None, None, False, False, False)

SIMD多进程运行器(基于 multiprocessing,每个任务在独立进程中运行)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# SIMD多线程运行器v3
import multiprocessing
from multiprocessing import Lock
from time import sleep
from tqdm import tqdm


class SIMD_runner:
    """Apply one worker function to many inputs using parallel processes.

    Each input tuple gets its own ``multiprocessing.Process``; at most
    ``num_threads`` of them run at the same time while the main process
    polls for finished jobs and updates a progress bar.

    NOTE(review): the process target is a nested function. That looks like
    it only works with the "fork" start method, because other start methods
    must pickle the target - confirm before using this on Windows or macOS.
    """

    def __init__(self, main_thread_sleep_time=0.01):
        # Seconds the main process sleeps between two polls of the result list.
        self.sleep_time = main_thread_sleep_time

    def run(self, inp_data:list, num_threads:int, worker, logger:Logger=None):
        """Run ``worker(*item)`` for every item and return results in input order.

        Args:
            inp_data: list of argument tuples, one per job.
            num_threads: maximum number of processes running at once.
            worker: callable invoked as ``worker(*inp_data[i])``.
            logger: optional logger; every finished job is reported through
                its ``trace`` method.

        Raises:
            ValueError: if ``num_threads`` is smaller than 1.
            RuntimeError: if a worker process exits with a non-zero code
                without reporting a result.
        """
        self.N = len(inp_data)
        self.num_threads = num_threads
        self.inp_data = inp_data
        self.logger = logger
        if num_threads < 1:
            # With no slot available no job would ever start and the polling
            # loop below would spin forever.
            raise ValueError("num_threads must be at least 1")
        if self.N == 0:
            # Nothing to do; the unzip at the end cannot handle an empty result.
            return []
        # One manager serves both shared lists (job indices and job results).
        manager = multiprocessing.Manager()
        self.mli = manager.list()    # indices of finished jobs, in completion order
        self.mlres = manager.list()  # results, in the same order as self.mli
        # Mutex guarding the two shared lists.
        self.l = Lock()
        self.simd_worker = self.get_worker(worker)

        jobs = [
            multiprocessing.Process(
                target=self.simd_worker,
                args=(self.mli, self.mlres, self.l, i, *self.inp_data[i]),
            )
            for i in range(self.N)
        ]
        for j in jobs:
            # Daemon processes are terminated when the main process exits.
            j.daemon = True
        running_job = 0
        next_job = 0
        active = set()  # started jobs that have not reported a result yet
        # Progress bar
        with tqdm(total=self.N, desc="SIMD_runner") as pbar:
            old = 0
            while True:
                with self.l:
                    curmli = list(self.mli)
                if len(curmli) > old:
                    pbar.update(len(curmli) - old)
                    # Reap every job that reported since the previous poll.
                    for idx in curmli[old:]:
                        jobs[idx].join()
                        active.discard(idx)
                        running_job -= 1
                    old = len(curmli)
                if len(curmli) == self.N:
                    break
                # A worker that died without reporting would otherwise keep
                # this loop waiting forever.
                crashed = [idx for idx in active
                           if jobs[idx].exitcode not in (None, 0)]
                if crashed:
                    with self.l:
                        reported = set(self.mli)
                    crashed = [idx for idx in crashed if idx not in reported]
                    if crashed:
                        raise RuntimeError(
                            f"SIMD_runner: worker(s) {sorted(crashed)} exited "
                            "without reporting a result"
                        )
                while running_job < num_threads and next_job < self.N:
                    jobs[next_job].start()
                    active.add(next_job)
                    running_job += 1
                    next_job += 1
                sleep(self.sleep_time)
        # Restore input order by sorting on the job index only.
        ordered = sorted(zip(self.mli, self.mlres), key=lambda pair: pair[0])
        return [res for _, res in ordered]

    def get_worker(self, calc):
        """Wrap ``calc`` so that it records its index and result in the shared lists."""
        def SIMD_worker(mli, mlres, l, idx, *data):
            res = calc(*data)
            # ``with`` releases the lock even if logging or appending raises.
            with l:
                if self.logger is not None:
                    self.logger.trace(f"idx={idx}, input={data}, output={res}")
                mli.append(idx)
                mlres.append(res)
        return SIMD_worker

使用方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 定义worker函数,以输入数据为参数,以输出数据为返回值
def worker(x, y):
    """Example worker: takes the input values and returns (sum, difference)."""
    total = x + y
    difference = x - y
    return total, difference


N = 100
# Input data is a list of tuples; each tuple holds the arguments of one worker call.
inp_data = [(i*2, i-3) for i in range(N)]
# Create the runner.
runner = SIMD_runner(main_thread_sleep_time=0.01)
# Every finished job is written to this log file through the logger.
logger = Logger(output='test.log')
# Start the run with at most four jobs at a time.
res = runner.run(inp_data=inp_data, num_threads=4, worker=worker, logger=logger)
# One worker return value per input, sorted into input order.
print(res)

效果

1
2
3
> python SIMD_v3.py
SIMD_runner: 100%|█████████████████████████████| 100/100 [00:00<00:00, 266.29it/s]
[(-3, 3), (0, 4), (3, 5), (6, 6), (9, 7), (12, 8), (15, 9), (18, 10), (21, 11), (24, 12), (27, 13), (30, 14), (33, 15), (36, 16), (39, 17), (42, 18), (45, 19), (48, 20), (51, 21), (54, 22), (57, 23), (60, 24), (63, 25), (66, 26), (69, 27), (72, 28), (75, 29), (78, 30), (81, 31), (84, 32), (87, 33), (90, 34), (93, 35), (96, 36), (99, 37), (102, 38), (105, 39), (108, 40), (111, 41), (114, 42), (117, 43), (120, 44), (123, 45), (126, 46), (129, 47), (132, 48), (135, 49), (138, 50), (141, 51), (144, 52), (147, 53), (150, 54), (153, 55), (156, 56), (159, 57), (162, 58), (165, 59), (168, 60), (171, 61), (174, 62), (177, 63), (180, 64), (183, 65), (186, 66), (189, 67), (192, 68), (195, 69), (198, 70), (201, 71), (204, 72), (207, 73), (210, 74), (213, 75), (216, 76), (219, 77), (222, 78), (225, 79), (228, 80), (231, 81), (234, 82), (237, 83), (240, 84), (243, 85), (246, 86), (249, 87), (252, 88), (255, 89), (258, 90), (261, 91), (264, 92), (267, 93), (270, 94), (273, 95), (276, 96), (279, 97), (282, 98), (285, 99), (288, 100), (291, 101), (294, 102)]

python-utils
https://blog.algorithmpark.xyz/2024/10/06/language/python-utils/index/
作者
CJL
发布于
2024年10月6日
更新于
2024年10月28日
许可协议