5

JVM同步方法之偏向锁

 3 years ago
source link: https://zhuanlan.zhihu.com/p/34662715
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

JVM同步方法之偏向锁

中华儿女多奇志,不爱无码爱代码

其实很早之前通过一些资料,就对偏向锁稍微有些了解,周六准备看看Hotspot中关于偏向锁的实现,本以为应该畅通无阻,没想到处处都是拦路虎,细节比较多,真是硬着头皮看了一整天,才大概懂了点。笔者还在不断学习,只是想把自己的笔记分享出来,理解能力有限,可能有不正确的地方,还望指正,别让我误导了他人 。

一:锁的表示

Java里的锁,主要都是对对象进行加锁,如普通的synchronized非静态方法,就是对当前执行方法的对象进行加锁。那么怎么对对象进行加锁呢?对象的锁其实主要就是通过对象头的markOop进行表示的。markOop其实不是一个对象,只是一个字长的数据,在32为机器上,markOop为32个位,在64位上为64个位。markOop中不同的位区域存储着不同的信息,但是需要注意的一点是,markOop每个位区域表示的信息不是一定的,在不同状态下,markWord中存着不同的信息。接下来盗图一张:

v2-9e2974141f1be5d88523a1aff1eacb41_720w.jpg

由上图可知在markWord在对象的不同状态下,会有5种表示形式。

二:何为偏向锁

很多情况下,一个锁对象并不会发生被多个线程访问得情况,更多是被同一个线程进行访问,如果一个锁对象每次都被同一个线程访问,根本没有发生并发,但是每次都进行加锁,那岂不是非常耗费性能。所以偏向锁就被设计出来了。

偏向,也可以理解为偏心。当锁对象第一次被某个线程访问时,它会在其对象头的markOop中记录该线程ID,那么下次该线程再次访问它时,就不需要进行加锁了。但是这中间只要发生了其他线程访问该锁对象的情况,证明这个对象会发生并发,就不能对这个对象再使用偏向锁了,会进行锁的升级,这是后话,我们这里还是主要讨论下偏向锁。

三:源码探究

我们就以synchronized方法为入口吧。

之前在《JVM方法执行的来龙去脉》中提到过,JVM执行方法最后会以对应的entry_point例程作为入口。entry_point例程不仅会进行java方法栈帧的创建,如果是同步方法,还会进行加锁:

address TemplateInterpreterGenerator::generate_normal_entry(bool synchronized) {
  ......
  if (synchronized) {
    // Allocate monitor and lock method
    lock_method();
  } else {
    ......
  }
  // 下面会开始执行方法的字节码
  ......
 
}

可见在执行方法的字节码之前,对于同步方法,entry_point例程插入了一道关卡:lock_method():

void TemplateInterpreterGenerator::lock_method() {
  
  .......
  // get synchronization object
  {
    Label done;
    __ movl(rax, access_flags);
    __ testl(rax, JVM_ACC_STATIC);
    // get receiver (assume this is frequent case)
    // 局部变量表中第一个变量,存放着即将锁的对象指针,移动到rax中
    __ movptr(rax, Address(rlocals, Interpreter::local_offset_in_bytes(0)));
    __ jcc(Assembler::zero, done);
    __ load_mirror(rax, rbx);

    __ bind(done);
  }

  // add space for monitor & lock
  // 在当前栈帧中分配一个空间,用于分配一个BasicObjectLock对象
  __ subptr(rsp, entry_size); // add space for a monitor entry
  __ movptr(monitor_block_top, rsp);  // set new monitor block top
  // store object
  // 将要锁的对象指针移动到分配的BasicObjectLock中的obj变量
  __ movptr(Address(rsp, BasicObjectLock::obj_offset_in_bytes()), rax);
  const Register lockreg = NOT_LP64(rdx) LP64_ONLY(c_rarg1);
  // 将分配的BasicObjectLock的指针移动到lockreg寄存器中
  __ movptr(lockreg, rsp); // object address
  // 加锁
  __ lock_object(lockreg);
}

在上面的lock_method()中,会在当前方法栈帧中分配一段空间,用于分配一个BasicObjectLock对象,这个对象主要干两件事,一是记录将要锁的对象指针,而是用一个字长的空间,复制锁对象的markOop。现在我们可能不知道这么做是为什么,但是后面就会清楚了。主要上面最后一步,调用了lock_object()进行加锁:

void InterpreterMacroAssembler::lock_object(Register lock_reg) {
  ......
  // 如果使用重量级锁,直接进入InterpreterRuntime::monitorenter()执行
  if (UseHeavyMonitors) {
    call_VM(noreg,
            CAST_FROM_FN_PTR(address, InterpreterRuntime::monitorenter),
            lock_reg);
  } else {
    Label done;
    // cmpxchg其实就是CAS操作,必须使用rax寄存器作为老数据的存储。
    const Register swap_reg = rax; // Must use rax for cmpxchg instruction
    const Register tmp_reg = rbx; // Will be passed to biased_locking_enter to avoid a problematic case where tmp_reg = no_reg.
    const Register obj_reg = LP64_ONLY(c_rarg3) NOT_LP64(rcx); // Will contain the oop

    ......
    Label slow_case;

    // Load object pointer into obj_reg
    movptr(obj_reg, Address(lock_reg, obj_offset));
    //如果虚拟机参数允许使用偏向锁,那么进入biased_locking_enter()中
    if (UseBiasedLocking) {
      // lock_reg :存储的是分配的BasicObjectLock的指针
      // obj_reg :存储的是锁对象的指针
      // slow_case :即InterpreterRuntime::monitorenter();
      // done :标志着获取锁成功。
      // slow_case 和 done 也被传入,这样在biased_locking_enter()中,就可以根据情况跳到这两处了。
      biased_locking_enter(lock_reg, obj_reg, swap_reg, tmp_reg, false, done, &slow_case);
    }
    ......
    ......

    // 直接跳到这,需要进入InterpreterRuntime::monitorenter()中去获取锁。
    bind(slow_case);
    // Call the runtime routine for slow case
    call_VM(noreg,
            CAST_FROM_FN_PTR(address, InterpreterRuntime::monitorenter),
            lock_reg);
    // 直接跳到这表明获取锁成功,接下来就会返回到entry_point例程进行字节码的执行了。
    bind(done);
  }
}

上面可知,如果虚拟机参数允许使用偏向锁,那么会进入biased_locking_enter()中,biased_locking_enter()这个方法涉及到了很多细节,说实话在不了解这些细节的情况下直接看代码,简直是一头雾水。接下来还是一边看代码一边去讲解细节吧。

四:偏向锁的获取

biased_locking_enter()也比较长,就不直接贴方法块了,一步步分析比较好。

1:判断锁对象是否为偏向锁状态

  // mark_addr:锁对象头中的markOop指针。
  Address mark_addr      (obj_reg, oopDesc::mark_offset_in_bytes());
  NOT_LP64( Address saved_mark_addr(lock_reg, 0); )

  if (PrintBiasedLockingStatistics && counters == NULL) {
    counters = BiasedLocking::counters();
  }

  Label cas_label;
  int null_check_offset = -1;
  // 如果swap_reg中没存mark_addr,那么就先将mark_addr存入swap_reg中。
  if (!swap_reg_contains_mark) {
    null_check_offset = offset();
    movptr(swap_reg, mark_addr);
  }
  // 将对象的mark_addr,即markOop指针移入tmp_reg中
  movptr(tmp_reg, swap_reg);
  // 将tmp_reg和biased_lock_mask_in_place进行与操作,biased_lock_mask_in_place为111,和它进行与就可以取出markOop中后三位,即(是否偏向锁+锁标志位)
  andptr(tmp_reg, markOopDesc::biased_lock_mask_in_place);
  // 将上面结果,即(是否偏向锁+锁标志位)和biased_lock_pattern再次比较(biased_lock_pattern为5,即101),如果不相等,则表明不为偏向锁状态,需要进行CAS操作,跳往cas_label;否则即为偏向锁状态,接着往下走。
  cmpptr(tmp_reg, markOopDesc::biased_lock_pattern);
  jcc(Assembler::notEqual, cas_label);

2:走到这,表明锁对象已经为偏向锁态,需要判断锁对象之前是否已经偏向当前线程。

  // 将锁对象所属类的prototype_header移动至tmp_reg中,prototype_header中存储的也是markOop。
  // prototype_header是专门为偏向锁打造的,初始时类的prototype_header为偏向锁态,即后三位为101,一旦发生了bulk_revoke,那么就会设为无锁态,即001。
  // bulk_revoke为批量撤销,每次类发生bulk_rebais时(类的所有对象重设偏向锁),类prototype_header中的epoch就会+1,当epoch达到一个阈值时,就会发生bulk_revoke,撤销该类每个对象的偏向锁,这样该类的所有对象以后都不能使用偏向锁了,其实也就是虚拟机认为该对象不适合偏向锁。
  load_prototype_header(tmp_reg, obj_reg);

  // 将当前线程id和类的prototype_header相或,这样得到的markOop为(当前线程id + prototype_header中的(epoch + 分代年龄 + 偏向锁标志 + 锁标志位)),注意prototype_header的分代年龄那4个字节为0
  orptr(tmp_reg, r15_thread);
  // 将上面计算得到的结果与锁对象的markOop进行异或,tmp_reg中相等的位全部被置为0,只剩下不相等的位。
  xorptr(tmp_reg, swap_reg);
  Register header_reg = tmp_reg;
  // 对((int) markOopDesc::age_mask_in_place)进行按位取反,age_mask_in_place为...0001111000,取反后,变成了...1110000111,除了分代年龄那4位,其他位全为1;
  // 将取反后的结果再与header_reg相与,这样就把header_reg中除了分代年龄之外的其他位取了出来,即将上面异或得到的结果中分代年龄给忽略掉。
  andptr(header_reg, ~((int) markOopDesc::age_mask_in_place));
  // 如果除了分代年龄,对象的markOop和(当前线程id+其他位)相等,那么上面与操作的结果应该为0,表明对象之前已经偏向当前线程,即markOop中存放有当前线程id,那么跳到done处,直接进入方法执行即可;否则表明当前线程还不是偏向锁的持有者,会接着往下走。
  jcc(Assembler::equal, done);

3:走到这,表明锁对象并没有偏向当前线程,接下来判断是否需要撤销锁对象的偏向。

  // 将header_reg和111相与,如果结果不为0,则表明header_reg后三位存在不为0的位,证明之前进行异或时,类的prototype_header后面三位与对象markOop的后三位不相等,但是能走到这,表明对象markword后三位为101,即偏向模式。现在类的prototype_header和对象markOop后三位不相等,即对象所属类不再支持偏向,发生了bulk_revoke,所以需要对当前对象进行偏向锁的撤销;否则表明目前该类还支持偏向锁,接着往下走。
  testptr(header_reg, markOopDesc::biased_lock_mask_in_place);
  jccb(Assembler::notZero, try_revoke_bias);

4:走到这,表明锁对象还支持偏向锁,需要判断当前对象的epoch是否合法,如果不合法,需要取进行重偏向。合法的话接着往下走。

  // 测试对象所属类的prototype_header中epoch是否为0,不为0的话则表明之前异或时,类的prototype_header中epoch和对象markOop的epoch不相等,表明类在对象分配后发生过bulk_rebais()(前面提到过,每次发生bulk_rebaise,类的prototype header中的epoch都会+1),所以之前对象的偏向就无效了,需要进行重偏向。否则接着往下走。
  testptr(header_reg, markOopDesc::epoch_mask_in_place);
  jccb(Assembler::notZero, try_rebias);

5:走到这,表明锁对象的偏向态合法,可以尝试去获取锁,使对象偏向当前线程。

  // 取出对象markOop中除线程id之外的其他位
  andptr(swap_reg,
         markOopDesc::biased_lock_mask_in_place | markOopDesc::age_mask_in_place | markOopDesc::epoch_mask_in_place);
  // 将其他位移动至 tmp_reg。
  movptr(tmp_reg, swap_reg);
  // 将其他位和当前线程id进行或,构造成一个新的完整的32位markOop,存入tmp_reg中。新的markOop因为保存了当前线程id,所以会偏向当前线程。
  orptr(tmp_reg, r15_thread);
  // 尝试利用CAS操作将新构成的markOop存入对象头的mark_addr处,如果设置成功,则获取偏向锁成功。
  // 这里说明下,cmpxchgptr操作会强制将rax寄存器(swap_reg)中内容作为老数据,与第二个参数,在这里即mark_addr处的内容进行比较,如果相等,则将第一个参数的内容,即tmp_reg中的新数据,存入mark_addr。
  cmpxchgptr(tmp_reg, mark_addr); // compare tmp_reg and swap_reg
  // 上面CAS操作失败的情况下,表明对象头中的markOop数据已经被篡改,即有其他线程已经获取到偏向锁,因为偏向锁不容许多个线程访问同一个锁对象,所以需要跳到slow_case处,去撤销该对象的偏向锁,并进行锁升级。
  if (slow_case != NULL) {
    jcc(Assembler::notZero, *slow_case);
  }
  // 上面CAS成功的情况下,直接就跳往done处,回去执行方法的字节码了。
  jmp(done);

6:其实到这里,biased_locking_enter()已经结束了,不过上面多处提到了try_rebais和try_revoke,这两个其实就是汇编里的标号,它们对应的代码也定义在biased_locking_enter中。

  bind(try_rebias);
  // 将锁对象所属类的prototype_header送入tmp_reg。
  load_prototype_header(tmp_reg, obj_reg);
  // 尝试用CAS操作,使对象的markOop重置为无线程id的偏向锁态,即不偏向任何线程。
  cmpxchgptr(tmp_reg, mark_addr); 
  // 和第5步一样,如果CAS失败,则表明对象头的markOop数据已经被其他线程更改,需要跳往slow_case进行撤销偏向锁,否则跳往done处,执行字节码。
  if (slow_case != NULL) {
    jcc(Assembler::notZero, *slow_case);
  }
  jmp(done);
  bind(try_revoke_bias);
  // 走到这,表明这个类的prototype_header中已经没有偏向锁的位了,即这个类的所有对象都不再支持偏向锁了,但是当前对象仍为偏向锁状态,所以我们需要重置下当前对象的markOop为无锁态。
  // 将锁对象所属类的prototype_header送入tmp_reg。
  load_prototype_header(tmp_reg, obj_reg);
  // 尝试用CAS操作,使对象的markOop重置为无锁态。这里是否失败无所谓,即使失败了,也表明其他线程已经移除了对象的偏向锁标志。
  cmpxchgptr(tmp_reg, mark_addr); 
  //接下来会回到lock_object()方法中,继续轻量级锁的获取。

上面根据同步方法讲了一下偏向锁,笔者在这上面也啃了差不多整个周六,原理看似很简单,但是在很多细节不清楚的情况下去看源码,尤其是这种全是汇编代码时,往往是一脸懵逼。而且HotSpot用一个并不是对象的markOop去表示锁,涉及到计算时更让人糊涂。如果大家只是想稍微了解下原理,建议还是不要太深入源码细节。。。。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK