副标题[/!--empirenews.page--]
ReentrantLock的实现网上有很多文章了,本篇文章会简单介绍下其java层实现,重点放在分析竞争锁失败后如何阻塞线程。 因篇幅有限,synchronized的内容将会放到下篇文章。

Java Lock的实现
ReentrantLock是jdk中常用的锁实现,其实现逻辑主语基于AQS(juc包中的大多数同步类实现都是基于AQS);接下来会简单介绍AQS的大致原理,关于其实现细节以及各种应用,之后会写一篇文章具体分析。
AQS
AQS是类AbstractQueuedSynchronizer.java的简称,JUC包下的ReentrantLock、CyclicBarrier、CountdownLatch都使用到了AQS。
其大致原理如下:
- AQS维护一个叫做state的int型变量和一个双向链表,state用来表示同步状态,双向链表存储的是等待锁的线程
- 加锁时首先调用tryAcquire尝试获得锁,如果获得锁失败,则将线程插入到双向链表中,并调用LockSupport.park()方法阻塞当前线程。
- 释放锁时调用LockSupport.unpark()唤起链表中的第一个节点的线程。被唤起的线程会重新走一遍竞争锁的流程。
其中tryAcquire方法是抽象方法,具体实现取决于实现类,我们常说的公平锁和非公平锁的区别就在于该方法的实现。
ReentrantLock
ReentrantLock分为公平锁和非公平锁,我们只看公平锁。 ReentrantLock.lock会调用到ReentrantLock#FairSync.lock中:
FairSync.java
- static final class FairSync extends Sync {
- final void lock() {
- acquire(1);
- }
- /**
- * Fair version of tryAcquire. Don't grant access unless
- * recursive call or no waiters or is first.
- */
- protected final boolean tryAcquire(int acquires) {
- final Thread current = Thread.currentThread();
- int c = getState();
- if (c == 0) {
- if (!hasQueuedPredecessors() &&
- compareAndSetState(0, acquires)) {
- setExclusiveOwnerThread(current);
- return true;
- }
- }
- else if (current == getExclusiveOwnerThread()) {
- int nextc = c + acquires;
- if (nextc < 0)
- throw new Error("Maximum lock count exceeded");
- setState(nextc);
- return true;
- }
- return false;
- }
- }
AbstractQueuedSynchronizer.java
- public final void acquire(int arg) {
- if (!tryAcquire(arg) &&
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- selfInterrupt();
- }
可以看到FairSync.lock调用了AQS的acquire方法,而在acquire中首先调用tryAcquire尝试获得锁,以下两种情况返回true:
- state==0(代表没有线程持有锁),且等待队列为空(公平的实现),且cas修改state成功。
- 当前线程已经获得了锁,这次调用是重入
如果tryAcquire失败则调用acquireQueued阻塞当前线程。acquireQueued最终会调用到LockSupport.park()阻塞线程。
LockSupport.park
个人认为,要深入理解锁机制,一个很重要的点是理解系统是如何阻塞线程的。
LockSupport.java
- public static void park(Object blocker) {
- Thread t = Thread.currentThread();
- setBlocker(t, blocker);
- UNSAFE.park(false, 0L);
- setBlocker(t, null);
- }
park方法的参数blocker是用于负责这次阻塞的同步对象,在AQS的调用中,这个对象就是AQS本身。我们知道synchronized关键字是需要指定一个对象的(如果作用于方法上则是当前对象或当前类),与之类似blocker就是LockSupport指定的对象。
park方法调用了native方法UNSAFE.park,第一个参数代表第二个参数是否是绝对时间,第二个参数代表最长阻塞时间。
其实现如下,只保留核心代码,完整代码看查看unsafe.cpp
- Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time){
- ...
- thread->parker()->park(isAbsolute != 0, time);
- ...
- }
park方法在os_linux.cpp中(其他操作系统的实现在os_xxx中)
- void Parker::park(bool isAbsolute, jlong time) {
- ...
- //获得当前线程
- Thread* thread = Thread::current();
- assert(thread->is_Java_thread(), "Must be JavaThread");
- JavaThread *jt = (JavaThread *)thread;
- //如果当前线程被设置了interrupted标记,则直接返回
- if (Thread::is_interrupted(thread, false)) {
- return;
- }
- if (time > 0) {
- //unpacktime中根据isAbsolute的值来填充absTime结构体,isAbsolute为true时,time代表绝对时间且单位是毫秒,否则time是相对时间且单位是纳秒
- //absTime.tvsec代表了对于时间的秒
- //absTime.tv_nsec代表对应时间的纳秒
- unpackTime(&absTime, isAbsolute, time);
- }
- //调用mutex trylock方法
- if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {
- return;
- }
- //_counter是一个许可的数量,跟ReentrantLock里定义的许可变量基本都是一个原理。 unpack方法调用时会将_counter赋值为1。
- //_counter>0代表已经有人调用了unpark,所以不用阻塞
- int status ;
- if (_counter > 0) { // no wait needed
- _counter = 0;
- //释放mutex锁
- status = pthread_mutex_unlock(_mutex);
- return;
- }
- //设置线程状态为CONDVAR_WAIT
- OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
- ...
- //等待
- _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
- pthread_cond_timedwait(&_cond[_cur_index], _mutex, &absTime);
- ...
- //释放mutex锁
- status = pthread_mutex_unlock(_mutex) ;
- }
(编辑:西安站长网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|