Java并发-无锁策略CAS与atomic包
在高并发场景下,传统的锁机制虽然能保证线程安全,但可能带来性能瓶颈。今天要介绍一种无锁并发策略——Compare And Set
(比较并交换),简称CAS
。它通过一种乐观的比较交换技术来鉴别线程冲突,一旦检测到冲突发生,就重试当前操作直到成功为止。
# 一、CAS原理详解
# 1、什么是CAS
CAS
是一种无锁算法,它包含三个操作数:
- V(内存位置):需要读写的内存位置
- E(预期值):期望内存中的值
- N(新值):要写入的新值
CAS操作的逻辑是:当且仅当V的值等于E时,才将V的值设置为N,否则什么都不做。整个操作是原子的,不会被中断。
# 2、无锁的优势
相比传统锁机制,无锁策略具有以下特点:
- 高性能:避免了线程阻塞和上下文切换的开销
- 无死锁:不存在锁的获取和释放,天然免疫死锁问题
- 响应时间可预测:不会因为等待锁而产生不确定的延迟
- 更好的伸缩性:在高并发场景下性能优势更明显
# 3、CAS的底层实现
CAS
底层依赖于CPU的原子指令,如x86架构的CMPXCHG
指令。现代处理器都已支持这类原子指令。
在Java中,主要通过Unsafe
类中的compareAndSwap
系列方法来调用CAS指令:
// var1: 操作的对象
// var2: 对象中的字段偏移量
// var4: 期望值
// var5: 新值
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
# 4、CAS的问题与限制
虽然CAS很强大,但也存在一些问题:
- ABA问题:如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时会发现它的值没有发生变化,但实际上却变化了。
- 循环时间长开销大:自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销。
- 只能保证一个共享变量的原子操作:对多个共享变量操作时,CAS无法保证操作的原子性。
# 二、atomic包详解
Java 1.5中引入了java.util.concurrent.atomic
包,提供了一系列原子操作类,它们都是基于CAS实现的无锁并发工具。
# 1、原子类分类
atomic包中的类可以分为以下几类:
基本类型原子类
AtomicBoolean
:原子布尔类型AtomicInteger
:原子整型AtomicLong
:原子长整型
数组类型原子类
AtomicIntegerArray
:原子整型数组AtomicLongArray
:原子长整型数组AtomicReferenceArray
:原子引用数组
引用类型原子类
AtomicReference
:原子引用AtomicStampedReference
:带版本号的原子引用(解决ABA问题)AtomicMarkableReference
:带标记的原子引用
字段更新器
AtomicIntegerFieldUpdater
:原子更新整型字段AtomicLongFieldUpdater
:原子更新长整型字段AtomicReferenceFieldUpdater
:原子更新引用字段
# 2、AtomicInteger使用示例
以AtomicInteger
为例,它提供了线程安全的整数操作:
public class AtomicIntegerExample {
// 使用AtomicInteger保证线程安全
private static AtomicInteger counter = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
// 创建多个线程并发操作
Thread t1 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
counter.incrementAndGet(); // 原子递增
}
});
Thread t2 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
counter.incrementAndGet();
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("Final counter value: " + counter.get()); // 结果总是2000
}
}
AtomicInteger
提供了丰富的原子操作方法:
// 常用方法示例
AtomicInteger ai = new AtomicInteger(10);
// 获取并增加
int oldValue1 = ai.getAndIncrement(); // 返回10,当前值变为11
// 增加并获取
int newValue1 = ai.incrementAndGet(); // 返回12,当前值变为12
// 获取并设置
int oldValue2 = ai.getAndSet(20); // 返回12,当前值变为20
// 比较并设置
boolean success = ai.compareAndSet(20, 30); // 期望值为20,更新为30,返回true
// 获取并累加
int oldValue3 = ai.getAndAdd(5); // 返回30,当前值变为35
// 累加并获取
int newValue2 = ai.addAndGet(5); // 返回40,当前值变为40
// Lambda表达式更新
int result = ai.updateAndGet(x -> x * 2); // 当前值变为80,返回80
# 3、源码分析
查看incrementAndGet
方法的源码,可以发现它调用的是Unsafe#getAndAddInt
方法:
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = getIntVolatile(o, offset); // 获取当前值
} while (!weakCompareAndSetInt(o, offset, v, v + delta)); // CAS更新
return v;
}
这里使用了自旋CAS:如果CAS失败,就不断重试,直到成功为止。
# 三、深入理解ABA问题
# 1、ABA问题示例
AtomicReference
可以保证你在修改对象引用时的线程安全性,但它存在ABA问题。来看一个具体的例子:
public class ABADemo {
static class Node {
public final String value;
public Node next;
public Node(String value) {
this.value = value;
}
}
public static void main(String[] args) throws InterruptedException {
Node nodeA = new Node("A");
Node nodeB = new Node("B");
AtomicReference<Node> atomicRef = new AtomicReference<>(nodeA);
// 线程1:读取nodeA,准备修改为nodeC
Thread thread1 = new Thread(() -> {
Node oldNode = atomicRef.get();
System.out.println("Thread1: 读取到节点 " + oldNode.value);
try {
Thread.sleep(1000); // 模拟处理时间
} catch (InterruptedException e) {
e.printStackTrace();
}
// 尝试CAS更新
Node nodeC = new Node("C");
boolean success = atomicRef.compareAndSet(oldNode, nodeC);
System.out.println("Thread1: CAS结果 = " + success);
});
// 线程2:将nodeA改为nodeB,再改回nodeA
Thread thread2 = new Thread(() -> {
atomicRef.compareAndSet(nodeA, nodeB);
System.out.println("Thread2: A -> B");
Node newNodeA = new Node("A"); // 新的nodeA对象
atomicRef.compareAndSet(nodeB, newNodeA);
System.out.println("Thread2: B -> A (新对象)");
});
thread1.start();
Thread.sleep(100); // 确保线程1先执行
thread2.start();
thread1.join();
thread2.join();
}
}
在这个例子中,线程1的CAS操作会成功,但实际上引用已经被修改过了(虽然值看起来一样)。
# 2、使用AtomicStampedReference解决ABA问题
AtomicStampedReference
通过引入版本号(stamp)来解决ABA问题:
public class AtomicStampedReferenceDemo {
public static void main(String[] args) throws InterruptedException {
String initialRef = "A";
int initialStamp = 0;
AtomicStampedReference<String> atomicStampedRef =
new AtomicStampedReference<>(initialRef, initialStamp);
// 线程1:尝试将A改为C
Thread thread1 = new Thread(() -> {
int stamp = atomicStampedRef.getStamp();
String reference = atomicStampedRef.getReference();
System.out.println("Thread1: 初始值=" + reference + ", stamp=" + stamp);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 使用期望的stamp进行CAS
boolean success = atomicStampedRef.compareAndSet(
reference, "C", stamp, stamp + 1);
System.out.println("Thread1: CAS结果=" + success);
});
// 线程2:A->B->A,但stamp会改变
Thread thread2 = new Thread(() -> {
int stamp = atomicStampedRef.getStamp();
atomicStampedRef.compareAndSet("A", "B", stamp, stamp + 1);
System.out.println("Thread2: A->B, 新stamp=" + (stamp + 1));
stamp = atomicStampedRef.getStamp();
atomicStampedRef.compareAndSet("B", "A", stamp, stamp + 1);
System.out.println("Thread2: B->A, 新stamp=" + (stamp + 1));
});
thread1.start();
Thread.sleep(100);
thread2.start();
thread1.join();
thread2.join();
System.out.println("最终值: " + atomicStampedRef.getReference() +
", stamp=" + atomicStampedRef.getStamp());
}
}
在这个例子中,即使值从A变成B又变回A,由于stamp(版本号)发生了变化,线程1的CAS操作会失败,从而避免了ABA问题。
# 四、高性能原子累加器:LongAdder
# 1、为什么需要LongAdder
AtomicLong
在高并发场景下存在性能问题。当多个线程同时更新同一个原子变量时,只有一个线程能成功,其他线程都会自旋重试,导致大量的CPU资源浪费在自旋上。
# 2、LongAdder的设计思想
LongAdder
采用了分段累加的思想(类似于ConcurrentHashMap
的分段锁):
- 热点分离:将一个变量分解为多个变量,让不同的线程去更新不同的变量
- 最终一致性:读取时将所有变量求和,得到最终结果
- 动态扩容:根据竞争情况动态调整内部数组大小
# 3、LongAdder内部结构
// LongAdder的核心组成
class LongAdder extends Striped64 {
// 继承自Striped64的主要字段:
// base: 基础值,在没有竞争时使用
// cells[]: Cell数组,存在竞争时使用
// cellsBusy: 自旋锁,用于cells数组的初始化和扩容
}
# 4、LongAdder工作原理
- 无竞争时:直接更新base值
- 低竞争时:如果CAS更新base失败,创建cells数组
- 高竞争时:不同线程更新cells数组的不同元素,减少冲突
# 5、性能对比示例
public class LongAdderVsAtomicLong {
private static final int THREAD_COUNT = 50;
private static final int INCREMENT_COUNT = 1000000;
public static void main(String[] args) throws InterruptedException {
// 测试AtomicLong
AtomicLong atomicLong = new AtomicLong(0);
long startTime = System.currentTimeMillis();
Thread[] threads1 = new Thread[THREAD_COUNT];
for (int i = 0; i < THREAD_COUNT; i++) {
threads1[i] = new Thread(() -> {
for (int j = 0; j < INCREMENT_COUNT; j++) {
atomicLong.incrementAndGet();
}
});
threads1[i].start();
}
for (Thread t : threads1) {
t.join();
}
long atomicTime = System.currentTimeMillis() - startTime;
System.out.println("AtomicLong result: " + atomicLong.get());
System.out.println("AtomicLong time: " + atomicTime + "ms");
// 测试LongAdder
LongAdder longAdder = new LongAdder();
startTime = System.currentTimeMillis();
Thread[] threads2 = new Thread[THREAD_COUNT];
for (int i = 0; i < THREAD_COUNT; i++) {
threads2[i] = new Thread(() -> {
for (int j = 0; j < INCREMENT_COUNT; j++) {
longAdder.increment();
}
});
threads2[i].start();
}
for (Thread t : threads2) {
t.join();
}
long adderTime = System.currentTimeMillis() - startTime;
System.out.println("LongAdder result: " + longAdder.sum());
System.out.println("LongAdder time: " + adderTime + "ms");
System.out.println("性能提升: " + (atomicTime * 100.0 / adderTime - 100) + "%");
}
}
# 6、伪共享问题与解决
LongAdder
通过@Contended
注解解决了伪共享问题:
// Cell类使用@Contended注解防止伪共享
@sun.misc.Contended
static final class Cell {
volatile long value;
Cell(long x) { value = x; }
}
要启用@Contended
注解,需要添加JVM参数:-XX:-RestrictContended
# 7、LongAccumulator:通用累加器
LongAccumulator
是LongAdder
的通用版本,支持自定义的累加函数:
public class LongAccumulatorExample {
public static void main(String[] args) throws InterruptedException {
// 使用max函数作为累加器
LongAccumulator maxAccumulator = new LongAccumulator(Long::max, Long.MIN_VALUE);
// 使用自定义函数:计算所有值的乘积
LongAccumulator productAccumulator = new LongAccumulator((x, y) -> x * y, 1);
Thread[] threads = new Thread[10];
for (int i = 0; i < 10; i++) {
final int threadId = i;
threads[i] = new Thread(() -> {
for (int j = 0; j < 100; j++) {
maxAccumulator.accumulate(threadId * 100 + j);
productAccumulator.accumulate(2);
}
});
threads[i].start();
}
for (Thread t : threads) {
t.join();
}
System.out.println("Max value: " + maxAccumulator.get()); // 输出最大值
System.out.println("Product: " + productAccumulator.get()); // 输出2^1000
}
}
# 8、使用场景建议
LongAdder适用场景:
- 高并发计数场景
- 统计信息收集
- 需要高吞吐量的累加操作
AtomicLong适用场景:
- 需要精确的实时值
- 低并发场景
- 需要其他原子操作(如
compareAndSet
)
# 五、实战案例:构建无锁计数器
让我们通过一个实际案例来综合运用所学知识,构建一个高性能的分布式计数器:
public class DistributedCounter {
// 使用LongAdder进行高性能计数
private final LongAdder totalCount = new LongAdder();
// 使用AtomicReference管理配置
private final AtomicReference<CounterConfig> config = new AtomicReference<>();
// 使用ConcurrentHashMap存储分类计数
private final ConcurrentHashMap<String, LongAdder> categoryCounters = new ConcurrentHashMap<>();
static class CounterConfig {
final boolean enableCategory;
final int threshold;
CounterConfig(boolean enableCategory, int threshold) {
this.enableCategory = enableCategory;
this.threshold = threshold;
}
}
public DistributedCounter() {
config.set(new CounterConfig(true, 1000));
}
public void increment(String category) {
// 总计数始终增加
totalCount.increment();
// 根据配置决定是否记录分类
CounterConfig currentConfig = config.get();
if (currentConfig.enableCategory) {
categoryCounters.computeIfAbsent(category, k -> new LongAdder())
.increment();
}
}
public long getTotal() {
return totalCount.sum();
}
public Map<String, Long> getCategoryStats() {
Map<String, Long> stats = new HashMap<>();
categoryCounters.forEach((k, v) -> stats.put(k, v.sum()));
return stats;
}
// 无锁更新配置
public boolean updateConfig(boolean enableCategory, int threshold) {
CounterConfig oldConfig = config.get();
CounterConfig newConfig = new CounterConfig(enableCategory, threshold);
return config.compareAndSet(oldConfig, newConfig);
}
public static void main(String[] args) throws InterruptedException {
DistributedCounter counter = new DistributedCounter();
int threadCount = 100;
int operationsPerThread = 10000;
Thread[] threads = new Thread[threadCount];
long startTime = System.currentTimeMillis();
for (int i = 0; i < threadCount; i++) {
final int threadId = i;
threads[i] = new Thread(() -> {
String category = "category-" + (threadId % 10);
for (int j = 0; j < operationsPerThread; j++) {
counter.increment(category);
}
});
threads[i].start();
}
for (Thread t : threads) {
t.join();
}
long endTime = System.currentTimeMillis();
System.out.println("总计数: " + counter.getTotal());
System.out.println("分类统计: " + counter.getCategoryStats());
System.out.println("耗时: " + (endTime - startTime) + "ms");
System.out.println("TPS: " + (threadCount * operationsPerThread * 1000L / (endTime - startTime)));
}
}
# 六、最佳实践与性能优化建议
# 1、选择合适的原子类
- 简单计数:使用
LongAdder
而不是AtomicLong
- 对象引用:优先使用
AtomicReference
- 避免ABA:使用
AtomicStampedReference
或AtomicMarkableReference
# 2、减少竞争的技巧
// 不好的做法:所有线程竞争同一个原子变量
private AtomicLong counter = new AtomicLong();
// 好的做法:使用ThreadLocal减少竞争
private ThreadLocal<AtomicLong> localCounter =
ThreadLocal.withInitial(AtomicLong::new);
# 3、合理使用自旋
// 限制自旋次数,避免CPU空转
public boolean updateWithRetry(AtomicInteger atomic, int expect, int update) {
int retries = 100;
while (retries-- > 0) {
if (atomic.compareAndSet(expect, update)) {
return true;
}
// 可以加入退避策略
Thread.yield();
}
return false;
}
# 4、批量操作优化
// 批量更新时,先收集再一次性更新
public void batchUpdate(LongAdder adder, List<Long> values) {
long sum = values.stream().mapToLong(Long::longValue).sum();
adder.add(sum); // 一次更新,减少竞争
}
# 七、总结
本文深入探讨了Java中的无锁并发策略:
- CAS原理:理解了比较并交换的原子操作原理及其优缺点
- atomic包:掌握了各种原子类的使用场景和方法
- ABA问题:学习了ABA问题的产生原因和解决方案
- LongAdder:了解了高性能累加器的设计思想和实现原理
- 实战应用:通过实际案例掌握了无锁编程的最佳实践
无锁编程是高性能并发编程的重要技术,从传统的悲观锁到乐观的CAS,再到分段思想的LongAdder
,体现了并发编程技术的不断演进。掌握这些技术,能够帮助我们在高并发场景下构建更高效、更可靠的系统。
祝你变得更强!