Lab 8: locks 这一章实验虽然简单,但是要想真正将锁运用自如那是非常难的。
并行与并发:这两个术语是针对任务推进或者进程执行而言的。并行是指在某一时刻 多个任务得到推进;而并发是指在某一时间段内 多个任务得到推进;并行需要物理多核,而并发是通过时间片轮转的方式在单核上实现的
race condition 的定义:一块内存地址被并行地访问,且至少有一个访问是写操作,此时 race condition 成立
concurrency 的定义:指多条指令流交织的情况。多处理器并行执行,线程切换,中断都可以导致 concurrency
数据结构的不变性(invariants)条件:Data structure invariants are properties that the data structures in a program must satisfy in valid states
concurrency 会带来 race condition,导致程序无法正确执行或共享数据结构遭到破坏,因此需要并发控制技术(Concurrency Control Techniques,CCT)来 avoid race condition
使用的最广泛的 CCT 自然就是锁了。锁是容易理解的,锁的作用其实就是保证了 invariants of data struct,但是如果使用不当会导致程序性能下降。
spinlock 与 sleeplock Memory allocator 本质上,锁的竞争会导致程序性能的下降,而本实验的第一部分就是对 freelist 进行重新设计来避免大量的锁竞争。这一部分实验也启示我们可以根据 lock contention 的情况来判断自己的多线程程序是否在锁的这一方面设计不当。
原来的 xv6 在 freelist 的设计上是这样的:只有一个 freelist 全局数据结构,通过一把锁来实现线程安全,由于 xv6 是 multi-processor 的,在分配内存和释放内存时每个 CPU 都会去尝试获得锁,这就导致了锁的竞争。
我们可以将 freelist 设计成,每一个 CPU 都持有一个 freelist,这样当 CPU 去分配获释放内存时,只需要持有自己的锁而不需要去竞争其他 CPU 的锁,这就大大降低了 lock contention,提高了程序的并发性。但是这里需要考虑一个问题,那就是当某一个 CPU 的 freelist 空了,但其他 CPU 的 freelist 中还是有空闲页的,此时就应该从其他 CPU 的 freelist 中 steal 一个空闲页,这里就要涉及到其他 CPU 的锁了!
由于这一部分代码实现很简单就不做过多介绍了:
1 2 3 4 5 6 7 8 9 10 11 kernel/kalloc.c: struct cpu { struct proc *proc ; struct context context ; int noff; int intena; struct kmem *kmem ; };
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 kernel/kalloc.c: struct kmem { struct spinlock lock ; struct run *freelist ; } kmem[NCPU]; void kinit() { for (int i = 0 ;i < NCPU;++i) { char lockname[] = "kmem_i" ; lockname[5 ] = '0' + i; cpus[i].kmem = kmem + i; initlock(&(cpus[i].kmem->lock), lockname); } freerange(end, (void *)PHYSTOP); } void kfree(void *pa) { struct run *r ; if (((uint64)pa % PGSIZE) != 0 || (char *)pa < end || (uint64)pa >= PHYSTOP) panic("kfree" ); memset (pa, 1 , PGSIZE); r = (struct run*)pa; struct cpu *c = mycpu(); acquire(&c->kmem->lock); r->next = c->kmem->freelist; c->kmem->freelist = r; release(&c->kmem->lock); } void *kalloc(void ) { struct run *r ; struct cpu *c = mycpu(); acquire(&c->kmem->lock); r = c->kmem->freelist; if (r) c->kmem->freelist = r->next; else { for (int i = 0 ;i < NCPU;++i) { if (&cpus[i] == c) continue ; acquire(&cpus[i].kmem->lock); r = cpus[i].kmem->freelist; if (r) { cpus[i].kmem->freelist = r->next; release(&cpus[i].kmem->lock); break ; } release(&cpus[i].kmem->lock); } } release(&c->kmem->lock); if (r) memset ((char *)r, 5 , PGSIZE); return (void *)r; }
Buffer cache 这一部分也是为了减少锁的竞争,但是是对 Buffer cache 的锁而言的。它就不能像前面对于 freelist 而言只需要为每一个 CPU 都维护一个 freelist 那么简单了。
其实是存在优化空间的,原来的实现是不管对哪一个 Buffer 进行操作,都要进行加锁,我们可以改成 为每一个 Buffer 都有自己对应的锁,具体操作哪一个 Buffer 时对其对应的锁进行加锁。
我们可以使用 哈希表 来做 逻辑块号 到 Buffer 对应锁 之间的映射。因此我们初始化哈希表:
1 2 kernel/bio.c: struct spinlock bcache_locks [31];
由于在原来的设计中,LRU 策略是通过双向链表来实现的,对链表的操作相当于要在全局加一把锁来实现原子性以保证 invariants,这样的话又会带来巨大的 lock contention,因此我们将 LRU 策略的实现改一下:在 buf 结构体中增加一个 ticks 变量来记录最近一次 Buffer 使用的时间,在 brelse 中更新这个时间,用于 LRU 策略,因此在缓存替换时,我们只要遍历一遍未使用的 Buffer 中 ticks 最小的那个缓存,将其替换即可。因此我们就不需要双向链表了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 kernel/bio.c: struct { struct spinlock lock ; struct buf buf [NBUF ]; } bcache; kernel/buf.h: struct buf {··· uchar data[BSIZE]; uint64 ticks; };
有了这么多锁,自然要初始化他们:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 kernel/bio.c: void binit(void ) { struct buf *b ; for (int i = 0 ;i < 31 ;++i) { char lock_name[] = "bcache0" ; lock_name[6 ] = '0' + i; initlock(&bcache_locks[i], lock_name); } for (b = bcache.buf; b < bcache.buf+NBUF; b++){ initsleeplock(&b->lock, "buffer" ); } }
对 bget 和 brelse 进行相应的改动:注意在 LRU 进行缓存替换的时候,一定要持有 bcache 的锁,因为这里只持有 对应逻辑块号的锁是无法保证线程安全的。假设 CPU0 和 CPU1 同时执行到 缓存替换这一行,都看到 Buffer1 是最近最久未使用的 Buffer,它们同时对其进行替换并返回,这就导致了两个文件对应一个 Buffer 的情况了,这肯定是会发生问题的!因此在做 缓存替换时,一定要加上 对 bcache 的锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 kernel/bio.c: static struct buf *bget (uint dev , uint blockno ){ struct buf *b ; struct spinlock *l = &bcache_locks[blockno % 31 ]; acquire(l); for (int i = 0 ;i < NBUF;++i) { b = &bcache.buf[i]; if (b->dev == dev && b->blockno == blockno) { b->refcnt++; release(l); acquiresleep(&b->lock); return b; } } uint min_ticks = 0x0fffffff ; uint idx = 0 ; int flag = 0 ; acquire(&bcache.lock); for (int i = 0 ;i < NBUF;++i){ b = &bcache.buf[i]; if (b->refcnt == 0 && b->ticks < min_ticks) { idx = i; min_ticks = b->ticks; flag = 1 ; } } if (flag == 1 ) { b = &bcache.buf[idx]; b->dev = dev; b->blockno = blockno; b->valid = 0 ; b->refcnt = 1 ; release(&bcache.lock); release(l); acquiresleep(&b->lock); return b; } panic("bget: no buffers" ); }