python-modules

python实用模块

阅读本教程,您需要 python 的基础知识

queue

参考文献:CSDN

参考文献:官方文档

queue模块是Python内置的标准模块,模块实现了三种类型的队列,它们的区别仅仅是条目取回的顺序,分别由3个类进行表示,Queue,LifoQueue,PriorityQueue

Queue

maxsize 是个整数,用于设置可以放入队列中的项目数的上限。当达到这个大小的时候,插入操作将阻塞至队列中的项目被消费掉。如果 maxsize 小于等于零,队列尺寸为无限大。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from queue import Queue

# 初始化队列, maxsize默认为0
q = Queue()
# put方法用于入队
for i in range(10):
q.put(i)
# qsize方法返回队列长度
print(f"qsize={q.qsize()}")
# empty方法判断队列是否为空
while not q.empty():
# get方法用于取出队头元素
front = q.get()
print(front, end=' ')
1
2
qsize=10
0 1 2 3 4 5 6 7 8 9

LifoQueue

实际上是个栈,用法和队列一样

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from queue import LifoQueue

# 初始化栈, maxsize默认为0
q = LifoQueue()
# put方法用于入栈
for i in range(10):
q.put(i)
# qsize方法返回栈长度
print(f"qsize={q.qsize()}")
# empty方法判断栈是否为空
while not q.empty():
# get方法用于取出栈顶元素
front = q.get()
print(front, end=' ')
1
2
qsize=10
9 8 7 6 5 4 3 2 1 0

PriorityQueue

优先队列,底层是一个小根堆

1
2
3
4
5
6
7
8
9
10
11
12
from queue import PriorityQueue

# 初始化优先队列, maxsize默认为0
q = PriorityQueue()
data1 = (1, 'python')
data2 = (2, '-')
data3 = (3, '100')
insert_list = (data2, data3, data1)
for i in insert_list:
q.put(i) # 在队列中依次插入元素 data2、data3、data1
for i in range(3):
print(q.get()) # 依次从队列中取出插入的元素,数据元素输出顺序为 data1、data2、data3
1
2
3
(1, 'python')
(2, '-')
(3, '100')

argparse

参考文献

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import argparse

# 1.创建解释器
parser = argparse.ArgumentParser(description="可写可不写,只是在命令行参数出现错误的时候,随着错误信息打印出来。")
# 2.添加需要的参数
parser.add_argument('-gf', '--girlfriend', choices=['cy'])
# 参数解释
# -gf 代表短选项,在命令行输入-gf和--girlfriend的效果是一样的,作用是简化参数输入
#--girlfriend 代表完整的参数名称,可以尽量做到让人见名知意,需要注意的是如果想通过解析后的参数取出该值,必须使用带--的名称
# choices 代表输入参数的只能是这个choices里面的内容,其他内容则会保错
parser.add_argument('--house', type=int, default=0)
# 参数解释
# --house 代表参数名称
# type 代表输入的参数类型,从命令行输入的参数,默认是字符串类型
# default 代表如果该参数不输入,则会默认使用该值
parser.add_argument('food')
# 参数解释
# 该种方式则要求必须输入该参数
# 输入该参数不需要指定参数名称,指定反而报错,解释器会自动将输入的参数赋值给food

# 3.进行参数解析
args = parser.parse_args()
print('------args---------',args)
print('-------gf-------', args.girlfriend)

假设上述文件的名字是gf.py
在命令行运行时,则有以下几种情况:

  • python gf.py potato ,这种方式会将potato赋值给food
  • python gf.py -gf cy tomato,该种方式会将cy赋值给girlfriend,tomato赋值给food
  • python gf.py --house 2 chicken,该种方式会将house赋值为2,chicken赋值为food

add_argument()

add_argument() 方法定义如何解析命令行参数

每个参数解释如下:

name or flags - 普通参数或flag参数选项参数的名称或标签,例如 foo 或者 -f, --foo。Flag参数不需要指定参数值,只需要带有参数名即可。

action - 命令行遇到flags参数时的动作。有两个常见的动作,store_true:设定flag参数为true;store_false:设定flag参数为False。

nargs - 应该读取的命令行参数个数,可以是具体的数字,或者是?号,当不指定值时对于 Positional argument 使用 default,对于 Optional argument 使用 const;或者是 * 号,表示 0 或多个参数;或者是 + 号表示 1 或多个参数。

default - 不指定参数时该参数的默认值。

type - 命令行参数应该被转换成的数据类型。

required - 是否为必选参数或可选参数。

help - 参数的帮助信息,当指定为 argparse.SUPPRESS 时表示不显示该参数的帮助信息.

metavar - 在 usage 说明中的参数名称,对于必选参数,默认就是参数名称,对于可选参数默认是全大写的参数名称。

dest - 解析后的参数名称,默认情况下,对于可选参数选取最长的名称,中划线转换为下划线.

choices - 参数可允许的值的一个容器。

const - action 和 nargs 所需要的常量值。

store_const,表示赋值为const;

append,将遇到的值存储成列表,也就是如果参数重复则会保存多个值;

append_const,将参数规范中定义的一个值保存到一个列表;

count,存储遇到的次数;此外,也可以继承 argparse.Action 自定义参数解析;

os

参考https://blog.csdn.net/qq_52007481/article/details/126918076

socket通信

需求:帮我写两个python文件,分别是server.py,运行在192.168.88.101,另一个是client.py,运行在192.168.88.102。server.py先开始运行,监听自己的5001端口,随后client.py会开始运行,主动连接server.py,连接成功后server.py会给client.py发消息,client.py收到消息后会给server.py发信息,然后执行A函数

server.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# server.py
import socket
import threading

passwd = '114514'
# 创建一个socket对象
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# 绑定本机IP和端口
server.bind(('192.168.88.101', 5001))

# 开始监听
server.listen()

# 定义一个处理客户端连接的函数
def handle_client(client,address):
# 接收客户端发来的消息
message = client.recv(1024).decode()
print(f"Received from {address}: {message}")
# 校验密码
if message != passwd:
print("Wrong passwd!")
reply = "Wrong Passwd"
client.send(reply.encode())
client.close()
return
# 给客户端回复消息
reply = "Hello, this is server."
client.send(reply.encode())
print(f"Sent to client: {reply}")

# 调用A函数


# 关闭连接
client.close()

# 无限循环等待客户端连接
while True:
# 接受一个客户端连接
client, address = server.accept()
print(f"Connected with {address}")

# 创建一个线程来处理客户端连接
thread = threading.Thread(target=handle_client, args=(client, address))
thread.start()

client.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# client.py
import socket
import time

# 密码
passwd = '114514'
# 创建一个socket对象
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# 连接服务器
client.connect(('192.168.88.101', 5001))
print("Connected to server")

# 给服务器发送消息
message = passwd
client.send(message.encode())
print(f"Sent to server: {message}")

# 接收服务器回复的消息
reply = client.recv(1024).decode()
if reply == "Wrong Passwd":
print("Failed to connect: Wrong Passwd")
else:
print(f"Received from server: {reply}")

# 关闭连接
client.close()

文件传输

当然,我很愿意帮助您编写这两个 Python 文件!以下是您所需的 server.pyclient.py 的基本框架。

首先,让我们从 server.py 开始。这个文件将运行在 192.168.88.101 上,它将监听来自 client.py 的文件传输请求并保存接收到的文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# server.py

import os
import socket

def receive_file(client_socket, filename):
try:
with open(filename, 'wb') as file:
while True:
data = client_socket.recv(1024)
if not data:
break
file.write(data)
print(f"Received {filename} successfully.")
except Exception as e:
print(f"Error receiving {filename}: {e}")

def main():
host = '192.168.88.101'
port = 12345

server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind((host, port))
server_socket.listen(1)

print(f"Server listening on {host}:{port}")

while True:
client_socket, addr = server_socket.accept()
print(f"Connection from {addr[0]}:{addr[1]}")

filename = client_socket.recv(1024).decode()
receive_file(client_socket, f"receive/{filename}")

client_socket.close()

if __name__ == '__main__':
if not os.path.exists('receive'):
os.makedirs('receive')
main()

接下来是 client.py,它将运行在 192.168.88.102 上,负责监听 send 文件夹并将新出现的文件发送给 server.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# client.py

import os
import socket
import time

def send_file(server_ip, filename):
try:
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client_socket.connect((server_ip, 12345))

client_socket.send(filename.encode())

with open(filename, 'rb') as file:
while True:
data = file.read(1024)
if not data:
break
client_socket.send(data)

print(f"Sent {filename} to server.")
client_socket.close()
except Exception as e:
print(f"Error sending {filename}: {e}")

def main():
server_ip = '192.168.88.101'

while True:
time.sleep(1) # Adjust the interval as needed
for root, _, files in os.walk('send'):
for filename in files:
filepath = os.path.join(root, filename)
send_file(server_ip, filepath)

if __name__ == '__main__':
if not os.path.exists('send'):
os.makedirs('send')
main()

请注意,您需要在运行这些脚本之前创建 sendreceive 文件夹,以便存储传输的文件。此外,您可以根据实际需求调整代码中的参数和逻辑。

祝您编写愉快!如果您有其他问题或需要进一步的帮助,请随时告知。🚀

watchdog监听文件夹

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#! /usr/bin/env python
# -*- coding: utf-8 -*-
#from __future__ import print_function

from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer


WATCH_PATH = 'send' # 监控目录

class FileMonitorHandler(FileSystemEventHandler):
def __init__(self, **kwargs):
super(FileMonitorHandler, self).__init__(**kwargs)
# 监控目录
self._watch_path = WATCH_PATH



# 重写文件创建函数,文件改变都会触发文件夹变化
def on_created(self, event):
if not event.is_directory: # 文件创建都会触发文件夹变化
file_path = event.src_path
print("文件创建: %s " % file_path)



if __name__ == "__main__":
event_handler = FileMonitorHandler()
observer = Observer()
observer.schedule(event_handler, path=WATCH_PATH, recursive=True) # recursive递归的
observer.start()
observer.join()

当send文件夹中出现新文件时,会执行on_created函数

tqdm:漂亮的进度条

1
pip install tqdm

实例代码:

1
2
3
4
from tqdm import tqdm
from time import sleep
for i in tqdm(range(10000)):
sleep(0.01)

效果:

1
2
$ python test1.py 
16%|███▎ | 1600/10000 [00:16<01:25,98.10it/s]

sys.profile性能分析

sys.setprofile 是 Python 标准库中的一个函数,用于设置全局的性能分析(profiling)函数。这个函数允许你指定一个回调函数,当 Python 解释器调用每一个函数时,该回调函数将被调用。这对于调试和性能分析非常有用。

当你调用 sys.setprofile 并传递一个回调函数时,每当 Python 解释器进入或离开一个函数时,这个回调函数就会被调用。回调函数通常接受三个参数:frameeventarg

  • frame:当前执行的栈帧对象,包含关于当前执行状态的信息。
  • event:一个字符串,表示发生的事件类型,如 “call”、“return”、“exception” 等。
  • arg:事件的附加参数,如函数返回值或异常对象。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import sys

def profile_func(frame, event, arg):
if frame.f_code.co_name != "test_function":
# print(f"Profiling function: {frame.f_code.co_name}")
return
if event == 'call':
print(f"Calling function: {frame.f_code.co_name}")
elif event == 'return':
print(f"Returning from function: {frame.f_code.co_name}")

def test_function():
print("Inside test_function")

sys.setprofile(profile_func)

test_function()

sys.setprofile(None) # 停止 profiling

输出

1
2
3
Calling function: test_function
Inside test_function
Returning from function: test_function

ollama

详见https://github.com/ollama/ollama-python

可以使用python调用ollama的API

示例

1
2
3
4
5
6
7
8
9
from ollama import Client
client = Client(host="http://127.0.0.1:11434")
stream = client.chat(
model="qwen2:latest",
messages=[{"role": "user", "content": "原神是什么"}],
stream=True,
)
for chunk in stream:
print(chunk["message"]["content"], end="", flush=True)
1
2
3
4
5
《原神》是由上海米哈游信息科技有限公司自主研发的开源动作角色扮演游戏,于2020年9月28日发行。该游戏讲述了提瓦特大陆的七个神性国家之一的蒙德和璃月,在一次灾难事件后如何在“神”与“人”的协助下重建家园的故事。

游戏的核心是采用开放世界探险的游戏模式,并提供丰富多样的角色供玩家选择,每个角色都有独特的技能和属性。游戏中有大量任务、副本、活动等待玩家去探索和完成,同时也有丰富的社交元素,允许玩家之间组队合作或互相竞技。

《原神》通过精美的画面、动听的音乐以及引人入胜的故事剧情吸引了全球众多玩家的喜爱。游戏不仅在国内市场取得巨大成功,在海外也收获了广泛的好评和用户基础。

获取可用模型

1
client.list()

multiprocessing多线程库

SIMD多线程示例程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# SIMD多线程运行器
import multiprocessing
from multiprocessing import Lock
import random
from time import sleep
from tqdm import tqdm

def worker(mli, mlres, l, i, a):
# sleep(5)
ans = a**2
l.acquire()
mli.append(i)
mlres.append(ans)
l.release()

if __name__ == '__main__':
mli = multiprocessing.Manager().list() # 主进程与子进程共享这个字典
mlres = multiprocessing.Manager().list() # 主进程与子进程共享这个字典
N = 3000 # 数据数量
num_threads = 8 # 线程数量
a = [random.randint(0, N) for _ in range(N)]
l = Lock()
jobs = [multiprocessing.Process(target=worker, args=(mli, mlres, l, i, a[i])) for i in range(N)]
running_job = 0
next_job = 0
for j in jobs:
j.daemon = True # 退出主进程时,子进程也会被自动终止,因为它被设置为守护进程

# 进度条
with tqdm(total=N, desc="Monitoring Progress") as pbar:
old = 0
while True:
l.acquire()
curmli = list(mli)
l.release()
if len(curmli) > old:
pbar.update(len(curmli) - old)
for i in range(old, len(curmli)):
jobs[curmli[i]].join()
running_job -= 1
old = len(curmli)
if len(curmli) == N:
break
while running_job < num_threads and next_job < N:
jobs[next_job].start()
running_job += 1
next_job += 1
sleep(0.01)

print('Results:')

# sort
mzip = zip(mli, mlres)
mzip = sorted(mzip)
mli, mlres = zip(*mzip)
print(list(mlres))

未完待续


python-modules
https://blog.algorithmpark.xyz/2024/02/15/language/python-modules/index/
作者
CJL
发布于
2024年2月15日
更新于
2024年10月14日
许可协议