由于调度随时发生,而物理内存被多个 CPU 共享,所以两个 CPU 或线程有可能会同时访问同一个数据结构,如果不小心地限制并发访问,同时读写或同时写都可能会得到错误的结果、甚至损坏数据结构。有许多策略可以用来保持并发下代码的正确性,锁是其中常用的一种

锁具有互斥性,确保同一把锁在同一时间只有一个 CPU 可以持有,如果将锁与某个共享数据关联,并始终在持有锁的情况下访问数据,那么这个数据就在同一时间也只有一个 CPU 可以使用,在这种情况下,可以说锁保护了这个数据

以链表为例

1  struct element {
2    int data;
3    struct element *next;
4  };
5
6  struct element *list = 0; 
7
8  void
9  push(int data)
10 {
11   struct element *l;
12
13   l = malloc(sizeof *l);
14   l->data = data;
15   l->next = list;
16   list = l;
17 }

这段代码在串行环境中可以正确运行,但并行则会出错。假设有两个 CPU 同时在调用 push 且都执行完了第 15 行,那么当它们先后执行第 16 行时,后执行的赋值会把先执行的 push 结果覆盖掉,两次 push 只增加了一个元素

上面是竞争条件(race condition)的一个例子,当一个内存地址被并发访问,且其中至少有一个访问是写操作时,就会产生竞争条件,竞争的结果通常是 bug,要么其中一个写操作丢失,要么读到了还没更新完的数据,结果取决于 CPU 的执行顺序和内存操作顺序,所以 bug 通常不容易复现,加一行日志也可能改变上述顺序,使竞争消失

当我们说锁保护了数据,实际上是指锁保护了关于数据的一些不变量(invariant)集合,不变量是指数据结构在操作过后依然保持一致的属性,一个操作的正确性常常取决于开始操作时数据的这些不变性是否为真。对于示例代码而言,数据的不变量是 list 始终指向列表的第一个元素且每个元素的 next 指针指向下一个元素,而 第 15 行把 l->next 指向 list 暂时破坏了数据的不变性(这时候 list 指向的不是列表的第一个元素了),并在 16 行恢复,即这两行代码之间存在不变量被破坏的空隙,当像上面说的有两个 CPU 同时执行到这儿,操作了不变性为假的数据时结果就会出错。在第 15 之前、16 行之后加锁就可以保证不会有两个 CPU 同时执行这些代码,这两行代码常常称为临界区代码(critical section)

逻辑上,自旋锁的实现应该是像下面这样的

void acquire(struct spinlock *lk) { // does not work!
  for(;;) {
    if(lk->locked == 0) {
        lk->locked = 1;
        break; 
    }
  } 
}

但这个实现并不能保证并行环境下互斥,如果有两个 CPU 同时执行了 if 语句,都判断 lk->locked == 0 为真,就都会认为自己持有了锁并继续执行,出错的原因在于判断能否持有锁的代码和持有锁的代码之间存在空隙,这个过程不是原子性的

由于锁的广泛使用,多核处理器通常会提供指令来解决上面所说原子性的问题,例如在 RISC-V 中,C 语言的内建函数 __sync_lock_test_and_set 会被编译成对 amoswap r, a 指令的调用,后者将 r 的值与 a 指向的值交换,并返回 r 的旧值。例如假设有两个 CPU 同时执行到 __sync_lock_test_and_set(&lk->locked, 1) 来持有同一把锁,先执行的 CPU 会在一条指令内把 lk->locked 设为 1 并读到 0 的旧值,后执行的 CPU 则只能读到 1,持有锁失败

所以正确的自旋代码应该是像这样的

void acquire(struct spinlock *lk) {
  while(__sync_lock_test_and_set(&lk->locked, 1) != 0)
    ;

  /* ... */
}

自旋锁在 while 循环中不停检测能否持有锁,在成功持有锁之前会一直占用着 CPU 资源,适用于临界区代码较少、持有后快速释放锁的场景,当确定能很快得到锁时,自旋会比上下文切换到其他进程更节省 CPU 资源。除了自旋锁外,xv6 还实现了另一种适用于其他场景的睡眠锁(sleep lock),当进程试图持有一个睡眠锁时,如果发现锁已经被其他进程持有,前者就会进入睡眠状态,将 CPU 资源让给其他进程,并在唤醒后重新检查锁的状态,再决定继续睡眠还是持有锁,与自旋锁相反,睡眠锁在睡眠和唤醒过程中存在大量的上下文切换,但不会长时间占用 CPU,所以更适合在耗时较长的任务中使用

void acquiresleep(struct sleeplock *lk) {
  acquire(&lk->lk);
  while (lk->locked) {
    sleep(lk, &lk->lk);
  }
  lk->locked = 1;
  lk->pid = myproc()->pid;
  release(&lk->lk);
}

锁通常在同一个函数里持有(acquire)和释放(release),但更恰当的用法是:在一系列需要确保原子性的操作前持有锁,并在这一系列操作结束后释放,如果这些操作在不同函数,甚至不同线程、不同 CPU 开始和结束,锁的持有和释放也应该这么做

从表面上看,上面的 acquiresleep 函数是标准的开头持有、结尾释放锁的结构。但实际上,代码持有用于保护 lk->lockedlk->lk 自旋锁,在 while 条件里检测锁的状态为锁上后,就把 lk->lk 作为参数传给了 sleep,后者在获取到所在进程的 p->lock 之后,把 lk->lk 给释放了,直到进程睡眠被唤醒后才重新持有它,然后返回到 acquiresleep 继续执行,即在 acquiresleep 整个函数的代码中, lk->lk 是在 acquiresleep 函数被持有、然后在 sleep 中被释放、再在 sleep 函数被持有、在 acquiresleep 函数释放的,正是上面说的在不同函数持有和释放的场景

void sleep(void *chan, struct spinlock *lk) {
  struct proc *p = myproc();
  
  if(lk != &p->lock) {  
    acquire(&p->lock);
    release(lk);
  }

  /* sleep and waked up */

  // Reacquire original lock.
  if(lk != &p->lock){
    release(&p->lock);
    acquire(lk);
  }
}