介绍
在SMP系统中,资源共享是很常见的事情;在DMA、抢占、中断的复杂环境中,资源共享是很常见的事情;硬件资源被多个程序共用,是很常见的事情。本章讲资源共享时的各种保护方式。
信号量和Mutex
信号量,本质就是一个变量。可以对其执行加减(P/V)操作来表示该信号量是可用还是空闲,进而表示软硬件资源是可用还是空闲。信号量的值是几,表示资源值最多可以被几个请求者使用。每个请求者用资源之前,都尝试对信号量执行P操作(原子操作,如果减去1结果大于等于0,则执行前去1,否则可能挂起进程),释放资源时都执行V操作(原子操作,信号量加1,然后按照设定唤醒挂在P操作上的进程)。当信号量的初始值被设备为1的时候,信号量事实上就成为了Mutex。
常规的信号量
信号量的常用操作如下所示。从函数原型可以看到,在内核中信号量和Mutex是同一种实现
1 2 3 4 5 6 7 8 9 10 11
| #include <asm/semaphore.h> // 头文件 void sema_init(struct semaphore *sem, int val); DECLARE_MUTEX(name); DECLARE_MUTEX_LOCKED(name); void init_MUTEX(struct semaphore *sem); void init_MUTEX_LOCKED(struct semaphore *sem);
void down(struct semaphore *sem); int down_interruptible(struct semaphore *sem); int down_trylock(struct semaphore *sem); void up(struct semaphore *sem);
|
读写信号量
读写信号量用于一个数据生产者对应多个数据消费者场景的资源共享。修改资源和读取资源互斥,读取资源与读取资源不互斥,对应的常用操作如下
1 2 3 4 5 6 7 8 9 10 11
| #include <linux/rwsem.h> // 头文件 void init_rwsem(struct rw_semaphore *sem); void down_read(struct rw_semaphore *sem); int down_read_trylock(struct rw_semaphore *sem); void up_read(struct rw_semaphore *sem);
void down_write(struct rw_semaphore *sem); int down_write_trylock(struct rw_semaphore *sem); void up_write(struct rw_semaphore *sem);
void downgrade_write(struct rw_semaphore *sem);
|
完成量
理论上讲,对于异步处理模型,可以用Mutex来实现。主线程申请一个锁定的Mutex,然后将该Mutex交给异步线程处理做V操作。而主线程在异步线程开始执行后,自己挂在该Mutex上。等到异步线程执行完成后,执行V操作主线程会被自动唤醒。上述方法有个问题,如果Mutex是在内核栈上分配的,某些情况该下在异步线程完成操作前,该变量可能会消失。还有个问题就是信号量不是为这种模型设计的,不高效。所以,内核实现了完成量,其常用接口如下所示
1 2 3 4 5 6 7 8 9
| DECLARE_COMPLETION(my_completion); INIT_COMPLETION(struct completion c); struct completion my_completion; init_completion(&my_completion);
void wait_for_completion(struct completion *c); void complete(struct completion *c); void complete_all(struct completion *c); void complete_and_exit(struct completion *c, long retval);
|
完成量典型应用场景就是模块退出的时候,停止内核线程。模块主要功能以内核线程形式工作,退出时exit告知线程停止,然后挂在完成量上等待内核线程完成。内核线准备好退出后,complete完成量。
自旋锁
自旋锁是一种开销更小的互斥保护机制,挂在自旋锁上面的线程不需要复杂的sleep,而是直接原地无意义消耗CPU。自旋锁不能再中断处理函数中使用,因为中断优先执行,想拿的锁可能在调度到中断处理之前已经被被的地方锁定过了,强行拿锁会导致死锁。实时上,使用自旋锁的基本属性就是,在拿到锁以后不能给其他任何地方再次强锁的机会。可以按照以下编码方式实现这种假定
- 中断不拿自旋锁
- 拿到自旋锁的代码,不能丢CPU(关中断、关抢占、关调度、不使用可能休眠的函数)
自旋锁API如下所示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| spinlock_t my_lock = SPIN_LOCK_UNLOCKED; void spin_lock_init(spinlock_t *lock);
void spin_lock(spinlock_t *lock); void spin_unlock(spinlock_t *lock);
void spin_lock_irqsave(spinlock_t *lock, unsigned long flags);
void spin_lock_irq(spinlock_t *lock);
void spin_lock_bh(spinlock_t *lock)
void spin_unlock_irqrestore(spinlock_t *lock, unsigned long flags);
void spin_unlock_irq(spinlock_t *lock);
void spin_unlock_bh(spinlock_t *lock);
int spin_trylock(spinlock_t *lock); int spin_trylock_bh(spinlock_t *lock);
|
读写自旋锁
读写信号量的自旋锁版本,自旋锁的读写拆分版本。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| rwlock_t my_rwlock = RW_LOCK_UNLOCKED; rwlock_t my_rwlock; rwlock_init(&my_rwlock);
void read_lock(rwlock_t *lock); void read_lock_irqsave(rwlock_t *lock, unsigned long flags); void read_lock_irq(rwlock_t *lock); void read_lock_bh(rwlock_t *lock); void read_unlock(rwlock_t *lock); void read_unlock_irqrestore(rwlock_t *lock, unsigned long flags); void read_unlock_irq(rwlock_t *lock); void read_unlock_bh(rwlock_t *lock);
void write_lock(rwlock_t *lock); void write_lock_irqsave(rwlock_t *lock, unsigned long flags); void write_lock_irq(rwlock_t *lock); void write_lock_bh(rwlock_t *lock); int write_trylock(rwlock_t *lock); void write_unlock(rwlock_t *lock); void write_unlock_irqrestore(rwlock_t *lock, unsigned long flags); void write_unlock_irq(rwlock_t *lock); void write_unlock_bh(rwlock_t *lock);
|
多锁多信号量以及混用规则
多个锁/信号量需要同时锁定时,保持用相同的顺序拿锁/信号量
这样不会出现A线程拿到锁A,B线程拿到锁B,A、B线程死锁在另外一个线程上的情景。
局部锁先上锁,然后锁定再上更大范围的锁
屋子收拾干净再接客,内部处理好了再去影响更大范围,否则因为局部锁还没拿到,大家都因为这个小区域被挂起。
信号量和自旋锁混用,应该先拿信号量,再拿自旋锁
因为自旋锁不能睡眠,信号量可以睡眠。
原子变量
当共享的资源是一个简单的整数值的时候,就可以用原子变量。原子变量在所有的内核架构上都是用int存储的,但是某些架构在将int用于实现原子操作时不能使用完整的int,因此原子变量不能记录大于24位的整数。原子变量只能通过封装的接口来访问,不能将原子变量用于任何显式、隐式的类型转换。多个原子变量共同使用的时候,仍然需要用锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| void atomic_set(atomic_t *v, int i); atomic_t v = ATOMIC_INIT(0); int atomic_read(atomic_t *v);
void atomic_add(int i, atomic_t *v); void atomic_sub(int i, atomic_t *v); void atomic_inc(atomic_t *v); void atomic_dec(atomic_t *v); int atomic_add_return(int i, atomic_t *v); int atomic_sub_return(int i, atomic_t *v); int atomic_inc_return(atomic_t *v); int atomic_dec_return(atomic_t *v);
int atomic_add_negative(int i, atomic_t *v);
int atomic_inc_and_test(atomic_t *v); int atomic_dec_and_test(atomic_t *v); int atomic_sub_and_test(int i, atomic_t *v);
|
原子位操作
原子操作用来计数比较方便,如果需要将掩码用原子方式处理,可以用内核封装的原子位操作。
1 2 3 4 5 6 7 8 9 10
| void set_bit(nr, void *addr); void clear_bit(nr, void *addr); void change_bit(nr, void *addr);
int test_and_set_bit(nr, void *addr); int test_and_clear_bit(nr, void *addr); int test_and_change_bit(nr, void *addr);
test_bit(nr, void *addr);
|
seqlock
如果某个资源大部分时间都是读取,仅仅很少时间会写入,那么可以用seqlock来做保护。seqlock锁用于那种,读取者在读取数据时,认为极大概率写入者不会修改内容,就算改了,重新执行读取流程也无明显cycle消耗的场景。seqlock让读取者自行验证在数据读取期间是否和写入者发生了冲突。seqlock不用于保护指针资源,因为中途被写入者修改了,读取者可能错误的解指针。使用seqlock的经典代码如下
1 2 3 4 5
| unsigned int seq; do { seq = read_seqbegin(&the_lock); } while read_seqretry(&the_lock, seq);
|
seqlock接口如下(不全,具体看内核代码)
1 2 3 4 5 6 7 8 9 10 11 12 13
| unsigned int read_seqbegin_irqsave(seqlock_t *lock, unsigned long flags); int read_seqretry_irqrestore(seqlock_t *lock, unsigned int seq, unsigned long flags);
void write_seqlock(seqlock_t *lock); void write_sequnlock(seqlock_t *lock); void write_seqlock_irqsave(seqlock_t *lock, unsigned long flags); void write_seqlock_irq(seqlock_t *lock); void write_seqlock_bh(seqlock_t *lock); void write_sequnlock_irqrestore(seqlock_t *lock, unsigned long flags); void write_sequnlock_irq(seqlock_t *lock); void write_sequnlock_bh(seqlock_t *lock);
|
RCU
RCU同样用于保护大部分时间都是读取,少部分时间是修改的资源,不过RCU保护的资源只能是指针资源。RCU的设计思路是,写入者首先复制一份要修改的资源,然后修改副本,修改完后做指针替换就完成资源修改。这种设计主要出于不影响读取者的考虑, 例如路由表就用RCU保护。此外,RCU还有个假定,那就是所有读取数据的代码,对RCU所保护的指针指向的数据的引用都是原子的。什么意思呢,就是说假设某个代码块使用了RCU保护的数据,那么它必须保证在自己被调度出CPU之前完成在该数据上的操作。换句话下次调度回来就不能再用RCU保护的指针了,再换句话说,就是在*rcu_read_unlock();*之后就不能再用RCU保护的指针了。为什么RCU这么设计呢?因为这样RCU修改者就确定什么时候可以删除修改前的数据而不会影响读取者。
1 2 3 4 5
| struct my_stuff *stuff; rcu_read_lock( ); stuff = getxxxx();
rcu_read_unlock( );
|
正是由于上面的RCU的约定,那么RCU的写入者就可以确定,在一轮调度之后,修改前的数据块一定没有读取者使用了,此时可以安全释放该资源。所以RCU写入者用回调的形式释放资源,释放时刚好是一轮调度之后。具体RCU使用方法、API可参考内核文档。
CPU变量
CPU变量本书未做介绍,但是是一种非常重要的免锁实现方案。CPU变量将资源在每个CPU都存放一份,每个CPU各自维护,独立管理。具体用法参考内核文档。