static inline voidspin_lock(spinlock_t *lock){raw_spin_lock(&lock->rlock);}/* * Define the various spin_lock methods. Note we define these * regardless of whether CONFIG_SMP or CONFIG_PREEMPT are set. The * various methods are defined as nops in the case they are not * required. */#define raw_spin_trylock(lock) __cond_lock(lock,_raw_spin_trylock(lock))//linux/include/linux/compiler.h# define __cond_lock(x,c) (c)//linux/include/linux/spinlock_api_up.h#define _raw_spin_trylock(lock) ({ __LOCK(lock); 1; })/* * In the UP-nondebug case there's no real locking going on, so the * only thing we have to do is to keep the preempt counts and irq * flags straight, to suppress compiler warnings of unused lock * variables, and to add the proper checker annotations: */#define ___LOCK(lock) \do { __acquire(lock); (void)(lock); } while (0)//preempt_disable保证进程在临界区时不会被中断,来预防导致死锁发生#define __LOCK(lock) \do { preempt_disable(); ___LOCK(lock); } while (0)//linux/include/linux/compiler.h# define __acquire(x) __context__(x,1)//这是一对用于sparse对代码检测的相互关联的函数定义,第一句表示要增加变量x的计数,增加量为1,第二句则正好相反,这个是用来函数编译的过程中。如果在代码中出现了不平衡的状况,那么在Sparse的检测中就会报警。
由上述通用代码追溯没有看到具体的实现,目前也不知道如何与不同平台架构的具体实现相关联。下面将分析内核代码中针对 ARM 平台arch_spin_lock的实现代码。
result:50000000duration:98213ms➜ c++ ./a.outresult:50000000duration:97476ms➜ c++ ./a.outresult:50000000duration:98720ms➜ c++ ./a.outresult:50000000duration:98079ms
不使用互斥锁结果如下,出错
➜ c++ ./a.out result:6347619duration:1140050ms➜ c++ ./a.outresult:6945086duration:1003280ms➜ c++ ./a.outresult:4890718duration:992402ms➜ c++ ./a.outresult:7036019duration:966157ms
可以看出c++使用互斥锁,比使用自旋锁或不使用锁,耗时少。
c中互斥锁的使用
编译命令cc xxx.c -lpthread
# include <stdio.h># include <pthread.h>long total =0; static constint numthread =50;pthread_mutex_t mute;// 点击函数void*click(void*arg){pthread_mutex_lock(&mute);for(int i=0; i<1000000;++i) {// 对全局数据进行无锁访问 total +=1; }pthread_mutex_unlock(&mute);}int main(){// 计时开始 clock_t start =clock();// 创建100个线程模拟点击统计 pthread_t mythreads[numthread]; int thread_id[numthread];for(int i=0; i<numthread; ++i){ thread_id[i] = i;pthread_create(&mythreads[i],NULL, click, (void*)&thread_id[i]); }for (int i =0; i < numthread; i++) { int rc =pthread_join(mythreads[i],NULL); }// 计时结束 clock_t finish =clock();// 输出结果printf("result:%ld\n",total);printf("result:%ld\n",finish -start);return0;}
结果如下
result:50000000result:99046➜ c ./a.outresult:50000000result:100372➜ c ./a.outresult:50000000result:99839
不使用互斥锁时,结果如下,可以看出使用互斥锁后程序耗时更少。
result:5512872result:981536➜ c ./a.outresult:5164347result:981757➜ c ./a.outresult:6051030result:963732➜ c ./a.outresult:7926930result:967352
static int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val, ktime_t *abs_time, u32 bitset){ struct hrtimer_sleeper timeout,*to; struct restart_block *restart; struct futex_hash_bucket *hb; struct futex_q q = futex_q_init; int ret;if (!bitset)return-EINVAL;q.bitset = bitset;//设置定时任务,如果一定时间后进程还没被唤醒这唤醒wait的线程 to =futex_setup_timer(abs_time,&timeout, flags, current->timer_slack_ns);retry:/* * Prepare to wait on uaddr. On success, holds hb lock and increments * q.key refs. *///该函数中,将用户空间值写入内核空间,若成功,返回0,以及一些初始化操作 ret =futex_wait_setup(uaddr, val, flags,&q,&hb);if (ret)goto out;//将当前进程状态改为TASK_INTERRUPTIBLE,并插入到futex等待队列,然后重新调度。/* queue_me and wait for wakeup, timeout, or a signal. */futex_wait_queue_me(hb,&q, to);/* If we were woken (and unqueued), we succeeded, whatever. */ ret =0;//如果unqueue_me成功,则说明是超时触发(因为futex_wake唤醒时,会将该进程移出等待队列,所以这里会失败/* unqueue_me() drops q.key ref */if (!unqueue_me(&q))goto out; ret =-ETIMEDOUT;if (to &&!to->task)goto out;/* * We expect signal_pending(current), but we might be the * victim of a spurious wakeup as well. */if (!signal_pending(current))goto retry; ret =-ERESTARTSYS;if (!abs_time)goto out; restart =¤t->restart_block; restart->fn = futex_wait_restart; restart->futex.uaddr = uaddr; restart->futex.val = val; restart->futex.time =*abs_time; restart->futex.bitset = bitset; restart->futex.flags = flags |FLAGS_HAS_TIMEOUT; ret =-ERESTART_RESTARTBLOCK;out:if (to) {hrtimer_cancel(&to->timer);destroy_hrtimer_on_stack(&to->timer); }return ret;}
在将进程阻塞前会将当期进程插入到一个全局唯一的等待队列中。 着重看futex_wait_setup
/** * futex_wait_setup() - Prepare to wait on a futex * @uaddr: the futex userspace address * @val: the expected value * @flags: futex flags (FLAGS_SHARED, etc.) * @q: the associated futex_q * @hb: storage for hash_bucket pointer to be returned to caller * * Setup the futex_q and locate the hash_bucket. Get the futex value and * compare it with the expected value. Handle atomic faults internally. * Return with the hb lock held and a q.key reference on success, and unlocked * with no q.key reference on failure. * * Return: * - 0 - uaddr contains val and hb has been locked; * - <1 - -EFAULT or -EWOULDBLOCK (uaddr does not contain val) and hb is unlocked */static int futex_wait_setup(u32 __user *uaddr, u32 val, unsigned int flags, struct futex_q *q, struct futex_hash_bucket **hb){ u32 uval; int ret;/* * Access the page AFTER the hash-bucket is locked. * Order is important: * * Userspace waiter: val = var; if (cond(val)) futex_wait(&var, val); * Userspace waker: if (cond(var)) { var = new; futex_wake(&var); } * * The basic logical guarantee of a futex is that it blocks ONLY * if cond(var) is known to be true at the time of blocking, for * any cond. If we locked the hash-bucket after testing *uaddr, that * would open a race condition where we could block indefinitely with * cond(var) false, which would violate the guarantee. * * On the other hand, we insert q and release the hash-bucket only * after testing *uaddr. This guarantees that futex_wait() will NOT * absorb a wakeup if *uaddr does not match the desired values * while the syscall executes. */retry:ret =get_futex_key(uaddr, flags &FLAGS_SHARED,&q->key,FUTEX_READ);if (unlikely(ret !=0))return ret;retry_private://获得自旋锁*hb =queue_lock(q);//把用户空间的值放到内核空间中,即原子的将uaddr的值设置到uval中,成功返回0 ret =get_futex_value_locked(&uval, uaddr);if (ret) {queue_unlock(*hb); ret =get_user(uval, uaddr);if (ret)return ret;if (!(flags &FLAGS_SHARED))goto retry_private;goto retry; }//如果当期uaddr指向的值不等于val,即说明其他进程修改了//uaddr指向的值,等待条件不再成立,不用阻塞直接返回if (uval != val) {//释放锁queue_unlock(*hb); ret =-EWOULDBLOCK; }return ret;}
utex_wake(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset){ struct futex_hash_bucket *hb; struct futex_q *this,*next; union futex_key key =FUTEX_KEY_INIT; int ret;DEFINE_WAKE_Q(wake_q);if (!bitset)return-EINVAL;//根据uaddr的值填充&key的内容 ret =get_futex_key(uaddr, flags &FLAGS_SHARED,&key,FUTEX_READ);if (unlikely(ret !=0))return ret;//根据&key获得对应uaddr所在的futex_hash_bucket hb =hash_futex(&key);/* Make sure we really have tasks to wakeup */if (!hb_waiters_pending(hb))return ret;spin_lock(&hb->lock);//遍历该hb的链表,注意链表中存储的节点是plist_node类型,而而这里的this却是futex_q类型,这种类型转换是通过c中的container_of机制实现的plist_for_each_entry_safe(this, next,&hb->chain, list) {if (match_futex (&this->key,&key)) {if (this->pi_state ||this->rt_waiter) { ret =-EINVAL;break; }/* Check if one of the bits is set in both bitsets */if (!(this->bitset & bitset))continue;mark_wake_futex(&wake_q,this);if (++ret >= nr_wake)break; } }spin_unlock(&hb->lock);//唤醒对应进程wake_up_q(&wake_q);return ret;}