Lab 8: locks

这一章实验虽然简单,但是要想真正将锁运用自如那是非常难的。

并行与并发:这两个术语是针对任务推进或者进程执行而言的。并行是指在某一时刻多个任务得到推进;而并发是指在某一时间段内多个任务得到推进;并行需要物理多核,而并发是通过时间片轮转的方式在单核上实现的

race condition 的定义:一块内存地址被并行地访问,且至少有一个访问是写操作,此时 race condition 成立

concurrency 的定义:指多条指令流交织的情况。多处理器并行执行,线程切换,中断都可以导致 concurrency

数据结构的不变性(invariants)条件:Data structure invariants are properties that the data structures in a program must satisfy in valid states

​ concurrency 会带来 race condition,导致程序无法正确执行或共享数据结构遭到破坏,因此需要并发控制技术(Concurrency Control Techniques,CCT)来 avoid race condition

​ 使用的最广泛的 CCT 自然就是锁了。锁是容易理解的,锁的作用其实就是保证了 invariants of data struct,但是如果使用不当会导致程序性能下降。

spinlock 与 sleeplock

Memory allocator

​ 本质上,锁的竞争会导致程序性能的下降,而本实验的第一部分就是对 freelist 进行重新设计来避免大量的锁竞争。这一部分实验也启示我们可以根据 lock contention 的情况来判断自己的多线程程序是否在锁的这一方面设计不当。

​ 原来的 xv6 在 freelist 的设计上是这样的:只有一个 freelist 全局数据结构,通过一把锁来实现线程安全,由于 xv6 是 multi-processor 的,在分配内存和释放内存时每个 CPU 都会去尝试获得锁,这就导致了锁的竞争。

​ 我们可以将 freelist 设计成,每一个 CPU 都持有一个 freelist,这样当 CPU 去分配获释放内存时,只需要持有自己的锁而不需要去竞争其他 CPU 的锁,这就大大降低了 lock contention,提高了程序的并发性。但是这里需要考虑一个问题,那就是当某一个 CPU 的 freelist 空了,但其他 CPU 的 freelist 中还是有空闲页的,此时就应该从其他 CPU 的 freelist 中 steal 一个空闲页,这里就要涉及到其他 CPU 的锁了!

​ 由于这一部分代码实现很简单就不做过多介绍了:

1
2
3
4
5
6
7
8
9
10
11
kernel/kalloc.c:

struct cpu {
struct proc *proc; // The process running on this cpu, or null.
struct context context; // swtch() here to enter scheduler().
int noff; // Depth of push_off() nesting.
int intena; // Were interrupts enabled before push_off()?

// my code:
struct kmem *kmem; // 增加一个 kmem 字段,指向 CPU 自己的空闲链表
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
kernel/kalloc.c:

struct kmem{
struct spinlock lock;
struct run *freelist;
} kmem[NCPU]; // 为每一个 CPU 都分配一个 空闲列表

void
kinit()
{
for (int i = 0;i < NCPU;++i) { // 为每一把锁都进行初始化
char lockname[] = "kmem_i";
lockname[5] = '0' + i;
cpus[i].kmem = kmem + i;
initlock(&(cpus[i].kmem->lock), lockname);
}
freerange(end, (void*)PHYSTOP);
}

void
kfree(void *pa)
{
struct run *r;

if(((uint64)pa % PGSIZE) != 0 || (char*)pa < end || (uint64)pa >= PHYSTOP)
panic("kfree");

// Fill with junk to catch dangling refs.
memset(pa, 1, PGSIZE);

r = (struct run*)pa;

// my code: // 根据当前 CPU 来对自己的 freelist 进行操作
struct cpu *c = mycpu();
acquire(&c->kmem->lock);
r->next = c->kmem->freelist;
c->kmem->freelist = r;
release(&c->kmem->lock);
}

void *
kalloc(void)
{
struct run *r;

// my code:
struct cpu *c = mycpu();
acquire(&c->kmem->lock);
r = c->kmem->freelist;
if(r)
c->kmem->freelist = r->next;
else { // 去别的 cpu 的freelist上 steal
for (int i = 0;i < NCPU;++i) {
if (&cpus[i] == c) continue; // 跳过自己
acquire(&cpus[i].kmem->lock);
r = cpus[i].kmem->freelist;
if (r) { // 找到空闲的页
cpus[i].kmem->freelist = r->next;
release(&cpus[i].kmem->lock);
break;
}
release(&cpus[i].kmem->lock);
}
}
release(&c->kmem->lock);

if(r)
memset((char*)r, 5, PGSIZE); // fill with junk
return (void*)r;
}

Buffer cache

​ 这一部分也是为了减少锁的竞争,但是是对 Buffer cache 的锁而言的。它就不能像前面对于 freelist 而言只需要为每一个 CPU 都维护一个 freelist 那么简单了。

​ 其实是存在优化空间的,原来的实现是不管对哪一个 Buffer 进行操作,都要进行加锁,我们可以改成 为每一个 Buffer 都有自己对应的锁,具体操作哪一个 Buffer 时对其对应的锁进行加锁。

​ 我们可以使用 哈希表 来做 逻辑块号Buffer 对应锁 之间的映射。因此我们初始化哈希表:

1
2
kernel/bio.c:
struct spinlock bcache_locks[31]; // 最好用质数来减少 hash 冲突。

​ 由于在原来的设计中,LRU 策略是通过双向链表来实现的,对链表的操作相当于要在全局加一把锁来实现原子性以保证 invariants,这样的话又会带来巨大的 lock contention,因此我们将 LRU 策略的实现改一下:在 buf 结构体中增加一个 ticks 变量来记录最近一次 Buffer 使用的时间,在 brelse 中更新这个时间,用于 LRU 策略,因此在缓存替换时,我们只要遍历一遍未使用的 Buffer 中 ticks 最小的那个缓存,将其替换即可。因此我们就不需要双向链表了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
kernel/bio.c:

struct {
struct spinlock lock;
struct buf buf[NBUF];

// Linked list of all buffers, through prev/next.
// Sorted by how recently the buffer was used.
// head.next is most recent, head.prev is least.
// struct buf head;
} bcache;

kernel/buf.h:

struct buf {
···
// struct buf *prev; // LRU cache list
// struct buf *next;
uchar data[BSIZE];
uint64 ticks; // 用 ticks 来实现 LRU
};

​ 有了这么多锁,自然要初始化他们:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
kernel/bio.c:
void
binit(void)
{
struct buf *b;

for (int i = 0;i < 31;++i) {
char lock_name[] = "bcache0";
lock_name[6] = '0' + i;
initlock(&bcache_locks[i], lock_name);
}

// Create linked list of buffers
// bcache.head.prev = &bcache.head;
// bcache.head.next = &bcache.head;
for(b = bcache.buf; b < bcache.buf+NBUF; b++){
// b->next = bcache.head.next;
// b->prev = &bcache.head;
initsleeplock(&b->lock, "buffer");
// bcache.head.next->prev = b;
// bcache.head.next = b;
}
}

​ 对 bget 和 brelse 进行相应的改动:注意在 LRU 进行缓存替换的时候,一定要持有 bcache 的锁,因为这里只持有 对应逻辑块号的锁是无法保证线程安全的。假设 CPU0 和 CPU1 同时执行到 缓存替换这一行,都看到 Buffer1 是最近最久未使用的 Buffer,它们同时对其进行替换并返回,这就导致了两个文件对应一个 Buffer 的情况了,这肯定是会发生问题的!因此在做 缓存替换时,一定要加上 对 bcache 的锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
kernel/bio.c:

static struct buf*
bget(uint dev, uint blockno)
{
struct buf *b;

struct spinlock *l = &bcache_locks[blockno % 31];
acquire(l);

for (int i = 0;i < NBUF;++i) {
b = &bcache.buf[i];
if (b->dev == dev && b->blockno == blockno) {
b->refcnt++;
release(l);
acquiresleep(&b->lock);
return b;
}
}

// Not cached.
// Recycle the least recently used (LRU) unused buffer.
uint min_ticks = 0x0fffffff;
uint idx = 0;
int flag = 0;
acquire(&bcache.lock); // 这里一定要持有一个 bcache 的锁,因为如果此时另一个 CPU 也执行到 缓存替换这一步,就会导致程序执行错误!
for(int i = 0;i < NBUF;++i){
b = &bcache.buf[i];
if(b->refcnt == 0 && b->ticks < min_ticks) {
idx = i;
min_ticks = b->ticks;
flag = 1;
}
}
if (flag == 1) {
b = &bcache.buf[idx];
b->dev = dev;
b->blockno = blockno;
b->valid = 0;
b->refcnt = 1;
release(&bcache.lock);
release(l);
acquiresleep(&b->lock);
return b;
}

panic("bget: no buffers");
}