本文最后更新于 2024-10-14T12:23:00+00:00
python实用模块
阅读本教程,您需要 python 的基础知识
queue
参考文献:CSDN
参考文献:官方文档
queue模块是Python内置的标准模块,模块实现了三种类型的队列,它们的区别仅仅是条目取回的顺序,分别由3个类进行表示,Queue,LifoQueue,PriorityQueue
Queue
maxsize 是个整数,用于设置可以放入队列中的项目数的上限。当达到这个大小的时候,插入操作将阻塞至队列中的项目被消费掉。如果 maxsize 小于等于零,队列尺寸为无限大。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 from queue import Queue q = Queue()for i in range (10 ): q.put(i)print (f"qsize={q.qsize()} " )while not q.empty(): front = q.get() print (front, end=' ' )
1 2 qsize =10 0 1 2 3 4 5 6 7 8 9
LifoQueue
实际上是个栈,用法和队列一样
1 2 3 4 5 6 7 8 9 10 11 12 13 14 from queue import LifoQueue q = LifoQueue()for i in range (10 ): q.put(i)print (f"qsize={q.qsize()} " )while not q.empty(): front = q.get() print (front, end=' ' )
1 2 qsize =10 9 8 7 6 5 4 3 2 1 0
PriorityQueue
优先队列,底层是一个小根堆
1 2 3 4 5 6 7 8 9 10 11 12 from queue import PriorityQueue q = PriorityQueue() data1 = (1 , 'python' ) data2 = (2 , '-' ) data3 = (3 , '100' ) insert_list = (data2, data3, data1)for i in insert_list: q.put(i) for i in range (3 ): print (q.get())
1 2 3 (1 , 'python' ) (2 , '-' ) (3 , '100' )
argparse
参考文献
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import argparse parser = argparse.ArgumentParser(description="可写可不写,只是在命令行参数出现错误的时候,随着错误信息打印出来。" ) parser.add_argument('-gf' , '--girlfriend' , choices=['cy' ]) parser.add_argument('--house' , type =int , default=0 ) parser.add_argument('food' ) args = parser.parse_args() print ('------args---------' ,args)print ('-------gf-------' , args.girlfriend)
假设上述文件的名字是gf.py
在命令行运行时,则有以下几种情况:
python gf.py potato
,这种方式会将potato赋值给food
python gf.py -gf cy tomato
,该种方式会将cy赋值给girlfriend,tomato赋值给food
python gf.py --house 2 chicken
,该种方式会将house赋值为2,chicken赋值为food
add_argument()
add_argument() 方法定义如何解析命令行参数
每个参数解释如下:
name or flags - 普通参数或flag参数选项参数的名称或标签,例如 foo 或者 -f, --foo。Flag参数不需要指定参数值,只需要带有参数名即可。
action - 命令行遇到flags参数时的动作。有两个常见的动作,store_true:设定flag参数为true;store_false:设定flag参数为False。
nargs - 应该读取的命令行参数个数,可以是具体的数字,或者是?号,当不指定值时对于 Positional argument 使用 default,对于 Optional argument 使用 const;或者是 * 号,表示 0 或多个参数;或者是 + 号表示 1 或多个参数。
default - 不指定参数时该参数的默认值。
type - 命令行参数应该被转换成的数据类型。
required - 是否为必选参数或可选参数。
help - 参数的帮助信息,当指定为 argparse.SUPPRESS 时表示不显示该参数的帮助信息.
metavar - 在 usage 说明中的参数名称,对于必选参数,默认就是参数名称,对于可选参数默认是全大写的参数名称。
dest - 解析后的参数名称,默认情况下,对于可选参数选取最长的名称,中划线转换为下划线.
choices - 参数可允许的值的一个容器。
const - action 和 nargs 所需要的常量值。
store_const,表示赋值为const;
append,将遇到的值存储成列表,也就是如果参数重复则会保存多个值;
append_const,将参数规范中定义的一个值保存到一个列表;
count,存储遇到的次数;此外,也可以继承 argparse.Action 自定义参数解析;
os
参考https://blog.csdn.net/qq_52007481/article/details/126918076
socket通信
需求:帮我写两个python文件,分别是server.py ,运行在192.168.88.101,另一个是client.py ,运行在192.168.88.102。server.py先开始运行,监听自己的5001端口,随后client.py会开始运行,主动连接server.py ,连接成功后server.py会给client.py发消息,client.py收到消息后会给server.py发信息,然后执行A函数
server.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 import socketimport threading passwd = '114514' server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.bind(('192.168.88.101' , 5001 )) server.listen()def handle_client (client,address ): message = client.recv(1024 ).decode() print (f"Received from {address} : {message} " ) if message != passwd: print ("Wrong passwd!" ) reply = "Wrong Passwd" client.send(reply.encode()) client.close() return reply = "Hello, this is server." client.send(reply.encode()) print (f"Sent to client: {reply} " ) client.close()while True : client, address = server.accept() print (f"Connected with {address} " ) thread = threading.Thread(target=handle_client, args=(client, address)) thread.start()
client.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 import socketimport time passwd = '114514' client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client.connect(('192.168.88.101' , 5001 ))print ("Connected to server" ) message = passwd client.send(message.encode())print (f"Sent to server: {message} " ) reply = client.recv(1024 ).decode()if reply == "Wrong Passwd" : print ("Failed to connect: Wrong Passwd" )else : print (f"Received from server: {reply} " ) client.close()
文件传输
当然,我很愿意帮助您编写这两个 Python 文件!以下是您所需的 server.py
和 client.py
的基本框架。
首先,让我们从 server.py
开始。这个文件将运行在 192.168.88.101
上,它将监听来自 client.py
的文件传输请求并保存接收到的文件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 import osimport socketdef receive_file (client_socket, filename ): try : with open (filename, 'wb' ) as file: while True : data = client_socket.recv(1024 ) if not data: break file.write(data) print (f"Received {filename} successfully." ) except Exception as e: print (f"Error receiving {filename} : {e} " )def main (): host = '192.168.88.101' port = 12345 server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.bind((host, port)) server_socket.listen(1 ) print (f"Server listening on {host} :{port} " ) while True : client_socket, addr = server_socket.accept() print (f"Connection from {addr[0 ]} :{addr[1 ]} " ) filename = client_socket.recv(1024 ).decode() receive_file(client_socket, f"receive/{filename} " ) client_socket.close()if __name__ == '__main__' : if not os.path.exists('receive' ): os.makedirs('receive' ) main()
接下来是 client.py
,它将运行在 192.168.88.102
上,负责监听 send
文件夹并将新出现的文件发送给 server.py
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 import osimport socketimport timedef send_file (server_ip, filename ): try : client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client_socket.connect((server_ip, 12345 )) client_socket.send(filename.encode()) with open (filename, 'rb' ) as file: while True : data = file.read(1024 ) if not data: break client_socket.send(data) print (f"Sent {filename} to server." ) client_socket.close() except Exception as e: print (f"Error sending {filename} : {e} " )def main (): server_ip = '192.168.88.101' while True : time.sleep(1 ) for root, _, files in os.walk('send' ): for filename in files: filepath = os.path.join(root, filename) send_file(server_ip, filepath)if __name__ == '__main__' : if not os.path.exists('send' ): os.makedirs('send' ) main()
请注意,您需要在运行这些脚本之前创建 send
和 receive
文件夹,以便存储传输的文件。此外,您可以根据实际需求调整代码中的参数和逻辑。
祝您编写愉快!如果您有其他问题或需要进一步的帮助,请随时告知。🚀
watchdog监听文件夹
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 from watchdog.events import FileSystemEventHandlerfrom watchdog.observers import Observer WATCH_PATH = 'send' class FileMonitorHandler (FileSystemEventHandler ): def __init__ (self, **kwargs ): super (FileMonitorHandler, self).__init__(**kwargs) self._watch_path = WATCH_PATH def on_created (self, event ): if not event.is_directory: file_path = event.src_path print ("文件创建: %s " % file_path)if __name__ == "__main__" : event_handler = FileMonitorHandler() observer = Observer() observer.schedule(event_handler, path=WATCH_PATH, recursive=True ) observer.start() observer.join()
当send文件夹中出现新文件时,会执行on_created函数
tqdm:漂亮的进度条
实例代码:
1 2 3 4 from tqdm import tqdmfrom time import sleepfor i in tqdm(range (10000 )): sleep(0.01 )
效果:
1 2 $ python test1.py 16% |███▎ | 1600/10000 [00:16<01:25,98.10it/s]
sys.profile性能分析
sys.setprofile
是 Python 标准库中的一个函数,用于设置全局的性能分析(profiling)函数。这个函数允许你指定一个回调函数,当 Python 解释器调用每一个函数时,该回调函数将被调用。这对于调试和性能分析非常有用。
当你调用 sys.setprofile
并传递一个回调函数时,每当 Python 解释器进入或离开一个函数时,这个回调函数就会被调用。回调函数通常接受三个参数:frame
,event
和 arg
。
frame
:当前执行的栈帧对象,包含关于当前执行状态的信息。
event
:一个字符串,表示发生的事件类型,如 “call”、“return”、“exception” 等。
arg
:事件的附加参数,如函数返回值或异常对象。
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 import sysdef profile_func (frame, event, arg ): if frame.f_code.co_name != "test_function" : return if event == 'call' : print (f"Calling function: {frame.f_code.co_name} " ) elif event == 'return' : print (f"Returning from function: {frame.f_code.co_name} " )def test_function (): print ("Inside test_function" ) sys.setprofile(profile_func) test_function() sys.setprofile(None )
输出
1 2 3 Calling function : test_function Inside test_function Returning from function : test_function
ollama
详见https://github.com/ollama/ollama-python
可以使用python调用ollama的API
示例
1 2 3 4 5 6 7 8 9 from ollama import Client client = Client(host="http://127.0.0.1:11434" ) stream = client.chat( model="qwen2:latest" , messages=[{"role" : "user" , "content" : "原神是什么" }], stream=True , )for chunk in stream: print (chunk["message" ]["content" ], end="" , flush=True )
1 2 3 4 5 《原神》是由上海米哈游信息科技有限公司自主研发的开源动作角色扮演游戏,于2020年9月28 日发行。该游戏讲述了提瓦特大陆的七个神性国家之一的蒙德和璃月,在一次灾难事件后如何在“神”与“人”的协助下重建家园的故事。 游戏的核心是采用开放世界探险的游戏模式,并提供丰富多样的角色供玩家选择,每个角色都有独特的技能和属性。游戏中有大量任务、副本、活动等待玩家去探索和完成,同时也有丰富的社交元素,允许玩家之间组队合作或互相竞技。 《原神》通过精美的画面、动听的音乐以及引人入胜的故事剧情吸引了全球众多玩家的喜爱。游戏不仅在国内市场取得巨大成功,在海外也收获了广泛的好评和用户基础。
获取可用模型
multiprocessing多线程库
SIMD多线程示例程序
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 import multiprocessingfrom multiprocessing import Lockimport randomfrom time import sleepfrom tqdm import tqdmdef worker (mli, mlres, l, i, a ): ans = a**2 l.acquire() mli.append(i) mlres.append(ans) l.release() if __name__ == '__main__' : mli = multiprocessing.Manager().list () mlres = multiprocessing.Manager().list () N = 3000 num_threads = 8 a = [random.randint(0 , N) for _ in range (N)] l = Lock() jobs = [multiprocessing.Process(target=worker, args=(mli, mlres, l, i, a[i])) for i in range (N)] running_job = 0 next_job = 0 for j in jobs: j.daemon = True with tqdm(total=N, desc="Monitoring Progress" ) as pbar: old = 0 while True : l.acquire() curmli = list (mli) l.release() if len (curmli) > old: pbar.update(len (curmli) - old) for i in range (old, len (curmli)): jobs[curmli[i]].join() running_job -= 1 old = len (curmli) if len (curmli) == N: break while running_job < num_threads and next_job < N: jobs[next_job].start() running_job += 1 next_job += 1 sleep(0.01 ) print ('Results:' ) mzip = zip (mli, mlres) mzip = sorted (mzip) mli, mlres = zip (*mzip) print (list (mlres))
未完待续