引言

CAS(Compare And Swap)比较并交换是JUC并发编程中最为重要的一个工具。它在处理并发问题时提供了一个非阻塞的解决方案,引入了一种全新的并发编程思维——乐观锁。这种思想预设所有线程在执行过程中都不会发生冲突,每一个线程都会乐观地认为自己能够成功执行,从而大大降低了线程之间的等待和阻塞,极大地提高了系统的并发性能。

CAS操作的引入,使得我们能够实现一系列复杂的并发数据结构,如Atomic类、并发容器、线程池等,并且它在实现无锁数据结构和算法时也是不可或缺的工具。CAS通过使用硬件级别的原子性操作,保证了多个线程并发修改共享变量时,不会出现数据不一致的问题,为开发者构建线程安全的并发程序提供了强大的保证。

自旋锁是CAS的一个经典应用,其核心思想就是当一个线程试图获取锁时,如果锁已经被其他线程占用,那么该线程将在一个循环中不断地尝试获取锁,直到成功为止。这种方式避免了线程挂起和唤醒带来的高昂代价,是一种低延迟的锁策略。在并发量不高,锁持有时间较短的场景下,自旋锁可以带来很好的性能提升。

在本期文章中,我将以JDK17u源码为例从JDK底层的角度来(C/C++)深入探讨CAS的原理以及通过CAS实现的自旋锁功能。

CAS操作

CAS操作通常被认为是一种乐观锁,它不像悲观锁那样预先组织其他线程的访问,而是先进行尝试性操作,如果没有其他线程修改过数据就算操作成功,否则操作失败。更具体的说,CAS操作包括以下三个步骤:

  1. 获取当前值

  2. 计算新的值

  3. 比较当前值是否未被修改,如果是则将新值写入并返回true,否则返回false

溯源分析

在JUC的很多并发工具中都使用到了CAS,如:ReentrantLock、AbstractQueuedSynchronizer、CountDownLatch等。在AQS中是通过compareAndSetState方法来实现CAS功能的:

private static final Unsafe U = Unsafe.getUnsafe();
protected final boolean compareAndSetState(int expect, int update) {
	return U.compareAndSetInt(this, STATE, expect, update);
}

不难看出compareAndSetState方法间接地调用了来自Unsafe类中的compareAndSetInt方法:

@IntrinsicCandidate  
public final native boolean compareAndSetInt(Object var1, long var2, int var4, int var5);

compareAndSetInt方法是一种JNI技术,是通过调用C/C++编译生成的dll/so库来实现的。那么其底层实现原理就是存在于原始的C/C++文件中。如果读者阅读过Unsafe类的源码,会发现其中大部分方法都是通过native实现的,它们统一在JDK源码的src\hotspot\share\prims\unsafe.cpp文件中被实,聚焦到compareAndSetInt方法上:

UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSetInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x)) {
	oop p = JNIHandles::resolve(obj);
	if (p == NULL) {
		volatile jint* addr = (volatile  jint*)index_oop_from_field_offset_long(p, offset);
		return RawAccess<>::atomic_cmpxchg(addr, e, x) == e;
	} else {
		assert_field_offset_sane(p, offset);
		return HeapAccess<>::atomic_cmpxchg_at(p, (ptrdiff_t)offset, e, x) == e;
	}
} UNSAFE_END

UNSAFE_ENTRYUNSAFE_END进行宏展开可以使得源码更加清晰(源码中的注释详细地阐述了CAS发生的完整过程):

// Unsafe类中compareAndSetInt方法的本地实现,CAS是一种轻量级锁
extern "C" {
	jboolean JNICALL Unsafe_CompareAndSetInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x) {
		// 从JNI环境中获取Java线程
		JavaThread* thread = JavaThread::thread_from_jni_environment(env);
		// 在ARM上启用W^X(写时不执行)内存保护策略,确保一块内存区域在任意时刻要么是可写的(Writable),要么是可执行的(eXecutable),不允许同时具备这两种属性
		MACOS_AARCH64_ONLY(ThreadWXEnable __wx(WXWrite, thread));
		// 将线程状态从"在本地代码中"(JNI)更改为"在VM中"
		ThreadInVMfromNative __tiv(thread);
		// 检查和跟踪VM中的本地方法调用
		debug_only(VMNativeEntryWrapper __vew;)
		// 清理在执行本地方法期间创建的JNI句柄,避免内存泄漏
		HandleMarkCleaner __hm(thread);
		// 用于异常宏的异常处理
		JavaThread* THREAD = thread;
		// 验证当前线程的堆栈对齐是否正确,确保堆栈布局保持一致
		os::verify_stack_alignment();  
		{
			// 函数解析传入的Java对象obj(AQS对象)
			oop p = JNIHandles::resolve(obj);
			if (p == NULL) {
				// p对象为空根据offset计算被操作数的内存地址
				volatile jint* addr = (volatile jint*)index_oop_from_field_offset_long(p, offset);
				// 使用cmpxchg指令执行CAS操作,将addr中的期望值e设置为新值x
				return RawAccess<>::atomic_cmpxchg(addr, e, x) == e;
			} else {
				// p对象不为空先检查字段的偏移量是否合理
				assert_field_offset_sane(p, offset);
				// 在堆上操作对象p,将p偏移offset地址中的期望值e设置为新值x
				return HeapAccess<>::atomic_cmpxchg_at(p, (ptrdiff_t)offset, e, x) == e;
			}
		}
	}
}

在这个被宏展开的CAS方法的尽头,是RawAccess<>::atomic_cmpxchgHeapAccess<>::atomic_cmpxchg_at函数,虽然它们是解决不同的情况下的CAS但它们的本质都是执行了CPU层面的cmpxchg指令

atomic_cmpxchg

这段cpp代码的溯源分析中省略了大部分的操作系统和计算机环境的判断过程,这里只以x86架构下的cmpxchg指令的调用过为例进行分析

  1. 如果继续对RawAccess<>::atomic_cmpxchg函数深入探索就会来到src\hotspot\share\oops\access.hpp中的atomic_cmpxchg函数

  2. access.hpp中的atomic_cmpxchg函数又调用了src\hotspot\share\oops\accessBackend.hpp中的atomic_cmpxchg函数

  3. accessBackend.hpp中的atomic_cmpxchg函数返回了同文件下的atomic_cmpxchg_reduce_types函数

  4. accessBackend.hpp中的atomic_cmpxchg_reduce_types函数通过一系列条件和环境判断会调用到src\hotspot\share\oops\accessBackend.inline.hpp中的atomic_cmpxchg_internal函数

  5. accessBackend.inline.hpp中的atomic_cmpxchg_internal函数最终会调用到来自src\hotspot\share\runtime\atomic.hpp中的cmpxchg函数

  6. atomic.hpp中的cmpxchg函数返回了同文件下的PlatformCmpxchg结构体中的operator函数

  7. atomic.hpp中的operator函数最终在src\hotspot\os_cpu\bsd_x86\atomic_bsd_x86.hpp中被函数operator实现

  8. atomic_bsd_x86.hpp中的函数operator最终使用内联汇编块来调用了CPU层面的cmpxchg指令

template<>
template<typename  T>
inline T Atomic::PlatformCmpxchg<4>::operator()(T volatile* dest, 
												T compare_value, 
												T exchange_value, 
												atomic_memory_order /* order */) const {
	STATIC_ASSERT(4 == sizeof(T));
	__asm__ volatile ( "lock cmpxchgl %1,(%3)"
					  : "=a" (exchange_value)
					  : "r" (exchange_value), "a" (compare_value), "r" (dest)
					  : "cc", "memory");
	return exchange_value;
}

cmpxchg

上述的cmpxchgl内联汇编实现了一个在多核或多线程环境下使用的原子操作:比较和交换(Compare and Swap, CAS)。这是一条X86架构下的CAS指令,lock cmpxchgl %1, (%3),它的作用是如果指定内存位置(%3)的值等于eax寄存器(也就是比较值compare_value),那么将这个内存位置的值设为%1(也就是exchange_value)。如果内存位置的值不等于eax,那么就不做任何修改。无论是否修改,最后eax都会被设置为内存位置原本的值。

  • "lock cmpxchgl %1, (%3)"lock前缀是一个指示符,表示后面的指令应当被原子地执行,即在执行完整个指令前,不会被其他指令打断或重排序。cmpxchgl是一个X86指令,表示比较并交换(Compare and Swap)。%1(%3)分别表示输入的第一个和第三个参数,即exchange_value和内存位置dest

  • : "=a" (exchange_value):是输出部分,"=a"表示将结果(eax寄存器的值)赋给exchange_value

  • : "r" (exchange_value), "a" (compare_value), "r" (dest):是输入部分,"r"表示一个通用寄存器,"a"表示eax寄存器。它们分别对应于exchange_valuecompare_valuedest

  • : "cc", "memory"是clobber部分,它告诉编译器这段汇编代码可能会改变哪些资源。"cc"表示这段代码可能会修改条件代码寄存器,"memory"表示可能会修改内存

综上所述,这段内联汇编实现了一个原子的CAS操作,其语义是:如果内存位置dest的值等于compare_value,那么将其设为exchange_value,并将操作前dest位置的值赋给exchange_value

自旋锁

自旋锁是一种锁定机制,当一个线程尝试获取一个已经被其他线程持有的锁时,该线程将循环(即"自旋")并且不断地检查锁是否已经被释放。自旋锁是一种忙等(busy waiting)锁,它不会让出CPU,而是一直等待直到能够获取锁。

自旋锁中的循环是通过CAS来实现的,这也是CAS最常见的使用场景之一。自旋锁会持续地尝试获取锁,直到成功为止。在这个过程中,它会使用CAS操作来检查锁的状态:如果锁当前未被占用(即锁的状态为“未锁定”),那么就尝试使用CAS操作来锁定它。如果CAS操作成功,那么就说明成功获得了锁;否则,就说明锁已经被其他线程获取,自旋锁就会继续自旋,再次尝试获取锁。这个过程会一直持续,直到成功获取到锁。

自旋锁虽然是一种轻量级锁,但是大量的自旋锁在不断忙等的过程中会消耗大量的CPU时间。通常情况下,自旋锁会设置一个自旋限制,当自旋次数超过一定限制后,锁将会让出CPU,进入阻塞状态,这样可以避免无尽的自旋浪费CPU资源。通过AtomicReference原子类提供的CAS方法实现一个简单的具备自旋限制的自旋锁:

public class SpinLock {
	private AtomicReference<Thread> owner = new AtomicReference<>();
	private static final int SPIN_LIMIT = 1000; // 最大尝试自旋的次数
	public void lock() {
		Thread currentThread = Thread.currentThread();
		int spinCount = 0; // 初始自旋次数
		while (!owner.compareAndSet(null, currentThread)) {
			// 检查自旋限制
			if (++spinCount > SPIN_LIMIT) {
				// 自旋次数超过限制,让出CPU,进入阻塞状态
				try {
					Thread.sleep(1); // 暂停1毫秒
				} catch (InterruptedException e) {
					throw new RuntimeException("Thread interrupted while trying to get lock", e);
				}
				spinCount = 0; // 重置自旋次数
			}
		}
	}
	public void unlock() {
		Thread currentThread = Thread.currentThread();
		if (!owner.compareAndSet(currentThread, null)) {
			throw new IllegalMonitorStateException("Attempted to unlock a lock not owned by the current thread");
		}
	}
}

在这个例子中,自旋锁在尝试获取锁时,会检查自己的自旋次数。如果自旋次数超过了设定的限制,那么当前线程会让出CPU(通过调用Thread.sleep(1)),然后在唤醒后再次尝试获取锁。这种方法可以在一定程度上降低大量线程自旋等待带来的CPU资源浪费。

回顾

综上所述,我对CAS的理解是,它是比较并交换Compare-And-Swap的意思,是一种原子操作的乐观锁,用于在多线程环境中实现无锁同步。

CAS操作通常需要传入一个内存地址、期望值和新值。当内存地址的当前值与期望值相同时,CAS操作将内存地址的值更新为新值,并返回true。若更新失败,则返回false。在Java中,CAS操作主要通过Unsafe类提供的native本地方法实现,JDK底层通过C++函数“Unsafe_CompareAndSetInt”来实现这些方法。

在操作系统层面,CAS操作通常由硬件指令“x86架构下的cmpxchg”实现,保证了原子性。在实际应用中,如果CAS操作失败,通常会进行自旋(spin)重试,即反复尝试CAS操作直至成功。这种自旋重试策略在Java并发工具类:ReentrantLock、AQS、CountDownLatch等中被广泛应用。

虽然CAS操作可以减少锁的开销,但在线程竞争较为激烈的场景下,自旋重试可能导致性能下降。

参考文献

[1] Stone, J. M. (1990). A simple and correct shared-queue algorithm using compare-and-swap. In Proceedings of the 1990 ACM/IEEE conference on Supercomputing (Supercomputing '90) (pp. 495–504). Washington, DC, USA: IEEE Computer Society Press.
[2] Valois, J. D. (1995). Lock-free linked lists using compare-and-swap. In Proceedings of the fourteenth annual ACM symposium on Principles of distributed computing (PODC '95) (pp. 214–222). New York, NY, USA: Association for Computing Machinery. https://doi.org/10.1145/224964.224988
[3] old程序员. (2020). 并发编程的灵魂:CAS机制详解. Retrieved May 18, 2023, from https://zhuanlan.zhihu.com/p/101430930