YSOS-rust lab5


1. Lab Requirements

  1. Understand how fork works and implement the fork system call.
  2. Understand the concepts of concurrency and locking; implement an OS-backed spinlock and semaphore.
  3. Write fork-based concurrent programs and test the correctness of the spinlock and semaphore.

2. Lab Process

milestone1

After completing the [fork implementation](# 实现fork) in the key-code section, milestone1 can be achieved.

Boot YSOS and run the [fork test program](# fork测试程序); the result is shown below.

image-20240512182216261

milestone2

Multithreaded counter

After implementing the [spinlock](# 实现自旋锁) and the [semaphore](# 实现信号量), the [counter program](# counter测试程序) can be run to verify them.

image-20240513193726001

image-20240513194237177

image-20240513194535575

Message queue

After finishing the [message queue](# 实现消息队列) code, start the program.

  • Each thread prints a message whenever it produces or consumes an item.

Part of the output is captured below.

image-20240513223125053

  • Finally, the parent process waits for all children to exit, then prints the number of messages left in the queue.

image-20240513223245299

  • After the parent has created all 16 processes, sys_stat prints information about every current process.

image-20240513223519327

  • When taking a message out of the queue, could the queue be empty?
  • When writing a message into the queue, could the queue be full?

image-20240513222910316

The code contains two panics that terminate the program if either of the above situations occurs, ensuring the queue is used safely.

Dining philosophers

After finishing the [dining philosophers](# 实现哲学家的晚饭) code, start the program.

The philosophers and chopsticks are numbered as shown in the figure.

image-20240516110821472

Using function 1

A strategy that leads to deadlock.

image-20240516112004743

Every philosopher picks up one chopstick, forming a circular wait.

Using function 2

Odd-numbered philosophers must pick up their left chopstick first and then the right one, while even-numbered philosophers do the opposite. No deadlock or starvation occurs.

image-20240516112729398

Observing how many times each philosopher eats, the counts are roughly equal, so the scheduling is fair.

Using function 3

Philosophers must pick up chopsticks in ascending order of chopstick number; this is an unfair scheduling scheme.

image-20240516114833813

Each philosopher tries to eat 100 times; observe how many times each one has eaten.

Philosophers 0 and 4 eat noticeably fewer times than philosophers 1, 2, and 3, so the allocation can be considered unfair to philosophers 0 and 4.

One possible explanation: when picking up their first chopstick, philosophers 1, 2, and 3 face no contention, while 0 and 4 must compete for chopstick 0.

Using function 4

A waiter coordinates the philosophers; no deadlock or starvation occurs.

image-20240516115509815

Observing how many times each philosopher eats, the counts are roughly equal, so the scheduling is fair.

milestone3: blocking and waking processes

This task was added later by GZ; it was originally meant to be a bonus item.

See [the waitpid implementation](# 实现waitpid) for the code.

Requirement: 🤔 Referring to the implementation of the semaphore-related system calls, modify the waitpid system call so that a process blocks while waiting for another process to exit, and is woken up carrying the target's return value once it exits.

Expectation: start another Shell inside the Shell, then use ps in it to print process information: the first Shell should be Blocked until the second Shell exits.

Result:

image-20240525134531355

Thinking Questions

1. When designing the input buffer in Lab 2, if a Mutex-protected synchronous queue is used instead of a lock-free queue, what must be kept in mind when writing the related functions? Consider a serial input interrupt arriving in the middle of a pop operation: describe the problematic scenario and propose a solution.

Note that push and pop must not be protected by the same Mutex.

If an interrupt arrives during a pop: the pop has already taken the Mutex, so when the serial-input interrupt handler tries to push into the queue it cannot acquire the lock, and since the handler itself cannot be preempted, the system deadlocks.

One possible solution is to use separate Mutexes for push and pop: even if a pop is interrupted by a push, the unsafe case of popping from an empty queue cannot arise.


2. During fork's memory copy, what are the relationships among the system's current page table, the process page table, the child's page table, and the kernel page table? What must be taken care of when copying memory?

When a new process is created, a new page table is cloned from the kernel page table rather than shared, so a process modifying its own page table does not affect the kernel or other processes.

When fork copies memory, the child shares the parent's page table.

While fork is copying memory, the page table currently in use is that of the parent process that called fork.

When copying memory, the correct addresses must be used, ensuring the parent's entire mapped stack is copied to the child.


3. Why, in this lab's implementation, must the fork system call happen before any Rust (heap) memory allocation? What goes wrong if fork happens after a heap allocation?

Add the following code to a user program:

let pid;
{
    let s = String::from("114514");
    pid = sys_fork();
    println!("{}", s);
}

The result is as follows:

image-20240518220048085

In this code, s allocates memory on the heap. By Rust's design, that heap memory is automatically freed when s goes out of scope. Since fork does not copy the heap, the heap memory backing s ends up being freed twice, triggering a Bad free panic.


4. What is the Ordering parameter used in atomic operations? Rust's declaration here matches the C++20 specification; search for and briefly describe what each value of this enum means.

It exists to guarantee program correctness on modern processors that reorder instructions in multithreaded environments.

| memory order | Meaning |
| --- | --- |
| `memory_order_relaxed` | The weakest ordering: only atomicity is guaranteed; there is no fencing and no ordering constraint, so the CPU and compiler may reorder freely around it. It offers the highest concurrency, but a load may observe a stale value and a write may overwrite another thread's result without any ordering guarantee. |
| `memory_order_consume` | Requires that subsequent reads in the current thread that data-depend on the loaded value execute after the load. It orders only dependent reads and imposes no requirement on writes or on reads in other threads; in practice compilers implement it as acquire. |
| `memory_order_acquire` | Requires that subsequent memory operations in the current thread are not reordered before this load. Paired with a release store to the same variable, it guarantees the releasing thread's earlier writes are visible. |
| `memory_order_release` | All reads and writes before this store must complete before it, and later modifications may not be hoisted above it; it acts like a one-way fence that publishes earlier writes to matching acquire loads. |
| `memory_order_acq_rel` | The combination of acquire and release, used for read-modify-write operations: statements on either side cannot be reordered across it. |
| `memory_order_seq_cst` | A stricter guarantee than acq_rel: in addition to acquire/release semantics, all seq_cst operations form a single total order that every thread observes identically, so once a seq_cst write completes, all other CPUs are guaranteed to see the latest value. An ordinary load can see data published by a seq_cst store, whereas data published by a release store is only guaranteed visible to a paired acquire load. |

5. When implementing SpinLock, why must the Sync trait be implemented? What does the similar Send trait mean?

The Sync trait marks a type as safe to share between threads: if a type implements Sync, multiple threads may access the same instance concurrently without data races. The `unsafe impl Sync` line marks SpinLock as safe to share across threads, so the spinlock can be used in multithreaded programs.

The Send trait means a type can safely be moved from one thread to another.


6. What is the difference between the pause instruction used by core::hint::spin_loop and the x86_64::instructions::hlt instruction from Lab 4? Why can't hlt be used here?

x86_64::instructions::hlt is a privileged instruction and cannot be used in user-mode programs.

On the x86_64 architecture, spin_loop() compiles down to _mm_pause() (the pause instruction).

This instruction tells the processor that the current execution flow is spinning on a lock, which improves processor performance and lowers power consumption; see the Intel documentation for details.

Bonus Items


1. 🤔 Implement the following user-program task, completing the user program fish:

  • Create three child processes, each of which can output one and only one of >, <, and _.
  • Synchronize these children with the techniques learned, so that the printed sequence is always a combination of <><_ and ><>_.

Beyond this requirement, other details, including the output length, are up to the implementer.

After [adding the fish user program](# 加分项2-fish), run it; the printed 🐟 can be seen.

image-20240519110458723

All the 🐟 are printed correctly, and to make the fishes' directions random, the code sleeps for a random duration.


2. 🤔 Try approaches to the dining philosophers problem other than the one above, verify that they solve it correctly, briefly describe them, and give the code and test results.

Three approaches were used to solve the dining philosophers problem:

  1. Odd-numbered philosophers pick up the left chopstick first and then the right, while even-numbered philosophers do the opposite.
  2. Philosophers must pick up chopsticks in ascending order of chopstick number.
  3. A waiter coordinates access.

All three avoid deadlock; see [the lab process](# 哲学家的晚饭) for test results.

3. Key Code

Implementing fork

Kernel-side fork handling

pkg/kernel/src/proc/mod.rs

pub fn fork(context: &mut ProcessContext) {
    x86_64::instructions::interrupts::without_interrupts(|| {
        let manager = get_process_manager();
        // FIXME: save_current as parent
        let parent_pid = manager.current().pid();
        manager.save_current(context);
        // FIXME: fork to get child
        manager.fork();
        // FIXME: push to child & parent to ready queue
        manager.push_ready(parent_pid);
        // FIXME: switch to next process
        manager.switch_next(context);
    })
}

pkg/kernel/src/proc/manager.rs

impl ProcessManager {
    pub fn fork(&self) {
        // FIXME: get current process
        let current = self.current();
        // FIXME: fork to get child
        let child = current.fork();
        // FIXME: add child to process list
        let child_pid = child.pid();
        self.add_proc(child_pid, child);
        self.push_ready(child_pid);
        // FOR DBG: maybe print the process ready queue?
        debug!("Ready queue: {:?}", self.ready_queue.lock());
    }
}

pkg/kernel/src/proc/process.rs

impl Process {
pub fn fork(self: &Arc<Self>) -> Arc<Self> {
// FIXME: lock inner as write
let mut inner = self.inner.write();
// FIXME: inner fork with parent weak ref
let child_inner = inner.fork(Arc::downgrade(self));
let child_pid = ProcessId::new();
// FOR DBG: maybe print the child process info
// e.g. parent, name, pid, etc.
debug!("fork: parent is {}#{}, child is {}#{}.", inner.name, self.pid, child_inner.name, child_pid);
// FIXME: make the arc of child
let child = Arc::new(Self {
pid: child_pid,
inner: Arc::new(RwLock::new(child_inner)),
});
// FIXME: add child to current process's children list
inner.children.push(child.clone());
// FIXME: set fork ret value for parent with `context.set_rax`
inner.context.set_rax(child.pid.0 as usize);

// FIXME: mark the child as ready & return it
inner.pause(); // NOTE: this actually marks the *parent* as ready; the child is created in the Ready state
child
}
}

impl ProcessInner {
pub fn fork(&mut self, parent: Weak<Process>) -> ProcessInner {
// FIXME: get current process's stack info
let stack_info = self.stack_segment.unwrap();
// FIXME: clone the process data struct
let child_proc_data = self.proc_data.clone().unwrap();
// FIXME: clone the page table context (see instructions)
let child_page_table = self.page_table.as_ref().unwrap().fork();
// FIXME: alloc & map new stack for child (see instructions)
let frame_allocator = &mut *get_frame_alloc_for_sure();
let mapper = &mut self.page_table.as_ref().unwrap().mapper();
let parent_stack_base = stack_info.start.start_address().as_u64();
let parent_stack_top = stack_info.end.start_address().as_u64();
let mut child_stack_base = parent_stack_base - (self.children.len() as u64 + 1)* STACK_MAX_SIZE;
while elf::map_range(child_stack_base, stack_info.count() as u64, mapper, frame_allocator, true, true).is_err(){
trace!("Map thread stack to {:#x} failed.", child_stack_base);
child_stack_base -= STACK_MAX_SIZE;
};
debug!("map child stack to {:#x} succeed", child_stack_base);
// FIXME: copy the *entire stack* from parent to child
ProcessInner::clone_range(parent_stack_base, child_stack_base, stack_info.count());
//debug!("finished clone range");
// FIXME: update child's context with new *stack pointer*
// > update child's stack to new base
let mut child_context = self.context;
child_context.set_stack_offset(child_stack_base - parent_stack_base);
// > keep lower bits of *rsp*, update the higher bits
// > also update the stack record in process data
let mut child_proc_data = self.proc_data.clone().unwrap();
let child_stack_top = child_stack_base + stack_info.count() as u64 * Size4KiB::SIZE;
let child_stack = Page::range(
Page::containing_address(VirtAddr::new_truncate(child_stack_base)),
Page::containing_address(VirtAddr::new_truncate(child_stack_top))
);
child_proc_data.stack_segment = Some(child_stack);
child_proc_data.set_max_stack(VirtAddr::new(child_stack_top-STACK_MAX_SIZE), STACK_MAX_PAGES);
// FIXME: set the return value 0 for child with `context.set_rax`
child_context.set_rax(0);
// FIXME: construct the child process inner
Self {
name: self.name.clone(),
parent: Some(parent),
children: Vec::new(),
ticks_passed: 0,
status: ProgramStatus::Ready,
exit_code: None,
context: child_context,
page_table: Some(child_page_table),
proc_data: Some(child_proc_data)
}
// NOTE: return inner because there's no pid record in inner
}
/// Clone a range of memory
///
/// - `src_addr`: the address of the source memory
/// - `dest_addr`: the address of the target memory
/// - `size`: the count of pages to be cloned
fn clone_range(src_addr: u64, dest_addr: u64, size: usize) {
trace!("Clone range: {:#x} -> {:#x}", src_addr, dest_addr);
unsafe {
copy_nonoverlapping::<u8>(
src_addr as *mut u8,
dest_addr as *mut u8,
size * Size4KiB::SIZE as usize,
);
}
}
}

pkg/kernel/src/proc/context.rs

#[inline]
pub fn set_stack_offset(&mut self, offset: u64) {
    self.value.stack_frame.stack_pointer += offset;
}

pkg/kernel/src/proc/paging.rs

pub fn using_count(&self) -> usize {
    Arc::strong_count(&self.reg)
}

pub fn fork(&self) -> Self {
    // forked process shares the page table
    Self {
        reg: self.reg.clone(),
    }
}

System call support for fork

pkg/syscall/src/lib.rs: the fork system call number is set to 58.

pkg/kernel/src/interrupt/syscall/mod.rs

Syscall::Fork => {
    /* FIXME: fork process */
    sys_fork(context)
},

pkg/kernel/src/interrupt/syscall/service.rs

pub fn sys_fork(context: &mut ProcessContext) {
    proc::fork(context);
}

Launching fork from the shell

pkg/app/shell/src/main.rs

match line.trim() {
    // ....
    "fork" => { sys_wait_pid(sys_spawn("fork")); },
    // ....
}

fork test program

pkg/app/fork/src/main.rs

#![no_std]
#![no_main]

extern crate alloc;
extern crate lib;

use lib::*;

static mut M: u64 = 0xdeadbeef;

fn main() -> isize {
    let mut c = 32;

    let pid = sys_fork();

    if pid == 0 {
        println!("I am the child process");

        assert_eq!(c, 32);

        unsafe {
            println!("child read value of M: {:#x}", M);
            M = 0x2333;
            println!("child changed the value of M: {:#x}", M);
        }

        c += 32;
    } else {
        println!("I am the parent process");

        sys_stat();

        assert_eq!(c, 32);

        println!("Waiting for child to exit...");

        let ret = sys_wait_pid(pid);

        println!("Child exited with status {}", ret);

        assert_eq!(ret, 64);

        unsafe {
            println!("parent read value of M: {:#x}", M);
            assert_eq!(M, 0x2333);
        }

        c += 1024;

        assert_eq!(c, 1056);
    }

    c
}

entry!(main);

==At this point, milestone1 is achieved==

Implementing the spinlock

pkg/lib/src/sync.rs

pub struct SpinLock {
    bolt: AtomicBool,
}

impl SpinLock {
    pub const fn new() -> Self {
        Self {
            bolt: AtomicBool::new(false),
        }
    }

    pub fn acquire(&self) {
        // acquire the lock, spin while it is unavailable
        while self
            .bolt
            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
            .is_err()
        {
            spin_loop();
        }
    }

    pub fn release(&self) {
        // release the lock
        self.bolt.store(false, Ordering::Release);
    }
}

Implementing the semaphore

The semaphore system call is registered with call number 66.

User side

pkg/lib/src/sync.rs

#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct Semaphore {
    // record the sem key
    key: u32,
}

impl Semaphore {
    pub const fn new(key: u32) -> Self {
        Semaphore { key }
    }

    #[inline(always)]
    pub fn init(&self, value: usize) -> bool {
        sys_new_sem(self.key, value) == 1
    }

    pub fn remove(&self) -> bool {
        sys_del_sem(self.key) == 1
    }

    pub fn signal(&self) {
        sys_signal(self.key);
    }

    pub fn wait(&self) {
        sys_wait(self.key);
    }
}

unsafe impl Sync for Semaphore {}

pkg/lib/src/syscall.rs

#[inline(always)]
pub fn sys_new_sem(key: u32, value: usize) -> usize {
    syscall!(Syscall::Sem, 0, key, value)
}

#[inline(always)]
pub fn sys_del_sem(key: u32) -> usize {
    syscall!(Syscall::Sem, 1, key)
}

#[inline(always)]
pub fn sys_signal(key: u32) -> usize {
    syscall!(Syscall::Sem, 2, key)
}

#[inline(always)]
pub fn sys_wait(key: u32) -> usize {
    syscall!(Syscall::Sem, 3, key)
}

Kernel side

pkg/kernel/src/interrupt/syscall/service.rs

pub fn sys_sem(args: &SyscallArgs, context: &mut ProcessContext) {
    match args.arg0 {
        0 => context.set_rax(new_sem(args.arg1 as u32, args.arg2)),
        1 => context.set_rax(remove_sem(args.arg1 as u32)),
        2 => sem_signal(args.arg1 as u32, context),
        3 => sem_wait(args.arg1 as u32, context),
        _ => context.set_rax(usize::MAX),
    }
}

pkg/kernel/src/interrupt/mod.rs

pub fn new_sem(key: u32, value: usize) -> usize {
    x86_64::instructions::interrupts::without_interrupts(|| {
        get_process_manager().current().write().new_sem(key, value) as usize
    })
}

pub fn remove_sem(key: u32) -> usize {
    x86_64::instructions::interrupts::without_interrupts(|| {
        get_process_manager().current().write().remove_sem(key) as usize
    })
}

pub fn sem_signal(key: u32, context: &mut ProcessContext) {
    x86_64::instructions::interrupts::without_interrupts(|| {
        let manager = get_process_manager();
        match manager.current().write().sem_signal(key) {
            SemaphoreResult::Ok => {
                context.set_rax(0);
            }
            SemaphoreResult::NotExist => {
                context.set_rax(1);
            }
            SemaphoreResult::WakeUp(pid) => {
                manager.wake_up(pid);
            }
            SemaphoreResult::Block(pid) => {
                manager.block(pid);
                error!("pid = {} is blocked by signal, which is a fatal bug. Plz report this bug to CJL", pid);
            }
        }
    })
}

pub fn sem_wait(key: u32, context: &mut ProcessContext) {
    x86_64::instructions::interrupts::without_interrupts(|| {
        let manager = get_process_manager();
        let pid = manager.current().pid();
        let ret = manager.current().write().sem_wait(key, pid);
        match ret {
            SemaphoreResult::Ok => {
                context.set_rax(0);
            }
            SemaphoreResult::NotExist => {
                context.set_rax(1);
            }
            SemaphoreResult::WakeUp(pid) => {
                manager.wake_up(pid);
                error!("pid = {} is waked by wait, which is a fatal bug. Plz report this bug to CJL", pid);
            }
            SemaphoreResult::Block(pid) => {
                manager.save_current(context);
                manager.block(pid);
                manager.switch_next(context);
            }
        }
    })
}

pkg/kernel/src/proc/manager.rs

impl ProcessManager {
    pub fn wake_up(&self, pid: ProcessId) {
        if let Some(process) = self.get_proc(&pid) {
            process.write().pause();
            self.push_ready(pid);
        }
    }

    pub fn block(&self, pid: ProcessId) {
        if let Some(process) = self.get_proc(&pid) {
            process.write().block();
        }
    }
}

pkg/kernel/src/proc/data.rs

impl ProcessData {
    pub fn new_sem(&mut self, key: u32, value: usize) -> bool {
        self.semaphores.write().insert(key, value)
    }

    pub fn remove_sem(&mut self, key: u32) -> bool {
        self.semaphores.write().remove(key)
    }

    pub fn sem_signal(&mut self, key: u32) -> SemaphoreResult {
        self.semaphores.write().signal(key)
    }

    pub fn sem_wait(&mut self, key: u32, pid: ProcessId) -> SemaphoreResult {
        self.semaphores.write().wait(key, pid)
    }
}

pkg/kernel/src/proc/sync.rs

impl Semaphore {
/// Create a new semaphore
pub fn new(value: usize) -> Self {
Self {
count: value,
wait_queue: VecDeque::new(),
}
}

/// Wait the semaphore (acquire/down/proberen)
///
/// if the count is 0, then push the process into the wait queue
/// else decrease the count and return Ok
pub fn wait(&mut self, pid: ProcessId) -> SemaphoreResult {
// FIXME: if the count is 0, then push pid into the wait queue
// return Block(pid)
if self.count == 0 {
self.wait_queue.push_back(pid);
return SemaphoreResult::Block(pid);
}else{
self.count -= 1;
return SemaphoreResult::Ok;
}
// FIXME: else decrease the count and return Ok
}

/// Signal the semaphore (release/up/verhogen)
///
/// if the wait queue is not empty, then pop a process from the wait queue
/// else increase the count
pub fn signal(&mut self) -> SemaphoreResult {
// FIXME: if the wait queue is not empty
// pop a process from the wait queue
// return WakeUp(pid)
// FIXME: else increase the count and return Ok
if !self.wait_queue.is_empty() {
let pid = self.wait_queue.pop_front().unwrap();
return SemaphoreResult::WakeUp(pid);
} else{
self.count += 1;
return SemaphoreResult::Ok;
}
}
}

#[derive(Debug, Default)]
pub struct SemaphoreSet {
sems: BTreeMap<SemaphoreId, Mutex<Semaphore>>,
}

impl SemaphoreSet {
pub fn insert(&mut self, key: u32, value: usize) -> bool {
trace!("Sem Insert: <{:#x}>{}", key, value);

// FIXME: insert a new semaphore into the sems
// use `insert(/* ... */).is_none()`
self.sems.insert(SemaphoreId::new(key), Mutex::new(Semaphore::new(value))).is_none()
}

pub fn remove(&mut self, key: u32) -> bool {
trace!("Sem Remove: <{:#x}>", key);

// FIXME: remove the semaphore from the sems
// use `remove(/* ... */).is_some()`
self.sems.remove(&SemaphoreId::new(key)).is_some()
}

/// Wait the semaphore (acquire/down/proberen)
pub fn wait(&self, key: u32, pid: ProcessId) -> SemaphoreResult {
let sid = SemaphoreId::new(key);

// FIXME: try get the semaphore from the sems
// then do its operation
match self.sems.get(&sid) {
Some(sem) => {
let mut sem = sem.lock();
sem.wait(pid)
},
None => SemaphoreResult::NotExist,
}
// FIXME: return NotExist if the semaphore is not exist
}

/// Signal the semaphore (release/up/verhogen)
pub fn signal(&self, key: u32) -> SemaphoreResult {
let sid = SemaphoreId::new(key);

// FIXME: try get the semaphore from the sems
// then do its operation
match self.sems.get(&sid) {
Some(sem)=>{
let mut sem = sem.lock();
sem.signal()
},
None => SemaphoreResult::NotExist,
}
// FIXME: return NotExist if the semaphore is not exist
}
}

counter test program

Launching counter from the shell

pkg/app/shell/src/main.rs

match line.trim() {
    // ....
    "counter" => { sys_wait_pid(sys_spawn("counter")); },
    // ....
}

counter program

pkg/app/counter/src/main.rs

#![no_std]
#![no_main]

use lib::{sync::{Semaphore, SpinLock}, *};

extern crate lib;

const THREAD_COUNT: usize = 8;
static mut COUNTER: isize = 0;
static LOCK: SpinLock = SpinLock::new();
static SEMA: Semaphore = Semaphore::new(0);

fn test_spin(){
let mut pids = [0u16; THREAD_COUNT];

for i in 0..THREAD_COUNT {
let pid = sys_fork();
if pid == 0 {
do_counter_inc_spin();
sys_exit(0);
} else {
pids[i] = pid; // only parent knows child's pid
}
}

let cpid = sys_get_pid();
println!("process #{} holds threads: {:?}", cpid, &pids);
sys_stat();

for i in 0..THREAD_COUNT {
println!("#{} waiting for #{}...", cpid, pids[i]);
sys_wait_pid(pids[i]);
}

println!("COUNTER result: {}", unsafe { COUNTER });
}

fn test_semaphore(){
let mut pids = [0u16; THREAD_COUNT];
let ret = SEMA.init(1);
//print!("ret = {}", ret);
for i in 0..THREAD_COUNT {
let pid = sys_fork();
if pid == 0 {
do_counter_inc_sema();
sys_exit(0);
} else {
pids[i] = pid; // only parent knows child's pid
}
}

let cpid = sys_get_pid();
println!("process #{} holds threads: {:?}", cpid, &pids);
sys_stat();

for i in 0..THREAD_COUNT {
println!("#{} waiting for #{}...", cpid, pids[i]);
sys_wait_pid(pids[i]);
}

println!("COUNTER result: {}", unsafe { COUNTER });
}

fn main() -> isize {
let pid = sys_fork();

if pid == 0 {
print!("\x1b[32m test semaphore begin now\n\x1b[0m");
test_semaphore();
print!("\x1b[32m test semaphore end\n\x1b[0m");
} else {
sys_wait_pid(pid);
print!("\x1b[32m test spin begin now\n\x1b[0m");
unsafe{
COUNTER = 0;
}
test_spin();
print!("\x1b[32m test spin end\n\x1b[0m");
}
0
}

fn do_counter_inc_spin() {
for _ in 0..100 {
// FIXME: protect the critical section
LOCK.acquire();
inc_counter();
LOCK.release();
}
}
fn do_counter_inc_sema() {
for _ in 0..100 {
// FIXME: protect the critical section
SEMA.wait();
//self::print!("after wait");
inc_counter();
//self::print!("before signal");
SEMA.signal();
}
}

/// Increment the counter
///
/// this function simulate a critical section by delay
/// DO NOT MODIFY THIS FUNCTION
fn inc_counter() {
unsafe {
delay();
let mut val = COUNTER;
delay();
val += 1;
delay();
COUNTER = val;
}
}

#[inline(never)]
#[no_mangle]
fn delay() {
for _ in 0..0x100 {
core::hint::spin_loop();
}
}

entry!(main);

Implementing the message queue

pkg/app/mq/src/main.rs

#![no_std]
#![no_main]

use lib::{sync::Semaphore, *};
extern crate lib;
static MUTEX: Semaphore = Semaphore::new(1);
static EMPTY: Semaphore = Semaphore::new(2);
static FULL: Semaphore = Semaphore::new(3);
static MAKE_PRODUCER_COMPLETE: Semaphore = Semaphore::new(4);
static COMSUMER_SHOW_COMPLETE: Semaphore = Semaphore::new(5);
const QUEUE_LEN: usize = 5;
static mut QUEUE: [u8; QUEUE_LEN] = [0; QUEUE_LEN];
static mut QUEUE_HEAD: usize = 0;
const WORKER_NUM: usize = 8;

fn main() -> isize {
MUTEX.init(1);
EMPTY.init(QUEUE_LEN);
FULL.init(0);
MAKE_PRODUCER_COMPLETE.init(0);
COMSUMER_SHOW_COMPLETE.init(0);
let pid = sys_fork();
if pid == 0{ // child
make_producer();
}else{ // parent
make_consumer();
sys_wait_pid(pid);
unsafe{
print!("QUEUE_HEAD = {}\n", QUEUE_HEAD);
}
COMSUMER_SHOW_COMPLETE.remove();
MAKE_PRODUCER_COMPLETE.remove();
FULL.remove();
EMPTY.remove();
MUTEX.remove();
}

0
}
entry!(main);

fn make_producer(){
let mut pids: [u16; WORKER_NUM] = [0u16; WORKER_NUM];
for i in 0..WORKER_NUM{
let pid = sys_fork();
if pid == 0{
producer();
sys_exit(0);
}
pids[i] = pid;
}
MAKE_PRODUCER_COMPLETE.signal();
COMSUMER_SHOW_COMPLETE.wait();
let cpid = sys_get_pid();
for i in 0..WORKER_NUM {
println!("#{} waiting for #{}...", cpid, pids[i]);
sys_wait_pid(pids[i]);
}
sys_exit(0);
}

fn producer(){
for i in 0..10{
EMPTY.wait();
MUTEX.wait();
print!("push {} in queue\n", i);
unsafe{
if QUEUE_HEAD >= QUEUE_LEN{
panic!("queue overflow");
}
QUEUE[QUEUE_HEAD] = i;
QUEUE_HEAD += 1;
}
MUTEX.signal();
FULL.signal();
}
}

fn make_consumer(){
let mut pids: [u16; WORKER_NUM] = [0u16; WORKER_NUM];
for i in 0..WORKER_NUM{
let pid = sys_fork();
if pid == 0{
consumer();
sys_exit(0);
}
pids[i] = pid;
}
MAKE_PRODUCER_COMPLETE.wait();
sys_stat();
COMSUMER_SHOW_COMPLETE.signal();
let cpid = sys_get_pid();
for i in 0..WORKER_NUM {
println!("#{} waiting for #{}...", cpid, pids[i]);
sys_wait_pid(pids[i]);
}
}
fn consumer(){
for _i in 0..10{
FULL.wait();
MUTEX.wait();
unsafe{
if QUEUE_HEAD == 0{
panic!("read when queue empty")
}
QUEUE_HEAD -= 1;
let data = QUEUE[QUEUE_HEAD];
print!("read {} from queue\n", data);
}
MUTEX.signal();
EMPTY.signal();
}
}

Implementing the dining philosophers

pkg/app/dinner/src/main.rs

#![no_std]
#![no_main]


use lib::{sync::Semaphore, *};

extern crate lib;
const PHI_NUM: usize = 5;
static CHOPSTICK: [Semaphore; 5] = semaphore_array![0, 1, 2, 3, 4];
static S1: Semaphore = Semaphore::new(5);
static S2: Semaphore = Semaphore::new(6);
static mut PHI_COUNT: [i32; PHI_NUM] = [0; PHI_NUM];
fn main() -> isize {
let help = "help:\nFunction 1: the naive approach, which deadlocks\nFunction 2: odd-numbered philosophers take the left chopstick first, then the right; even-numbered ones do the opposite. No deadlock or starvation\nFunction 3: philosophers must take chopsticks in ascending order of number, which is unfair\nFunction 4: a waiter coordinates access; no deadlock or starvation";
let stdin1 = stdin();
println!("Please choose a function");
println!("{}", help);
let s = stdin1.read_line();
let s = s.trim();
match s {
"1" => {
println!("Function 1: the naive approach, which deadlocks");
}
"2" => {
println!("Function 2: odd-numbered philosophers take the left chopstick first, then the right; even-numbered ones do the opposite. No deadlock or starvation");
}
"3" => {
println!("Function 3: philosophers must take chopsticks in ascending order of number, which is unfair");
},
"4" => {
println!("Function 4: a waiter coordinates access; no deadlock or starvation");
}
_ => {
println!("invalid input");
println!("{}", help);
return 0;
}
};
// initialize the semaphores
for i in 0..PHI_NUM{
CHOPSTICK[i].init(1);
}
S1.init(1);
S2.init(1);
let mut pids: [u16; PHI_NUM] = [0u16; PHI_NUM];
for i in 0..PHI_NUM{
let pid = sys_fork();
if pid == 0{
if s == "1"{
philosopher1(i);
}else if s == "2"{
philosopher2(i);
}else if s == "3"{
philosopher3(i);
}else if s == "4"{
philosopher4(i);
}else{
panic!("s should be 1, 2, 3 or 4");
}
sys_exit(0);
}
pids[i] = pid;
}
let cpid = sys_get_pid();
for i in 0..PHI_NUM {
//println!("#{} waiting for #{}...", cpid, pids[i]);
sys_wait_pid(pids[i]);
}
// destroy the semaphores
for i in 0..PHI_NUM{
CHOPSTICK[i].remove();
}
S1.remove();
S2.remove();
0
}
const SLEEP_TIME: i64 = 2;
// Function 1: the naive approach, which deadlocks
fn philosopher1(i: usize){
let c1 = i;
let c2 = (i + 1) % PHI_NUM;
for _a in 0..20{
//thinking
CHOPSTICK[c1].wait();
println!("Philosopher {} get chopstick {}", i, c1);
sleep(SLEEP_TIME);
CHOPSTICK[c2].wait();
println!("Philosopher {} get chopstick {}", i, c2);
sleep(SLEEP_TIME);
//eating
println!("\x1b[32mPhilosopher {} is eating\x1b[0m", i);
CHOPSTICK[c1].signal();
println!("Philosopher {} release chopstick {}", i, c1);
sleep(SLEEP_TIME);
CHOPSTICK[c2].signal();
println!("Philosopher {} release chopstick {}", i, c2);
}
}
// Function 2: odd-numbered philosophers take the left chopstick first, then the right; even-numbered ones do the opposite. No deadlock or starvation
fn philosopher2(i: usize){
let mut c1 = i;
let mut c2 = (i + 1) % PHI_NUM;
for _a in 0..20{
//thinking
if i % 2 == 0 {
c1 = c1 ^ c2;
c2 = c1 ^ c2;
c1 = c1 ^ c2;
}
CHOPSTICK[c1].wait();
println!("Philosopher {} get chopstick {}", i, c1);
sleep(SLEEP_TIME);
CHOPSTICK[c2].wait();
println!("Philosopher {} get chopstick {}", i, c2);
sleep(SLEEP_TIME);
//eating
unsafe{
PHI_COUNT[i] += 1;
println!("\x1b[32mPhilosopher {} is eating, he has eaten {} times.\x1b[0m", i, PHI_COUNT[i]);
}
CHOPSTICK[c1].signal();
println!("Philosopher {} release chopstick {}", i, c1);
sleep(SLEEP_TIME);
CHOPSTICK[c2].signal();
println!("Philosopher {} release chopstick {}", i, c2);
}
}

// Function 3: philosophers must take chopsticks in ascending order of number, which is unfair
fn philosopher3(i: usize){
let mut c1 = i;
let mut c2 = (i + 1) % PHI_NUM;
for _a in 0..100{
//thinking
if c1 > c2 {
c1 = c1 ^ c2;
c2 = c1 ^ c2;
c1 = c1 ^ c2;
}
CHOPSTICK[c1].wait();
//println!("Philosopher {} get chopstick {}", i, c1);
sleep(SLEEP_TIME);
CHOPSTICK[c2].wait();
//println!("Philosopher {} get chopstick {}", i, c2);
sleep(SLEEP_TIME);
//eating
unsafe{
PHI_COUNT[i] += 1;
println!("\x1b[32mPhilosopher {} is eating, he has eaten {} times.\x1b[0m", i, PHI_COUNT[i]);
}

CHOPSTICK[c1].signal();
//println!("Philosopher {} release chopstick {}", i, c1);
sleep(SLEEP_TIME);
CHOPSTICK[c2].signal();
//println!("Philosopher {} release chopstick {}", i, c2);
}
}

// Function 4: a waiter coordinates access; no deadlock or starvation
static mut CHO_USED: [bool; 5] = [false; 5];

fn ask_server_for_cho(i: usize) -> bool{
let c1 = i;
let c2 = (i + 1) % PHI_NUM;
unsafe{
if !CHO_USED[c1] && !CHO_USED[c2] {
CHO_USED[c1] = true;
CHO_USED[c2] = true;
return true;
}else{
return false;
}
}
}
fn release_cho(i: usize){
let c1 = i;
let c2 = (i + 1) % PHI_NUM;
unsafe{
CHO_USED[c1] = false;
CHO_USED[c2] = false;
}
}
fn philosopher4(i: usize){
let c1 = i;
let c2 = (i + 1) % PHI_NUM;
for _a in 0..30{
sleep(SLEEP_TIME);
//thinking
loop{
S1.wait();
let ret = ask_server_for_cho(i);
S1.signal();
if ret{
break;
}
}
// eating
println!("Philosopher {} get chopstick {}", i, c1);
println!("Philosopher {} get chopstick {}", i, c2);
unsafe{
PHI_COUNT[i] += 1;
println!("\x1b[32mPhilosopher {} is eating, he has eaten {} times.\x1b[0m", i, PHI_COUNT[i]);
}
println!("Philosopher {} release chopstick {}", i, c1);
println!("Philosopher {} release chopstick {}", i, c2);
S2.wait();
release_cho(i);
S2.signal();

}
}

entry!(main);

Bonus item 2: fish

pkg/app/fish/src/main.rs

#![no_std]
#![no_main]
use rand::prelude::*;
use rand_chacha::ChaCha20Rng;
use lib::{sync::Semaphore, *};

extern crate lib;
const WORKER_NUM: usize = 3;
static S: [Semaphore; 8] = semaphore_array![0, 1, 2, 3, 4, 5, 6, 7];
static mut FIRST: u8 =0;
fn main() -> isize {
for i in 0..8{
if i != 1{
S[i].init(0);
}else{
S[i].init(1);
}
}
let mut pids: [u16; WORKER_NUM] = [0u16; WORKER_NUM];
for i in 0..WORKER_NUM{
let pid = sys_fork();
if pid == 0{
match i {
0 => f1(),
1 => f2(),
2 => f3(),
_ => panic!("error"),
};
sys_exit(0);
}
pids[i] = pid;
}
for i in 0..WORKER_NUM {
//println!("#{} waiting for #{}...", cpid, pids[i]);
sys_wait_pid(pids[i]);
}
for i in 0..8{
S[i].remove();
}
0
}
entry!(main);

const TIMES: u8 = 50;

fn f1(){//>
let s = ">";
let time = lib::sys_time();
let mut rng = ChaCha20Rng::seed_from_u64(time.timestamp() as u64);
for _ in 0..TIMES{

sleep((rng.gen::<u64>() % 10).try_into().unwrap());
S[1].wait();
unsafe{
if FIRST == 0{
FIRST = 1;
}
}
print!("{}",s);
S[1].signal();
S[3].signal();
S[2].wait();
unsafe{
if FIRST == 2{
S[4].wait();
S[7].signal();
}else{
print!("{}",s);
S[4].signal();
S[7].wait();
S[5].signal();
}
}
S[6].wait();
}
}
fn f2(){//<
let s = "<";
let time = lib::sys_time();
let mut rng = ChaCha20Rng::seed_from_u64(time.timestamp() as u64);
for _ in 0..TIMES{
sleep((rng.gen::<u64>() % 10).try_into().unwrap());
S[1].wait();
unsafe{
if FIRST == 0{
FIRST = 2;
}
}
print!("{}",s);
S[1].signal();
S[2].signal();
S[3].wait();
unsafe{
if FIRST == 2{
print!("{}",s);
S[4].signal();
S[7].wait();
S[5].signal();
}else{
S[4].wait();
S[7].signal();
}
}
S[0].wait();
}
}
fn f3(){//_
let s = "_";
for _ in 0..TIMES{
S[5].wait();
print!("{}", s);
unsafe{
FIRST = 0;
}
S[6].signal();
S[0].signal();
}
}

Implementing waitpid

pkg/kernel/src/interrupt/syscall/service.rs

pub fn wait_service_pid(args: &SyscallArgs, context: &mut ProcessContext) {
    proc::wait_pid(ProcessId(args.arg0.try_into().unwrap()), context)
}

pkg/kernel/src/proc/mod.rs

pub fn wait_pid(pid: ProcessId, context: &mut ProcessContext) {
    x86_64::instructions::interrupts::without_interrupts(|| {
        let manager = get_process_manager();
        if let Some(ret) = manager.get_exit_code(&pid) {
            context.set_rax(ret as usize);
        } else {
            manager.wait_pid(pid);
            manager.save_current(context);
            manager.current().write().block();
            manager.switch_next(context);
        }
    })
}

pkg/kernel/src/proc/manager.rs

pub struct ProcessManager {
    // ...
    wait_queue: Mutex<BTreeMap<ProcessId, BTreeSet<ProcessId>>>,
}

impl ProcessManager {
    // ...
    pub fn kill(&self, pid: ProcessId, ret: isize) {
        // ...
        if let Some(pids) = self.wait_queue.lock().remove(&pid) {
            for pid in pids {
                self.wait_pid_wake_up(pid, Some(ret));
            }
        }
        // ...
    }
    // ...
    pub fn wait_pid(&self, pid: ProcessId) {
        let mut wait_queue = self.wait_queue.lock();
        // record that `processor::current_pid()` is waiting for `pid`
        let k = wait_queue.entry(pid).or_insert(BTreeSet::new());
        k.insert(self.current().pid());
    }

    /// Wake up the process with the given pid
    ///
    /// If `ret` is `Some`, set the return value of the process
    pub fn wait_pid_wake_up(&self, pid: ProcessId, ret: Option<isize>) {
        if let Some(proc) = self.get_proc(&pid) {
            let mut inner = proc.write();
            if let Some(ret) = ret {
                // set the return value of the process
                inner.set_rax(ret as usize);
            }
            // set the process as ready and push it to the ready queue
            inner.pause();
            self.push_ready(pid);
        }
    }
}

User library side:

pkg/lib/src/syscall.rs

#[inline(always)]
pub fn sys_wait_pid(pid: u16) -> isize {
    // the kernel blocks this process until `pid` exits,
    // then returns its exit code in rax, so no user-side loop is needed
    let status: isize = syscall!(Syscall::WaitPid, pid as u64) as isize;
    status
}

Once this is complete, [milestone3: blocking and waking processes](# milestone3:进程的阻塞与唤醒) is achieved.

4. Results

See the [multithreaded counter](# 多线程计数器).

5. Summary

This lab covered the low-level implementation of semaphores and spinlocks, and how to achieve synchronization and mutual exclusion in user programs.


YSOS-rust lab5
https://blog.algorithmpark.xyz/2024/05/25/YSOS/lab5/index/
Author: CJL
Published: May 25, 2024
Updated: July 15, 2024