技术交流

好好学习,天天向上。

0%

第五章——并发和竞态

介绍

在SMP系统中,资源共享是很常见的事情;在DMA、抢占、中断的复杂环境中,资源共享是很常见的事情;硬件资源被多个程序共用,是很常见的事情。本章讲资源共享时的各种保护方式。

信号量和Mutex

信号量,本质就是一个变量。可以对其执行加减(P/V)操作来表示该信号量是可用还是空闲,进而表示软硬件资源是可用还是空闲。信号量的值是几,表示资源值最多可以被几个请求者使用。每个请求者用资源之前,都尝试对信号量执行P操作(原子操作,如果减去1结果大于等于0,则执行前去1,否则可能挂起进程),释放资源时都执行V操作(原子操作,信号量加1,然后按照设定唤醒挂在P操作上的进程)。当信号量的初始值被设备为1的时候,信号量事实上就成为了Mutex。

常规的信号量

信号量的常用操作如下所示。从函数原型可以看到,在内核中信号量和Mutex是同一种实现

1
2
3
4
5
6
7
8
9
10
11
#include <asm/semaphore.h>                         // 头文件
void sema_init(struct semaphore *sem, int val); // 动态分配sem内存,动态初始化 val时信号量初始值
DECLARE_MUTEX(name); // 静态声明一个未抢占的Mutex
DECLARE_MUTEX_LOCKED(name); // 静态声明一个抢占的Mutex
void init_MUTEX(struct semaphore *sem); // 将sema_init的val设为1是一样的
void init_MUTEX_LOCKED(struct semaphore *sem); // 锁定版本

void down(struct semaphore *sem); // P操作,P不成则挂起
int down_interruptible(struct semaphore *sem); // P操作,P不成进入可中断睡眠状态,能被其他的事件唤醒
int down_trylock(struct semaphore *sem); // P操作,P不成立即返回,不会挂起
void up(struct semaphore *sem); // V操作

读写信号量

读写信号量用于一个数据生产者对应多个数据消费者场景的资源共享。修改资源和读取资源互斥,读取资源与读取资源不互斥,对应的常用操作如下

1
2
3
4
5
6
7
8
9
10
11
#include <linux/rwsem.h>                           // 头文件
void init_rwsem(struct rw_semaphore *sem);
void down_read(struct rw_semaphore *sem);
int down_read_trylock(struct rw_semaphore *sem);
void up_read(struct rw_semaphore *sem);

void down_write(struct rw_semaphore *sem);
int down_write_trylock(struct rw_semaphore *sem);
void up_write(struct rw_semaphore *sem);
// 获取写入资格的代码,在写完部分数据之后,剩下的都只是读取资源,可以用下面接口将自己的写入身份转换为读取身份
void downgrade_write(struct rw_semaphore *sem);

完成量

理论上讲,对于异步处理模型,可以用Mutex来实现。主线程申请一个锁定的Mutex,然后将该Mutex交给异步线程处理做V操作。而主线程在异步线程开始执行后,自己挂在该Mutex上。等到异步线程执行完成后,执行V操作主线程会被自动唤醒。上述方法有个问题,如果Mutex是在内核栈上分配的,某些情况该下在异步线程完成操作前,该变量可能会消失。还有个问题就是信号量不是为这种模型设计的,不高效。所以,内核实现了完成量,其常用接口如下所示

1
2
3
4
5
6
7
8
9
DECLARE_COMPLETION(my_completion);     // 创建 + 初始化
INIT_COMPLETION(struct completion c); // 完成量是one-shot的,complete_all之后必须重新初始化,该宏可以一步到位重初始化
struct completion my_completion; // 创建
init_completion(&my_completion); // 初始

void wait_for_completion(struct completion *c); // 等待一个完成量,可以多个线程同时等待
void complete(struct completion *c); // 将一个完成量完成,唤醒一个挂在该完成量上的线程
void complete_all(struct completion *c); // 将一个完成量完成,唤醒所有挂在该完成量上的线程
void complete_and_exit(struct completion *c, long retval); // 一般是内核线程用

完成量典型应用场景就是模块退出的时候,停止内核线程。模块主要功能以内核线程形式工作,退出时exit告知线程停止,然后挂在完成量上等待内核线程完成。内核线准备好退出后,complete完成量。

自旋锁

自旋锁是一种开销更小的互斥保护机制,挂在自旋锁上面的线程不需要复杂的sleep,而是直接原地无意义消耗CPU。自旋锁不能再中断处理函数中使用,因为中断优先执行,想拿的锁可能在调度到中断处理之前已经被被的地方锁定过了,强行拿锁会导致死锁。实时上,使用自旋锁的基本属性就是,在拿到锁以后不能给其他任何地方再次强锁的机会。可以按照以下编码方式实现这种假定

  • 中断不拿自旋锁
  • 拿到自旋锁的代码,不能丢CPU(关中断、关抢占、关调度、不使用可能休眠的函数)

自旋锁API如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
spinlock_t my_lock = SPIN_LOCK_UNLOCKED; // 创建一个未锁的自旋锁
void spin_lock_init(spinlock_t *lock); // 初始化一个自旋锁

void spin_lock(spinlock_t *lock); // 锁定一个自旋锁
void spin_unlock(spinlock_t *lock); // 释放一个自旋锁

// 锁定一个自旋锁前,关闭所有中断(本地CPU),flags中保存原始中断控制数据
// 该API假设不知道上锁之前中断是开的还是关的,所以干脆原封不动保存,用完锁再原封不动恢复
void spin_lock_irqsave(spinlock_t *lock, unsigned long flags);

// 锁定一个自旋锁前,关闭所有中断(本地CPU),该函数假定之前开了中断,该且它执行关闭中断操作
void spin_lock_irq(spinlock_t *lock);

// 锁定一个自旋锁前,只关闭软件中断
void spin_lock_bh(spinlock_t *lock)

void spin_unlock_irqrestore(spinlock_t *lock, unsigned long flags); // 释放一个自旋锁,同时恢复原始中断装填
// 释放一个自旋锁,打开本地软硬中断
void spin_unlock_irq(spinlock_t *lock);
// 释放一个自旋锁,打开本地软件中断
void spin_unlock_bh(spinlock_t *lock);

int spin_trylock(spinlock_t *lock); // 尝试上锁,锁不上不挂起。锁定成功返回非零值,反之失败
int spin_trylock_bh(spinlock_t *lock); // 上一函数的禁止本地软件中断版本,没有完全禁止本地中断的try版本

读写自旋锁

读写信号量的自旋锁版本,自旋锁的读写拆分版本。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
rwlock_t my_rwlock = RW_LOCK_UNLOCKED; // 一次性完成申请和初始化* Static way */
rwlock_t my_rwlock; //
rwlock_init(&my_rwlock); //

void read_lock(rwlock_t *lock);
void read_lock_irqsave(rwlock_t *lock, unsigned long flags);
void read_lock_irq(rwlock_t *lock);
void read_lock_bh(rwlock_t *lock);
void read_unlock(rwlock_t *lock);
void read_unlock_irqrestore(rwlock_t *lock, unsigned long flags);
void read_unlock_irq(rwlock_t *lock);
void read_unlock_bh(rwlock_t *lock); // 注意,没有try版本

void write_lock(rwlock_t *lock);
void write_lock_irqsave(rwlock_t *lock, unsigned long flags);
void write_lock_irq(rwlock_t *lock);
void write_lock_bh(rwlock_t *lock);
int write_trylock(rwlock_t *lock);
void write_unlock(rwlock_t *lock);
void write_unlock_irqrestore(rwlock_t *lock, unsigned long flags);
void write_unlock_irq(rwlock_t *lock);
void write_unlock_bh(rwlock_t *lock); // 注意,没有try版本

多锁多信号量以及混用规则

  • 多个锁/信号量需要同时锁定时,保持用相同的顺序拿锁/信号量

    这样不会出现A线程拿到锁A,B线程拿到锁B,A、B线程死锁在另外一个线程上的情景。

  • 局部锁先上锁,然后锁定再上更大范围的锁

    屋子收拾干净再接客,内部处理好了再去影响更大范围,否则因为局部锁还没拿到,大家都因为这个小区域被挂起。

  • 信号量和自旋锁混用,应该先拿信号量,再拿自旋锁

    因为自旋锁不能睡眠,信号量可以睡眠。

原子变量

当共享的资源是一个简单的整数值的时候,就可以用原子变量。原子变量在所有的内核架构上都是用int存储的,但是某些架构在将int用于实现原子操作时不能使用完整的int,因此原子变量不能记录大于24位的整数。原子变量只能通过封装的接口来访问,不能将原子变量用于任何显式、隐式的类型转换。多个原子变量共同使用的时候,仍然需要用锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void atomic_set(atomic_t *v, int i);
atomic_t v = ATOMIC_INIT(0);
int atomic_read(atomic_t *v);

void atomic_add(int i, atomic_t *v);
void atomic_sub(int i, atomic_t *v);
void atomic_inc(atomic_t *v);
void atomic_dec(atomic_t *v);
int atomic_add_return(int i, atomic_t *v);
int atomic_sub_return(int i, atomic_t *v);
int atomic_inc_return(atomic_t *v);
int atomic_dec_return(atomic_t *v);
// 原子变量*v加了i之后,若为负数则返回true,否则返回false
int atomic_add_negative(int i, atomic_t *v);

// 下面的接口在操作完成后,原子值为0,则返回true,否则返回false
int atomic_inc_and_test(atomic_t *v);
int atomic_dec_and_test(atomic_t *v);
int atomic_sub_and_test(int i, atomic_t *v);

原子位操作

原子操作用来计数比较方便,如果需要将掩码用原子方式处理,可以用内核封装的原子位操作。

1
2
3
4
5
6
7
8
9
10
void set_bit(nr, void *addr);
void clear_bit(nr, void *addr);
void change_bit(nr, void *addr);

// 下面接口不仅实现目标功能,还返回操作前nr bit的值
int test_and_set_bit(nr, void *addr);
int test_and_clear_bit(nr, void *addr);
int test_and_change_bit(nr, void *addr);

test_bit(nr, void *addr); // 返回*addr的nr bit的值,可以不原子实现

seqlock

如果某个资源大部分时间都是读取,仅仅很少时间会写入,那么可以用seqlock来做保护。seqlock锁用于那种,读取者在读取数据时,认为极大概率写入者不会修改内容,就算改了,重新执行读取流程也无明显cycle消耗的场景。seqlock让读取者自行验证在数据读取期间是否和写入者发生了冲突。seqlock不用于保护指针资源,因为中途被写入者修改了,读取者可能错误的解指针。使用seqlock的经典代码如下

1
2
3
4
5
unsigned int seq;
do {
seq = read_seqbegin(&the_lock); // 声明读取
/* 读取者读取数据,然后处理 */
} while read_seqretry(&the_lock, seq); // 验证读取期间是否有写入者修改,若有修改read_seqretry返回true

seqlock接口如下(不全,具体看内核代码)

1
2
3
4
5
6
7
8
9
10
11
12
13
// 在中断中用seqlock要关中断, 注意这里只是读取可以在中断例程中用,因为读取不会挂起
unsigned int read_seqbegin_irqsave(seqlock_t *lock, unsigned long flags);
int read_seqretry_irqrestore(seqlock_t *lock, unsigned int seq, unsigned long flags);

// seqlock通过自旋锁实现,有很多和自旋锁API相似的API
void write_seqlock(seqlock_t *lock);
void write_sequnlock(seqlock_t *lock);
void write_seqlock_irqsave(seqlock_t *lock, unsigned long flags);
void write_seqlock_irq(seqlock_t *lock);
void write_seqlock_bh(seqlock_t *lock);
void write_sequnlock_irqrestore(seqlock_t *lock, unsigned long flags);
void write_sequnlock_irq(seqlock_t *lock);
void write_sequnlock_bh(seqlock_t *lock);

RCU

RCU同样用于保护大部分时间都是读取,少部分时间是修改的资源,不过RCU保护的资源只能是指针资源。RCU的设计思路是,写入者首先复制一份要修改的资源,然后修改副本,修改完后做指针替换就完成资源修改。这种设计主要出于不影响读取者的考虑, 例如路由表就用RCU保护。此外,RCU还有个假定,那就是所有读取数据的代码,对RCU所保护的指针指向的数据的引用都是原子的。什么意思呢,就是说假设某个代码块使用了RCU保护的数据,那么它必须保证在自己被调度出CPU之前完成在该数据上的操作。换句话下次调度回来就不能再用RCU保护的指针了,再换句话说,就是在*rcu_read_unlock();*之后就不能再用RCU保护的指针了。为什么RCU这么设计呢?因为这样RCU修改者就确定什么时候可以删除修改前的数据而不会影响读取者。

1
2
3
4
5
struct my_stuff *stuff;
rcu_read_lock( );
stuff = getxxxx();
/*这个地方是原子的,RCU默认关本地中断和抢占,出了这个区域,保护的指针就不能用了*/
rcu_read_unlock( );

正是由于上面的RCU的约定,那么RCU的写入者就可以确定,在一轮调度之后,修改前的数据块一定没有读取者使用了,此时可以安全释放该资源。所以RCU写入者用回调的形式释放资源,释放时刚好是一轮调度之后。具体RCU使用方法、API可参考内核文档。

CPU变量

CPU变量本书未做介绍,但是是一种非常重要的免锁实现方案。CPU变量将资源在每个CPU都存放一份,每个CPU各自维护,独立管理。具体用法参考内核文档。